查看原文
其他

Flink 在米哈游的落地实践

张剑 Apache Flink 2023-05-01
▼ 关注「Apache Flink」,获取更多技术干货 ▼

摘要本文作者米哈游大数据部实时计算负责人张剑,分享 Flink 在米哈游的应用及实践。本篇内容主要分为四个部分:


  1. 背景介绍
  2. 实时平台建设
  3. 实时数仓和数据湖探索
  4. 未来发展与展望

Tips:点击「阅读原文」查看更多技术内容~


一、背景介绍


米哈游成立于 2011 年,致力于为用户提供美好的、超出预期的产品与内容。公司陆续推出了多款高品质人气产品,包括《崩坏学园2》、《崩坏3》、《未定事件簿》、《原神》,动态桌面软件《人工桌面》以及社区产品《米游社》,并围绕原创 IP 打造了动画、漫画、音乐、小说及周边等多元产品。总部位于中国上海,并在新加坡、美国、加拿大、日本、韩国等国家和地区进行全球化布局。


Flink 在米哈游大数据发展过程中,一直扮演着重要角色。自实时计算平台建立以来,Flink 作为实时计算引擎,经历了多个发展阶段,实时计算平台也在不断地迭代完善。在米哈游内部,实时计算平台被称作 Mlink,主要以 Flink 为主,兼容 Spark Streaming 任务。从起初的 Flink Jar 包任务为主,发展到以 Flink Sql 为主,不断的降低了使用门槛和提高了任务的开发效率;从起初基础的 Flink 任务开发,发展到跨区域、跨云厂商的任务多版本管理,满足了业务发展的需求。在发展的过程中,米哈游不断地关注着社区的发展,并同社区和阿里云同学保持密切的联系。


Mlink 主要是基于 Yarn 资源管理的计算平台,支持了数仓、算法、推荐、风控、大屏等业务。任务数 1000+,Sql 任务占比 80% 左右。使用的 Yarn Vcores 超 5000 核,内存 10T 左右,其中单个任务峰值吞吐在 500 万 QPS,每天吞吐的数据规模超千亿。


二、实时平台建设


2.1 遇到的问题


在 Flink 探索发展的过程中,都会遇到 Flink 使用的一些痛点,大家遇到的,同样在我们探索和实践的过程中也有所感触。总结起来,大概是以下五个方面:


  • 一是 Jar 任务的开发成本高,对于不熟悉 Flink 代码的同学来说使用成本过高。同时,Jar 任务维护成本高,一些代码逻辑的改动会涉及到重新打包、上传,上线等动作;

  • 二是任务管理功能缺失,其中多租户、历史版本回溯、开发版本和线上版本管理、UDF 管理、血缘管理是实时平台管理的重要内容;

  • 三是 Flink 引擎本身管理,主要涉及到多 Flink 版本管理,任务参数配置、常用 Connector 的二次开发、多资源环境管理等问题;

  • 四是任务的告警监控管理,任务问题诊断;

  • 五是同离线数仓互通,包括 Hive Catalog 管理,实时和离线调度依赖管理等。

上面的五个问题,可能是普遍的问题,所以各家公司都会基于内部自建或者开源项目二次开发,来满足自身任务开发管理需求。对于米哈游,除了上述五个问题,还存在跨区域、跨云厂商中遇到的问题需要解决,主要是跨区域之后,任务上线和提交效率,跨云厂商,资源环境不一致等。

2.2 解决方案


实时平台建设主要围绕如上问题。目前实时平台架构如下:

图 1:多云多环境实时平台架构

前端控制云环境的切换。Backend Service 主要负责用户权限管理、任务的多版本管理、血缘管理,任务运维,任务上下线,任务监控和告警等工作。Executor Service 主要负责任务解析、任务提交运行、任务下线和同各类资源管理器交互等工作。其中,Backend Service 到 Executor Service 通过 Thrift 协议通信,Executor Service 的实现可以多语言扩展。架构设计主要解决跨地区跨云厂商问题,实现任务管理和任务运行之间解耦。

