查看原文
其他

Flink SQL 在米哈游的平台建设和应用实践

张剑@米哈游 Apache Flink 2023-05-01

摘要:本文整理自米哈游大数据实时计算团队负责人张剑,在 FFA 行业案例专场的分享。本篇内容主要分为三个部分:


  1. 发展历程

  2. 平台建设

  3. 未来展望


Tips:点击「阅读原文」查看原文视频&演讲 ppt

01

发展历程


随着公司业务的发展,实时计算需求应运而生。我们根据重点的工作内容将发展阶段划分为三个部分:


  • 第一阶段是以 DataStream API 开发为主的 Flink 平台


  • 第二个阶段是以 Flink SQL 为主一站式开发平台


  • 第三阶段是一站式开发平台的功能深化和场景覆盖


第一阶段,以 DataStream API 开发为主的 Flink 平台,很好的解决了我们对于实时计算的需求。但随着开发同学越来越多,大家发现基于 DataStream API 开发为主的实时计算平台,具有三个弊端,分别是开发成本高、版本易冲突、运维难度大,因此大家对 Flink SQL 的呼声就越来越高。


第二阶段,以 Flink SQL 为主一站式开发平台。主要的工作内容有:Flink SQL 能力提升、指标和日志体系建设、元数据和血缘管理。基于此,业务人员有了新的期望。


  • 第一,希望平台能够更加智能化,降低用户的使用调参、调优等成本


  • 第二,希望流量的波动能够具有自动扩缩容的资源管理能力


  • 第三,希望数据更具时效性。比如数据入仓、入湖后分钟级可查,或者基于近实时数仓开发


第三阶段,一站式开发平台功能深化和场景覆盖。主要的工作和未来要持续做的工作包含如下几个方面:


  • 第一,任务资源的静态和动态调优能力


  • 第二,资源的弹性扩缩容能力


  • 第三,加强近实时数仓的建设


下面我们进入平台的整体架构,从下图中可以看到平台总体包含三个部分,分别是用户权限及鉴权、功能和服务模块、以及环境和资源功能。


功能和服务主要包含作业大盘、概览、开发、运维、日志、元数据、血缘、监控告警、资源调优、自动扩缩容、弹性资源管理以及安全管控等。


02

平台建设


那么基于这样的实时计算平台,我们是如何建设的呢?围绕 Flink SQL 或者平台化的主要工作有如下四个方面:


  • 第一,语义表达和控制能力的建设


  • 第二,资源调优和弹性能力的建设


  • 第三,指标体系建设


  • 第四,近实时数仓建设


截止目前,Flink SQL 占比总任务数已经在 90%以上,极大的提高了大家的开发效率。下面我们将对每一个部分进行详细的讲解,来看一看具体都是怎么做的。


DataStream API 相较于 Flink SQL 有如下几个优点:

  • 第一,算子并行度和传输方式可控

  • 第二,执行图直观易于理解

  • 第三,状态保存时间可以分别设置。

但在转变到 SQL 的时候,会产生一些问题。基于此,我们举个例子,来看一看为什么算子并行度和传输方式不可控了。

比如用户定义了一个 UDF 函数,用来处理 Kafka 数据源的某一个日志,然后将这个处理后的数据写入下游的 MySQL 或者其他存储。我们假定 Kafka 某一个 Topic 分区有 10 个,整个任务的并行度设置为 20。这个时候就会发现,UDF 实际只会处理 10 个并行度的数据。Flink SQL 需要怎样才能拓展呢?


针对这种情况,我们当前的解决方案是提供对执行图编辑的功能,按照编辑结果同 SQL 一起保存。如下图所示,有三个 Operator,Data Source Operator 的 ID=1,UDF Operator 的 ID=2,Data Sink Operator 的 ID=3。


在这个过程中,将整个作业的并行度设为 20,Source 源 Operator1 的并行度设置为 10,1 和 2 之间的传输方式设为 rescale。然后在后端接收到后,同步将 Job Graph 进行修改,就会得到如下的执行图,用户就能够比较好的解决掉这个问题了。


