摘要:本文整理自小米软件开发工程师周超,在 Flink Forward Asia 2022 平台建设专场的分享。本篇内容主要分为四个部分:
1. 小米数仓架构演变 2. Flink+Iceberg 架构升级实践 3. 流批一体实时数仓探索 4. 未来展望
Tips:点击「阅读原文」查看原文视频&演讲 ppt
1.1 数仓架构现状
上图展示的是小米目前的技术架构,在存储侧我们主要应用数据湖 Iceberg 和自研消息队列 Talos,计算层主要应用 Flink 和 Spark,他们统一运行在 Yarn 上,统一通过 Metacat 获取元数据信息,并通过 Ranger 来进行统一的鉴权服务。我们内部使用 Spark 和 Presto 来支撑 OLAP 查询场景,并通过 Kyuubi 来实现路由。在实时数仓场景中,我们选择 Flink 作为计算底座,Hive、Talos、Iceberg 作为存储底座,其中,消息队列 Talos 作为传统 Lambda 架构的通用选择,在我们内部占比较大且很稳定,Iceberg 作为一款优秀的湖存储,兼具时效性和低成本,其使用占比也在逐步提升,使用到 Iceberg 的 Flink 作业在总占比中已经达到近 50%。
我们对内部实时链路进行了统计,Iceberg 在大多数场景下已经对 Hive 进行了替换,对分钟级的实时链路进行了较好的支撑;因为使用 Iceberg 搭建的实时链路目前仅能达到分钟级的时效,消息队列仍有着较高占比。1.2 数仓架构演变
在引入数据湖前,针对日志埋点这样的聚合计算场景,业务会使用离线计算来搭建链路,采集模块会将日志或埋点数据统一收集到消息队列中,Flink 消费消息队列中的数据实时写入 ODS 层 Hive 表,下游的计算则采用 Spark 或者 Hive 按小时或天进行清洗、聚合。显然,这样的链路处理延迟和成本都较高,这些离线作业往往都在凌晨进行调度,给整个集群带来较大压力。
针对 CDC 数据源,实时数据通常会通过消息队列进行流转,保证处理的实时性,数据在消息队列中以 Changelog-Json 的格式进行存储。但为了保证计算的准确性,业务链路通常会使用 Lambda 架构来搭建,会额外引入一条离线链路。离线链路基于 Hive 或 Kudu 构建,ODS 层使用 Spark Streaming 近实时导入,部分场景也会定期全量导入,下游计算依赖 Spark 做定时调度。显然,这样的架构开发和维护的成本都会很高。
带着上面的问题,我们想要对批和流链路进行统一,并能够满足低成本和低延迟,为此我们引入了 Iceberg,在引入 Iceberg 初期,小米内部的使用以 v1 表为主(v1 表是数据分析表,支持 Append 类型数据的增量读写)。因为 Flink 旧架构(1.12 版本)读取 Iceberg 的数据时效性不高,所以在日志埋点场景的应用主要是替换了 Hive,使用 Iceberg 来存储 ODS、DWD 层数据,可以降低存储成本,同时配合 Spark、Presto 可以获得更快的查询速度。
针对 CDC 数据源的场景,在初期也同样以替换 Hive 为主以获取更低的成本。
在中期,我们开始大规模使用 Iceberg V2 表,并对 Iceberg V2 表的生态不断进行完善,v2 表在 v1 表的基础上支持了行级别的更新和删除,同时也支持了 Merge on read 模式,并且有着不错的性能。业务的实时链路也可以完全依赖 Flink 和 Iceberg 来进行搭建。之前的日志埋点链路通过 Iceberg v2 表的升级后,使用 Flink+Iceberg v2 替换了原先的 Spark + Iceberg v1,将链路时效性由小时级提升至分钟级。
由于 v2 表能够支持行级别的更新,而且数据实时可查,原本针对 CDC 数据源的 Lambda 架构链路可以升级到 Kappa 架构,由 Flink 和 Iceberg v2 表来构建,兼顾时效性和成本,依赖 Parquet+ZSTD 压缩,存储成本相比于原先 Parquet+snappy 能够节省 30%。1.3 当前架构遇到的问题
经过我们一段时间的使用,我们发现目前 Iceberg 能够很好地兼顾成本、查询效率,社区的很多优化也以离线为主,但在实时场景中存在着时效性和稳定性方面的不足,距离消息队列仍有差距,同时,Iceberg 作为统一的存储 Format,在实际消费时需要读取底层文件,而 v2 表有着多种文件类型,读取时需要组织 DataFile 和两类 DleteFile(Equlity delete 和 Position delete)的关系,逻辑较为复杂。
我们在基于 Flink+Iceberg 的实时链路构建中,经常会遇到以下两类问题:2.1 基于 Flink1.12 的旧架构实现
针对上述的两个问题,我们对 Flink+Iceberg 的架构进行了升级。上图中的实时数仓链路由多张 Iceberg 表和多个 Flink 作业组成,其中 Iceberg 负责数据的存储,Flink 负责数据的清洗、流转,显然对一条链路的实时性和稳定性支撑,Flink 起了关键作用。在一个 Flink 流式作业中,数据会经过读取、计算、写入,在实际场景中,我们发现数据的读取效率低,严重影响了作业吞吐,后续的相关优化也主要围绕读取部分展开。
在优化前,我们的 Flink+Iceberg 实时链路主要依托于 Flink 1.12 版本构建,在 1.12 版本中,读取逻辑被拆分为 Monitor 和 Reader 两个算子,在进行增量消费时,Monitor 算子扫描 Snapshot 中的文件,并组织成 Split 发往下游给 Reader 算子消费。这样的架构做到了很好的扫描和读取逻辑分离,但是仍有几点重要缺陷,例如:2.2 旧架构遇到的主要问题
这样的缺陷在实际作业中会有实时性和稳定性两大问题表现。在实时性方面,存在着消费速度慢、消费存在波动;在稳定性方面,存在着 Task OOM,Checkpoint 容易超时。
在时效性方面,目前主要有三个主要问题,分别是消费波动、消费延迟以及消费瓶颈。Split 的单向传输和扫描操作时间触发使得消费存在波动和延迟,考虑 Monitor 算子在一个时间周期内仅能够发送固定数量的 Split 给下游进行消费,如果 Split 数量少,那么 Reader 算子会有部分时间处于空闲状态,导致消费存在波动,存在资源浪费;而如果一个周期内下发的 Split 超过了 Reader 的消费能力,那么 Split 就会在 Reader 侧堆积,占用额外的堆内存。同时固定的扫描间隔也会导致消费的延迟,新数据需要等待一定扫描间隔后才可能被消费到,如果用户配置了一个较大的扫描间隔,那么数据的时效性会继续降低。
这样的机制不仅影响着实时性,对稳定性也有不小的影响。Monitor 和 Reader 的单向同步机制,使得消费需要指定间隔和间隔内下发的 Split 数,未消费完的 Split 会存储在堆内存中,积压较多会导致 OOM、Full gc 频繁,Task 吞吐降低。同时,旧架构的 SourceFunction 在实现数据下发时需要持有 Checkpoint 锁从而保证数据下发和状态更新的一致,而 Reader 算子 Checkpoint 粒度仅细化到 Split 级别,所以 Reader 算子需要长时间去持有 Checkpoint 锁,只有消费完一个 Split 后才会释放,这在下游处理慢,反压情况下是致命的缺陷,很容易导致 Checkpoint 超时。这些点一同促使着作业稳定性的降低。2.3 基于 Flink1.14 的新架构实现
为了解决上述实时性和稳定性问题,我们在社区基于 FLIP-27 的改动上改进了读取逻辑,主要涵盖了上图右侧的七点,其中双向通信,Monitor 逻辑移至 JobManager 是 FLIP-27 的关键优化点。我们内部主要对后面的五点进行了优化,分别是 Snapshot 的依次扫描、自适应的扫描模式、分区多并发消费等。
增量消费 Iceberg 存在着两种方式,分别为依次扫描 Snapshot 和合并多个 Snapshot 扫描。在合并多个 Snapshot 的扫描模式中,需要依赖 Merge on read 模式,用后续 Snapshot 中的 Delete 文件对当前 Snapshot 中的 Data 文件数据进行过滤。如果合并多个 Snapshot 进行消费,那么一个 DataFile 可能会关联到很多后续 Snapshot 的 DeleteFile,使得 Split 的组织变得复杂,同时 Reader 算子在使用 DeleteFile 过滤 DataFile 时,需要将 Equlity delete file 全部读取到内存中,这也很容易导致 Task 产生内存问题。考虑到上面的文件组织复杂度和内存文件,我们默认选择将扫描模式设置为了依次扫描,该模式可以更好地追踪数据变化,并且降低文件组织复杂度,避免了在合并多个 Snapshot 模式中因为 Delete 文件较大而产生的内存问题,对稳定性更加友好。
旧架构中,扫描逻辑主要由时间驱动,定时触发,在新架构中,我们引入了自适应的扫描模式,增加了事件驱动,解决了消费波动和 Task 潜在的内存问题。在实际扫描过程中,动态 Enumerator 会根据内存中 Buffer 的反馈进行决策,小于阈值就立刻执行扫描操作,保证 Reader 能够连续消费,大于阈值就阻塞扫描,避免将更多的 Split 缓存在内存中。
在新架构中,我们针对 v2 表实现了并发消费,将原本的单一队列 Buffer 按照下游 Task 拆分成多个队列 Buffer,Iceberg 表中不同分区的数据文件会按照写入排序,并被 Hash 到不同的队列,实现消费的分区有序。同时为了保证各个 Task 消费数据的对齐,我们使用 Snapshot 的提交时间来生成 Watermark,引入 AlignedAssigner 来实现统一的 Split 分配,在分配端实现对齐,保证下游各个 Task 消费数据的对齐。
上面我们讲到的自适应扫描只能解决单个 Source 实例的问题,在实际应用中,部分场景仍有潜在稳定性问题存在,例如集成场景中的指标拆分,将一张表的数据拆分至多张表;数仓场景中,对同一张表进行多次引用,筛选不同部分的数据进行 Join。在这两个使用场景中,因为不满足 Source 复用规则,会有多个读取同一张 Source 表的实例存在。在 Flink 中,Source 的复用受 Partition、Limit、Project、Filter 影响,以 Project 和 Filter 为例。上图左边的 SQL 描述了 Project 下推导致的复用失效,因一个字段的区别,同一份数据就会被读取三次;上图右边的 SQL 描述了 Filter 下推导致复用失效的场景,即使选取的范围有很大重复,但 Source 仍不会得到复用。由于复用的失效,同一个表的相同 Split 会在内存中存在多份,依然有出现内存问题的可能。
通过切换至新架构,消费 Iceberg 表的平均扫描间隔降至小于 1 秒,单个 Task 吞吐提升至 70 万条每秒,实时数仓链路新鲜度提升至 5 分钟内。上一章介绍了 Flink 读取 Iceberg 架构的优化,这一章将主要介绍小米在 Flink 流批一体实时数仓上遇到的问题以及相关探索。
第一类是数据波动,实时数仓中数据是不断变化的,由于 Flink 回撤机制的存在,-U 和+U 会拆分为两条数据写入,在-U 写入,+U 未写入时执行查询,会查询到异常数据,而在+U 写入后又能查询到正常结果。第二类是计算不确定性,Flink 中算子的状态过期会导致计算结果的不确定。同时针对这部分异常数据,往往没有简单的对比、修复手段,这也会导致实时链路产生的数据修复起来比较麻烦。
针对数据波动问题,考虑到下游绝大多数系统都能够支持 Upsert 写入,我们引入了写入前数据丢弃能力,用于丢弃无关紧要的数据,将其称为 Drop Operator。该算子作用在 Sink 节点前,能够根据配置丢弃指定类型的数据。针对 Flink 聚合增量数据写入 ADS 层 MySQL 的场景,可以配置丢弃-U,避免 ADS 层查询波动。同样,该配置可以很方便的将 Changelog 流丢弃-U 和-D 转为 Append 流,满足一些特殊的业务场景。
在解决计算的不确定性前,我们需要先了解其产生的原因。在 Flink SQL 中,状态起着重要作用,正确的中间状态是计算结果正确的必要条件。但显然,目前状态的保持是昂贵的,我们需要一个状态过期策略来进行平衡。在 Flink 内部,有着 Watermark 清理和 TTL 清理两类算子。Watermark 可以根据业务的需要去生成,清理的策略根据实际使用场景制定,所以对计算结果影响可控。而依赖 TTL 清理的这类算子,在 Flink SQL 中状态过期的策略无法得到准确控制,只能设置一个统一的状态过期时间,往往因为过期时间设置不合理或者满足不了业务需求,从而产生预料之外的计算结果。
例如物流、服务单场景,订单从创建到关闭的时间跨度往往很长,很容易出现在订单还没有结束前,状态就过期了。为了解决订单跨度时间长导致状态丢失的问题,业务会设置一个离线的 Topic,通过离线链路定期往离线 Topic 里补数据,补充的数据重新流入实时链路中,将过期状态重新补回。
针对由状态过期而导致的计算不确定问题,我们有两种解决思路。
为了能够让状态按需过期,我们引入了算子级的状态清理功能,将清理规则应用范围从作业细化到各个算子,将清理规则从时间规则拓展到业务规则,并通过 Query Hint 对算子提供灵活、方便的定义。
目前该功能支持两类算子,分别是 Group 聚合算子和 Regular Join 算子,上图表格为支持的参数,通过 TTL 的参数可以设置该算子状态的过期时间,condition 参数可以填写清理规则,为了方便判断,清理规则需要是布尔表达式。
上图的 SQL 展示了求某类商品总销量的聚合计算逻辑,该聚合算子状态保存时间为 30 天,覆盖了作业级的 1 天保存时间,且当商品状态为售罄或下架,那么就清除该商品的状态,这意味着有关该商品的销售记录后续不会再出现。在聚合算子里,我们加入了一个状态清理的检查器,将用户设置的清理规则经过 codegen 转换为 Java 代码,在聚合计算后进行规则检查,匹配成功后执行清理。
同样针对 Join 算子,状态清理检查器的实现类似,只是在 Join 算子会对左右表的状态分别进行清理,清理完后会去对方状态中将引用计数-1。上图的 SQL 示例描述了一个物流表的 Join 场景,左表为物流订单表,保存着订单状态以及更新时间,右表为维度表,保存着该订单的一些基础信息,包括创建时间。在图中的例子中,Join 算子的状态清理不再依赖 Proctime,只依赖于运单状态和运单的持续时间。
虽然算子级状态清理能够解决一部分需求,但它的使用门槛较高,且并非所有业务都有明确的清理规则,一个简单方便的修复手段才适用于所有场景。如果想要用 Flink Batch 对数据进行修复,目前有 INSERT 和 OVERWRITE 两种方式。使用 INSERT 实现 SQL 逻辑较为复杂,且只能对数据进行覆盖,不能删除;OVERWRITE 的修复方式粒度较粗,而且会使下游实时作业产生较大波动。在这样的场景下,我们使用 Flink 实现了 Merge 语法。Merge 语法会对两个数据源做 Join,并可以针对不同的 Join 情况执行增、删、改操作,对下游影响小。
在具体的实现上,我们在原本的 Calcite 语法上完善了 Merge 语法的解析逻辑,支持为每个 Action 设置独立的判断条件,在 Schema 匹配情况下支持 Insert * 和 Update * 语句,简化逻辑。在 SQL 校验阶段,Merge 逻辑会被转为 Outer Join 和多个 Merge Action 的结合。在优化阶段,目前我们会根据实际的 Merge Action 情况来优化 Join 方式,将默认的 Outer Join 改写为 Anti Join 或者 Inner Join,减少处理的数据量。最终,Merge 逻辑会生成 Join 和 MergeAction 两个算子,Merge Action 算子根据上游 Join 情况来生成增、删、改数据并发往下游。由于 Flink SQL 目前提供了优秀的流批一体架构,可以复用当前的逻辑,将增删改数据写入下游数据系统。
在我们内部,Spark 和 Flink 目前都支持 Merge 语法,但 Spark 在框架层只提供了语法侧的支持,Runtime 层的支持在 Iceberg 侧由插件实现。Flink 则在框架中实现了语法和 Runtime 层的支持,使得 Merge 的功能更加通用,也能够支持更多存储系统。目前,在我们内部,Flink merge into 入湖和入库场景使用较多。
因为实现原因,Spark 在我们内部目前仅能支持 Merge into 入湖,所以我们在 Merge into 入湖场景下对 Spark 和 Flink 的处理速度做了测试,目前中小批量数据的 Merge 操作 Flink 执行速度会略快,大数据场景下 Flink 因为入湖速度较 Spark 慢,所以耗时稍多,但整体来看,Flink 已经能够满足日常修复需求。
基于 Iceberg 的秒级湖仓建设,目前 Flink+Iceberg 在我们内部实时链路中能够很好的支持分钟级的场景,我们希望未来在实时性上有所突破,将链路新鲜度提升至秒级。
基于 Iceberg 的完整 CDC 的支持。目前,如果一张 Iceberg 被多个上游并行写入,或者单个作业回溯写入,我们需要使用 Upsert 模式,写入+I 或+U 前默认写入一条-D,但因为缺少信息,写入的 Delete 可能是多余的且无法获取到正确的非主键列值,我们希望在后期能够对其完善,使得下游能够读取到正确完整的 CDC 数据。
- 跟进基于 Flink SQL Gateway,完善动态查询的支持。
▼ 关注「Apache Flink」,获取更多技术干货 ▼ 点击「阅读原文」,查看原文视频&演讲 PPT