图 2:Mlink 平台开发页面

图 3:Mlink平台运维页面

图 4:Mlink 平台同步任务页面

Mlink 实时计算平台主要设计了概览、开发、资源管理、运维、数据探查、同步任务、用户管理和执行器管理等模块。其中开发页面主要是用户编写任务和参数配置,包含历史版本管理等内容。资源管理主要是 Jar 包任务和 UDF 管理。运维主要是任务启停、任务运行监控、任务告警配置等。数据探查部分主要是预览部分数据功能,比如 Kafka Topic 支持按分区、按时间或者 Offset 预览数据。同步任务主要是为了方便管理同步任务,比如 CDC 到 Iceberg 一键同步和运行管理。执行器负责 Executor 的运维工作,包括 Executor 上下线,健康状态监控等。

2.3 遇到的挑战


平台建设和迭代过程中,我们遇到了不少的挑战,也产生了一些比较好的实践。主要分享四个方面。


■ 第一是 Executor Service 开发和维护方面


Executor 主要涉及到 Jar 和 Sql 任务解析提交部分。一开始的方案为了解决跨地区传输效率问题,特别是大的 jar 包传输,由后端进行任务解析,最后传输 job graph 到 Executor,Executor 再通过资源管理器 Api 提交,这个因为后端解析环境不一致问题,部分任务解析过程中会存在 action 动作,特别是涉及到 Hive 表和 Iceberg 表部分。最后采用后端不执行,改由 Executor 解析的方案。Executor 在解析过程中,遇到了 Executor 在运行很长一段时间后,会出现元空间 OOM 的情况。这个主要是因为 Executor 不断的加载任务需要 Class 类,会导致使用的元空间内存不断增加。这个主要是通过任务解析完成之后,卸载类加载器和堆 GC 设置来解决。

■ 第二是监控方面


监控采用的是 Influxdb 加 Grafana 的方案。随着任务量的不断增加,Influxdb 存储的 Series 超过百万,影响监控查看的稳定性,查询响应缓慢。一是扩展 Influxdb,执行端通过一致性 hash 的方案,分配任务 Metric 上报到不同 Influxdb。本身通过对 Flink 任务上报 Metric 进行一定程度的精简。其次在监控上,比如 Kafka 消费监控,目前是支持消费条数的延迟监控,自定义了 Kafka 消费延迟时间的监控,主要是采集了 Kafka 最慢并行度消费的时间,能够反映 Kafka 消费的最大延迟时间,能够反映某个时间点的数据一定被消费了。

图 5:Grafana 监控示例

■ 第三是 Connector 二次开发方面


在 CDC 1.0 版本基础上迭代,支持 Mysql 采集的时候动态扩展字段和基于时间启动消费位点、采集的库表、位点等 Schema 信息。在 CDC 2.0 版本基础上,增加了全量读取库表流控和不需要 MySQL 开启 Binlog 的全量初始化功能。其中多 CDC 实例同步可能会对上游 Mysql 造成压力,采用了 Kafka 作为数据中转,根据库表主键字段作为 Topic 的 Key,保证 Binlog 的顺序,在下游不会出现数据乱序。

Iceberg 作为数据湖方案,改造的点主要是 Iceberg V2 表的支持上面,也就是 Upsert 表。建立 Iceberg 管理中心,会根据合并策略定期优化和清理,Flink 写入主要保证在 CDC 到 Iceberg V2 表顺序性,在如何减少 Delete File 上,在 Iceberg 写入上增加了 BloomFilter 的支持,能够显著减少 Delete File 大小。Iceberg 管理中心,支持了 V2 表合并和 Flink 提交冲突问题。

Clickhouse 方面,重构了 Clickhouse 写入代码,优化了 Clickhouse 的写入性能,支持了本地表和分布式表写入。

■ 第四是数据入湖和离线调度方面