对于这个问题,未来我们的改进思路是通过 SQL 利用 Hint 功能来实现,或者更加智能化一点,根据作业指标信息,自动探测反压节点,自动化设置,来降低用户的使用成本。


对于 Create View 逻辑视图的含义是指什么呢?我也用一个案例来加以说明。从下图可以看到,用户自定义了一个 UDF 函数模拟了一个数据源。我们将这个数据进行解析,创建 Create View,比如叫 Row Table,然后向下游两个目标表 SinkTable1 和 SinkTable2 写入。最后看执行图,会发现 UDF 函数被执行了两次。


目前我们针对这一个问题收集并提供了一些解决方案。但在提供解决方案之前,我想先阐述一下这个问题产生的原因。Flink SQL 利用 Apache calcite 进行 SQL 语法解析,然后将解析后的 SQL 转换成一个语法树,经过 Flink Planner 生成 RealNode,经过 Optimizer Rule 进入 Codegen 环节。之后实际代码会有一个 Physical Plan 的过程,经过 Optimizer 形成 Steam Graph,然后转化成 Job Graph,最终转化成 Execution Graph。


那么 View 是在哪一层级丢失的呢?其实是在 Apache calcite 语法解析的时候,View 它只是一个逻辑辅助,在这一过程会将其丢弃。那么我们如何让 View 这一信息被底层感知到呢?


主要有两个办法:


  • 办法一是 SQL 解析的时候不丢失 View 信息


  • 办法二是在 RealNode 到 Optimizer Rule 能够识别到 View 的特征信息,这样就可以把 View 当成一个真正的代码去翻译了


办法一是一个非常好的解决办法,但是需要对 Apache calcite 进行很多改动,实现难度比较大,成本也比较高,所以采用了办法二。最终的方案是采用识别特定函数实现,内置了一个 breakpoint 函数。在创建 View 的时候可以同时多 select 一个 breakpoint,这样在底层翻译的时候,就可以把它当成一个真正的 RealNode 处理。这个问题,未来我们是也是希望通过 SQL 利用 Hint 功能来实现。


对于状态的保存时间方面我们要怎么处理呢?以数据流关联 MySQL 分库分表的数据举例。常见的解决方案是利用 Flink CDC 将 MySQL 中的分库分表数据,抽取写入下游的 KV 存储中,然后再通过另一个 Flink SQL 任务接入 Kafka 关联,用时态表 Join 的方式将数据打宽,最终输出结果。


这一过程可能会有两个问题。第一,引入 HBase,我们的任务就会从一个拆分成两个。其次需要假定下面这条链路的速度快于流的速度,否则上面 Topic 的数据到达的时候,而维表的数据还没到达就关联不上。那么怎样去解决这个问题,也是我们思考的地方。


我们采用的方案是用 Flink SQL+CDC+Regular Join 的方式来实现。接入还是一样消费 Kafka,通过 CDC 来消费数据库分库分表的数据,最后通过正常的 Regular Join 来实现。


这里的 Regular join 底层同时依赖两个 MapState,比如 Topic A 对应 MapState 是 A,MySQL 里的数据库的数据对应的是 B。如果我们能轻易的将 MapState B 的状态设置为 0 或者不过期,那么这个状态的数据就会被永久的保存下来。即使流的数据先到达了,后面状态数据到达也能触发数据的关联,从而比较好的解决这类问题。


具体的解决办法是,我们可以在 Flink SQL 中指定左右流 Join 的状态时间,在 Graph 中识别有 Join 的算子,最终透传到 Join 算子做状态时间的设置。


任务开发完成,需要多少资源呢?线上流量波动,出现延迟怎么办?任务越来越多或任务并发调整,资源不足怎么办?


针对这些问题,我们对应的解决办法主要包含:静态资源调优、动态资源调优及扩缩容、资源弹性能力的建设。那么具体我们是怎么做的呢?下面请大家跟着我来一起来看一看。


举个例子,任务终于开发完成,通过了任务校验,但是任务参数,比如并行度、Slot、内存……该给多少才能正常运行呢?提供了如下三种 case:


  • Case1:资源直接给足-->正常运行-->结束--->资源浪费


  • Case2:资源不足-->反压或者延迟严重-->反复调整资源-->费时费力


  • Case3:指标计算 Groupby-->托管内存不足/增量 Checkpoint 没开-->任务运行一段时间失败


综上所述,三个案例的共性是任务调优成本高,且对用户本身有一定的能力要求。对此我们专门做了静态资源调优的解决办法。


假定用户开发了一个 Flink SQL,第一个环节,首先进行语法校验,然后通过语法校验及后端生成 Stream Graph,拿到 Stream Graph 的同时我们还会进行 Source/Sink 连通校验和参数初步调整。


第二个环节,根据当前的任务逻辑及流量合理的调整资源。首先探测 Source 的流量,然后拿这个值和用户的作业 SQL、Stream Graph 做 Optimizer。Optimizer 部分主要包括 Restart、HighAvailable、Checkpoint、Parallelism、TaskManager、JobManager、StateBackend。


通过不断优化,得到一个比较好的任务资源参数,供用户作为初始任务资源使用。如果探测的资源流量较大,Sink 到 MySQL 的 Batch 设置较小,针对这种情况,我们会提醒 SQL 当中的参数进行调整,来帮助用户更好的调整 SQL 任务的参数。


最终我们会给用户提供给两个视图,分别是 SQL 本身调整的预览、任务所依赖参数的调整预览。如果用户觉得 ok,就可以按照当前的参数上线运行了。以上是静态资源调优。


那么任务上线后是什么情况呢?比如 Flink SQL 正常的 Running,首先将指标采集 Push 到 Kafka,然后会有实时任务进行指标的清洗聚合。针对重要的指标,比如消费延迟指标、算子速率指标、JVM 进程指标,状态大小指标等。


这些指标作为动态资源调整服务的入参,能及时感知到当前任务的运行状况,然后动态资源调整会进行需求资源的申请,将任务重启,并给用户发送通知。如果重启失败,会进行配置回滚,然后告知用户调整失败需要手工介入。


针对动态资源调整,我们的场景大概有如下四个:


  • 设定历史数据追数:Kafka 积压历史数据初次消费、CDC 全量到增量。


  • 期望时间动态调整:特定时间扩缩容,解决活动可预知的流量高峰。


  • 根据指标动态调整:延迟或反压及时调整,预测流量变化提前调整。


  • 异常指标动态调整:例如 JVM GC 频繁,及时调整 TM 内存。


如上就是我们想做的动态资源调优,最终实现的效果及具体的做法。


下面进入弹性资源能力的建设。过去我们基于 Yarn On ECS 的方式,在扩容的时候需要较长的时间。目前我们基于 Yarn On K8s 来实现的,在 Yarn Label 上我们会进行三种队列的设置打标签,固定资源队列对应的是正式任务;弹性资源队列对应的是突发流量任务;抢占资源队列对应的是测试任务。


如果突然线上流量波动,当前任务的固定资源不足。那么我们就可以将通过分钟级的时效,将弹性资源队列资源扩出来,然后将任务调度上去。这样就避免了突发流量所带来额外资源的消耗,同时我们也不需要按照最高峰值流量去预估资源,只需按照常定的任务资源数量来设定底层所需要的资源。


未来我们将引进 Flink Native K8S,希望借助 K8s 本身的资源管理能力提供资源弹性使用户有较好的体验。


指标体系在 Flink 任务中至关重要,主要包含任务可观测、动态资源调优和扩缩容、调度任务依赖三个方面。


  • 第一,任务可观测方面,我们的做法是采集指标到 Kafka,然后通过 Flink 清洗聚合写入 Influxdb/MySQL,Grafana 展示/指标异常监控告警。


  • 第二,动态资源调整和扩缩容的指标应用已经前面说明,就不再赘述了。


  • 第三,调度任务依赖,是指 Kafka/MysqlCDC 数据入湖,下游有离线调度依赖,我们需要感知当前任务是否有延迟,Checkpoint 有没有做,数据在数仓里是否具有可见性,还需要保证数据完整入仓入湖后,下游任务才会启动。