实时平台集成了 Iceberg,并支持 Iceberg Hadoop、Hive、Oss、S3 多种 Catalog。CDC 到 Iceberg 入湖链路已经在部门生产业务上线使用。在数据入湖或者入仓中,如果下游表有被离线数仓用到的地方,都会有依赖调度问题,离线任务何时启动?目前我们主要通过计算任务的延迟时间和 Checkpoint 时间来确保数据已经入仓入湖。以 CDC 或者 Kafka 到 Iceberg 为例。首先采集 CDC 端采集延迟时间,Kafka 采集最慢并行度延迟时间,同时采集任务 Checkpoint 时间。现在的 Checkpoint 完成,Iceberg 版本不一定会更新,基于此,对 Iceberg 写入进行了改造。这样一个同步任务,如果 CDC 采集端没有延迟,Checkpoint 也已经完成,可以保证某个小时的数据一定已经入仓。实时平台提供任务延迟查询接口。离线调度以此接口为调度依赖节点。这样就保证了离线任务启动时候,入仓数据的完整性。

三、实时数仓和数据湖探索


实时数据采集,目前主要是三条链路:

  • 一是日志类型,主要是通过 Filebeat 采集写入 Kafka,Es 作为 Filebeat 的监控;

  • 二是 Api 接口上报服务,后端接入 Kafka;

  • 三是 CDC 采集全量加增量 Mysql 数据,写入 Kafka 或者直接写入 Iceberg。之前是采用 Canal 作为增量采集方案,现在已经全部改为了 CDC。

实时数仓架构设计和业内基本一致,包括 ODS、DWD、DWS 层,之后输出到各应用系统,比如 Clickhouse、Doris、Mysql、Redis 等。目前主要以 Kafka 作为中间承载,也在探索 Iceberg 作为中间层的使用。Iceberg 虽然具有流读功能,但是流读时候数据的顺序性问题,一直没有较好的方案解决,我们也是在探索过程中。探索的主要方向有两个:

  • 一是将 Kafka 和 Iceberg 作为混合 Source 方案,Flink 任务读取混合 Source 之后,基于 Iceberg 快照记录的 Kafka 位点,确定读取范围和切换点;

  • 二是社区 Flip-188 提出的引入 Dynamic Table 存储实现。Flink 内置表由两部分组成,LogStore 和 FileStore。LogStore 将满足消息系统的需要,而 FileStore 是列式格式文件系统。在每个时间点,LogStore 和 FileStore 都会为最新写入的数据存储完全相同的数据 (LogStore 有 TTL),但物理布局不同。

在实时数仓探索方面,主要是 CDC 到 Iceberg 入湖任务,已经在生产上使用。其中主要解决了四个问题:

  • 一是 CDC 采集问题,特别是多库多表采集,会集中采集到 Kafka,减少多个 CDC 任务对同一数据库影响;

  • 二是 Iceberg 支持 V2 表写入,包括写入的索引过滤减少 Delete 文件,Iceberg 管理中心合并和提交冲突;

  • 三是支持分库分表的数据校验和数据延迟检查;

  • 四是一键式任务生成。对于用户而言,只需要填写数据库相关信息,目标 Iceberg 表库名和表名,并支持使用 Kafka 中转,避免多个 CDC 实例采集同一个数据库实例。

通过上述四个问题的解决,能够达到数据库数据分钟级数据入湖,入湖的数据校验和数据延迟依赖达成,方便下游离线任务调度启动。

图 6:数据入湖链路

四、未来发展与展望


主要有四点:

  • 一是 Flink 动态表存储能够尽快实现落地,实现真正的实时数仓和流表一体;

  • 二是 Flink 任务动态扩缩容、基于任务诊断的主动资源调整、细粒度资源调整;

  • 三是 Flink 对批任务的读写优化,目前批任务 Flink 的使用面不如 Spark,如果未来能够在此补足,可以做到流批操作一个引擎,开发成本会显著降低;

  • 四是 Flink 加数据湖更好的落地推广。



更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

   戳我,查看更多技术内容~

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存