分享两个场景。第一个场景,日志场景建设。当数据量大,入仓时间多于 10 分钟的时候,下游任务相应增大,有没有办法缩短入仓时间?当 HDFS 写入流量波动较大的时候,能不能更加平稳,且数据不丢不重?


众所周知,从日志文件通过 Kafka 到 Flink SQL、写入 Iceberg 都有可能产生数据重复,这一链路能保证数据不丢,但较难保证数据不重。


对此我们的方案是基于文件日志采集 MetaData Logs,然后将 MetaData Logs 在下游复用。其中 MetaData Logs 的文件的行数起到很重要的作用,因为这一链路能保证数据不丢。


如果数据的行数等于 MetaData Logs,就代表这个数据没有重复,一旦数据行数多于 MetaData Logs,就代表这个数据有重复了,但我们只需要基于重复的某一个文件日志进行去重处理,而不需要对全量日志文件都进行去重处理。基于这样处理方式,我们发现入仓时效从原来的 10-20 分钟,降低到分钟级别的延迟。同时这一链路也能保证入仓数据不丢不重,直接可用,等同于离线日志拉取 ETL 的场景。


针对 Iceberg 表我们建立了 Iceberg Manager 来做小文件合并、过期快照清理、孤儿文件清理。


第二个场景,数据库场景建设。比如数据库是 MySQL,我们想通过 Flink CDC 将数据直接写入 Iceberg V2 表。那么就会有如下几方面的考虑:


  • 多个 Flink CDC 任务是否会对一个 MySQL 读取?数据库是否会有压力?已经读取的数据能否复用起来?


  • Flink CDC 增量读取,支持指定读取的时间起点。


  • IcebergV2 全量数据同步时,数据量较大,容易产生了较多 Delete Files,辅助链路的 Iceberg Manager 在进行表级别优化的时候,就会产生较大的压力。


  • Flink CDC 同步任务太麻烦,希望配置化就生成好任务,希望有一键数据入湖的能力。


基于此,我们做了一个链路的辅助,一键任务生成。辅助自动任务的调优扩缩容机制,保证 Flink CDC 全量同步和增量同步资源的切换问题,通过 Kafka 来实现对同一个数据源读取时候的压力问题,将数据写入 Kafka,Kafka 的数据会被下游的 Flink SQL 任务自动感知并同步。


为了解决 Delete Files 全量数据过多的问题。我们在进行全量同步的时候,会关闭写入 Iceberg V2 表的 upsert 功能,在增量的时候才会开启,这样就可以保证全量同步的时候数据既不丢也不重。同时,Flink SQL 任务增量数据会写入 Iceberg V1 表,方便下游链路进行复用。


03

未来展望


未来 Flink SQL 或者平台建设将围绕以下四个方面进行展开:


  • 第一,批流一体。大数据离线数仓和实时数仓分为两套系统,一般离线数仓通过 Spark、Hive 来实现,实时数仓使用 Flink。随着 Flink 批处理能力的不断建设,我们认为使用一套批流一体,既能降低用户成本,还能更方便的避免两套引擎所带来的指标含义不同的影响。


  • 第二,资源弹性能力的建设。未来会基于 K8s 不断引进弹性资源能力,更好的提供给用户使用。


  • 第三,使用场景的建设,结合 Flink SQL 基于 Kafka 提供延迟消息的功能。


  • 第四,近实时数仓 TableStore 的建设。TableStore 新版本发布,计划先实践起来,同时还将结合 Iceberg 不断探索实践,实现让大家基于近实时数仓,就能够得到时效性和确定性两种融合的效果。


往期精选



▼ 关注「Apache Flink」,获取更多技术干货 ▼

   点击「阅读原文」,查看原文视频&演讲 PPT

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存