整理|路培杰(Flink 社区志愿者)
摘要:Apache Flink 是目前大数据领域非常流行的流批统一的计算引擎,数据湖是顺应云时代发展潮流的新型技术架构,以 Iceberg、Hudi、Delta 为代表的解决方案应运而生,Iceberg 目前支持 Flink 通过 DataStream API /Table API 将数据写入 Iceberg 的表,并提供对 Apache Flink 1.11.x 的集成支持。
本文由腾讯数据平台部高级工程师苏舒分享,主要介绍腾讯大数据部门基于 Apache Flink 和 Apache Iceberg 构建实时数仓的应用实践,介绍主要包括如下几个方面:
背景及痛点
数据湖 Apache Iceberg 的介绍
Flink+Iceberg 构建实时数仓
未来规划
Tips:点击文末「阅读原文」即可回顾作者分析的原版视频~
如图 1 所示,这是当前已经助力的一些内部应用的用户,其中小程序和视频号这两款应用每天或者每个月产生的数据量都在 PB 级或者 EB 级以上。
这些应用的用户在构建他们自己的数据分析平台过程中,他们往往会采用图 2 这样的一个架构,相信大家对这个架构也非常的熟悉了。业务方比如腾讯看点或者视频号的用户,他们通常会采集应用前端的业务打点数据以及应用服务日志之类的数据,这些数据会通过消息中间件(Kafka/RocketMQ)或者数据同步服务 (flume/nifi/dataX) 接入数仓或者实时计算引擎。在数仓体系中会有各种各样的大数据组件,譬如 Hive/HBase/HDFS/S3,计算引擎如 MapReduce、Spark、Flink,根据不同的需求,用户会构建大数据存储和处理平台,数据在平台经过处理和分析,结果数据会保存到 MySQL、Elasticsearch 等支持快速查询的关系型、非关系型数据库中,接下来应用层就可以基于这些数据进行 BI 报表开发、用户画像,或基于 Presto 这种 OLAP 工具进行交互式查询等。
在整个过程中我们常常会用一些离线的调度系统,定期的(T+1 或者每隔几小时)去执行一些 Spark 分析任务,做一些数据的输入、输出或是 ETL 工作。离线数据处理的整个过程中必然存在数据延迟的现象,不管是数据接入还是中间的分析,数据的延迟都是比较大的,可能是小时级也有可能是天级别的。另外一些场景中我们也常常会为了一些实时性的需求去构建一个实时处理过程,比如借助 Flink+Kafka 去构建实时的流处理系统。整体上,数仓架构中有非常多的组件,大大增加了整个架构的复杂性和运维的成本。如下图,这是很多公司之前或者现在正在采用的 Lambda 架构,Lambda 架构将数仓分为离线层和实时层,相应的就有批处理和流处理两个相互独立的数据处理流程,同一份数据会被处理两次以上,同一套业务逻辑代码需要适配性的开发两次。Lambda 架构大家应该已经非常熟悉了,下面我就着重介绍一下我们采用 Lambda 架构在数仓建设过程中遇到的一些痛点问题。
图3
例如在实时计算一些用户相关指标的实时场景下,我们想看到当前 pv、uv 时,我们会将这些数据放到实时层去做一些计算,这些指标的值就会实时呈现出来,但同时想了解用户的一个增长趋势,需要把过去一天的数据计算出来。这样就需要通过批处理的调度任务来实现,比如凌晨两三点的时候在调度系统上起一个 Spark 调度任务把当天所有的数据重新跑一遍。很显然在这个过程中,由于两个过程运行的时间是不一样的,跑的数据却相同,因此可能造成数据的不一致。因为某一条或几条数据的更新,需要重新跑一遍整个离线分析的链路,数据更新成本很大,同时需要维护离线和实时分析两套计算平台,整个上下两层的开发流程和运维成本其实都是非常高的。为了解决 Lambda 架构带来的各种问题,就诞生了 Kappa 架构,这个架构大家应该也非常的熟悉。我们来讲一下 Kappa 架构,如图 4,它中间其实用的是消息队列,通过用 Flink 将整个链路串联起来。Kappa 架构解决了 Lambda 架构中离线处理层和实时处理层之间由于引擎不一样,导致的运维成本和开发成本高昂的问题,但 Kappa 架构也有其痛点。首先,在构建实时业务场景时,会用到 Kappa 去构建一个近实时的场景,但如果想对数仓中间层例如 ODS 层做一些简单的 OLAP 分析或者进一步的数据处理时,如将数据写到 DWD 层的 Kafka,则需要另外接入 Flink。同时,当需要从 DWD 层的 Kafka 把数据再导入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做进一步的分析时,显然就增加了整个架构的复杂性。
其次,Kappa 架构是强烈依赖消息队列的,我们知道消息队列本身在整个链路上数据计算的准确性是严格依赖它上游数据的顺序,消息队列接的越多,发生乱序的可能性就越大。ODS 层数据一般是绝对准确的,把 ODS 层的数据发送到下一个 kafka 的时候就有可能发生乱序,DWD 层再发到 DWS 的时候可能又乱序了,这样数据不一致性就会变得很严重。
第三,Kafka 由于它是一个顺序存储的系统,顺序存储系统是没有办法直接在其上面利用 OLAP 分析的一些优化策略,例如谓词下推这类的优化策略,在顺序存储的 Kafka 上来实现是比较困难的事情。
那么有没有这样一个架构,既能够满足实时性的需求,又能够满足离线计算的要求,而且还能够减轻运维开发的成本,解决通过消息队列构建 Kappa 架构过程中遇到的一些痛点?答案是肯定的,后面的篇幅会详细论述。
图4
■ 传统 T+1 任务
海量的TB级 T+ 1 任务延迟导致下游数据产出时间不稳定。
任务遇到故障重试恢复代价昂贵
数据架构在处理去重和 exactly-once语义能力方面比较吃力
架构复杂,涉及多个系统协调,靠调度系统来构建任务依赖关系
对消息队列存储要求高,消息队列的回溯能力不及离线存储
消息队列本身对数据存储有时效性,且当前无法使用 OLAP 引擎直接分析消息队列中的数据
全链路依赖消息队列的实时计算可能因为数据的时序性导致结果不正确
图5
是否存在一种存储技术,既能够支持数据高效的回溯能力,支持数据的更新,又能够实现数据的批流读写,并且还能够实现分钟级到秒级的数据接入?
这也是实时数仓建设的迫切需求(图 6)。实际上是可以通过对 Kappa 架构进行升级,以解决 Kappa 架构中遇到的一些问题,接下来主要分享当前比较火的数据湖技术--Iceberg。
图 6
1.Iceberg 是什么
首先介绍一下什么是 Iceberg。官网描述如下:Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.Iceberg 的官方定义是一种表格式,可以简单理解为是基于计算层(Flink , Spark)和存储层(ORC,Parqurt,Avro)的一个中间层,用 Flink 或者 Spark 将数据写入 Iceberg,然后再通过其他方式来读取这个表,比如 Spark,Flink,Presto 等。
图 7
2.Iceberg 的 table format 介绍
Iceberg 是为分析海量数据准备的,被定义为 table format,table format 介于计算层和存储层之间。table format 主要用于向下管理在存储系统上的文件,向上为计算层提供一些接口。存储系统上的文件存储都会采用一定的组织形式,譬如读一张 Hive 表的时候,HDFS 文件系统会带一些 partition,数据存储格式、数据压缩格式、数据存储 HDFS 目录的信息等,这些信息都存在 Metastore 上,Metastore 就可以称之为一种文件组织格式。一个优秀的文件组织格式,如 Iceberg,可以更高效的支持上层的计算层访问磁盘上的文件,做一些 list、rename 或者查找等操作。3.Iceberg 的能力总结
Iceberg 目前支持三种文件格式 parquet,Avro,ORC,如图 7,无论是 HDFS 或者 S3 上的文件,可以看到有行存也有列存,后面会详细的去介绍其作用。Iceberg 本身具备的能力总结如下(如图 8),这些能力对于后面我们利用 Iceberg 来构建实时数仓是非常重要的。
图8
基于快照的读写分离和回溯
流批统一的写入和读取
不强绑定计算存储引擎
ACID 语义及数据多版本
表, 模式及分区的变更
4.Iceberg 的文件组织格式介绍
下图展示的是 Iceberg 的整个文件组织格式。从上往下看:
首先最上层是 snapshot 模块。Iceberg 里面的 snapshot 是一个用户可读取的基本的数据单位,也就是说用户每次读取一张表里面的所有数据,都是一个snapshot 下的数据。
其次,manifest。一个 snapshot 下面会有多个 manifest,如图 snapshot-0 有两个 manifest,而 snapshot-1 有三个 manifest,每个 manifest 下面会管理一个至多个 DataFiles 文件。
第三,DataFiles。manifest 文件里面存放的就是数据的元信息,我们可以打开 manifest 文件,可以看到里面其实是一行行的 datafiles 文件路径。
从图上看到,snapshot-1 包含了 snapshop-0 的数据,而 snapshot-1 这个时刻写入的数据只有 manifest2,这个能力其实就为我们后面去做增量读取提供了一个很好的支持。
图 9
5.Iceberg 读写过程介绍
■ Apache Iceberg 读写
首先,如果有一个 write 操作,在写 snapsho-1 的时候,snapshot-1 是虚线框,也就是说此时还没有发生 commit 操作。这时候对 snapshot-1 的读其实是不可读的,因为用户的读只能读到已经 commit 之后的 snapshot。发生 commit 之后才可以读。同理,会有 snapshot-2,snapshot-3。
Iceberg 提供的一个重要能力,就是读写分离能力。在对 snapshot-4 进行写的时候,其实是完全不影响对 snapshot-2 和 snapshot-3 的读。Iceberg 的这个能力对于构建实时数仓是非常重要的能力之一。
图 10
同理,读也是可以并发的,可以同时读 s1、s2、s3 的快照数据,这就提供了回溯读到 snapshot-2 或者 snapshot-3 数据的能力。Snapshot-4 写完成之后,会发生一次 commit 操作,这个时候 snapshot-4 变成了实心,此时就可以读了。另外,可以看到 current Snapshot 的指针移到 s4,也就是说默认情况下,用户对一张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响前面的 snapshot 的读操作。接下来讲一下 Iceberg 的增量读。首先我们知道 Iceberg 的读操作只能基于已经提交完成的 snapshot-1,此时会有一个 snapshot-2,可以看到每个 snapshot 都包含前面 snapshot 的所有数据,如果每次都读全量的数据,整个链路上对计算引擎来说,读取的代价非常高。如果只希望读到当前时刻新增的数据,这个时候其实就可以根据 Iceberg 的 snapshot 的回溯机制,仅读取 snapshot1 到 snapshot2 的增量数据,也就是紫色这块的数据可以读的。
图 11
同理 s3 也是可以只读黄色的这块区域的数据,同时也可以读 s3 到 s1 这块的增量数据,基于 Flink source 的 streaming reader 功能在内部我们已经实现这种增量读取的功能,并且已经在线上运行了。刚才讲到了一个非常重要的问题,既然 Iceberg 已经有了读写分离,并发读,增量读的功能,Iceberg 要跟 Flink 实现对接,那么就必须实现 Iceberg 的 sink。社区现在已经重构了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我们的架构其实跟社区的架构是保持一致的,曲线框中的这块内容是 FlinkIcebergSink。在有多个 IcebergStreamWriter 和一个 IcebergFileCommitter 的情况下,上游的数据写到 IcebergStreamWriter 的时候,每个 writer 里面做的事情都是去写 datafiles 文件。
图 12
当每个 writer 写完自己当前这一批 datafiles 小文件的时候,就会发送消息给 IcebergFileCommitter,告诉它可以提交了。而 IcebergFileCommitter 收到信息的时,就一次性将 datafiles 的文件提交,进行一次 commit 操作。commit 操作本身只是对一些原始信息的修改,当数据都已经写到磁盘了,只是让其从不可见变成可见。在这个情况下,Iceberg 只需要用一个 commit 即可完成数据从不可见变成可见的过程。Flink 实时作业一般会长期在集群中运行,为了要保证数据的时效性,一般会把 Iceberg commit 操作的时间周期设成 30 秒或者是一分钟。当 Flink 作业跑一天时,如果是一分钟一次 commit,一天需要 1440 个 commit,如果 Flink 作业跑一个月commit 操作会更多。甚至 snapshot commit 的时间间隔越短,生成的 snapshot 的数量会越多。当流式作业运行后,就会生成大量的小文件。这个问题如果不解决的话,Iceberg 在 Flink 处理引擎上的 sink 操作就不可用了。我们在内部实现了一个叫做 data compaction operator 的功能,这个 operator 是跟着 Flink sink 一起走的。当 Iceberg 的 FlinkIcebergSink 每完成一次 commit 操作的时候,它都会向下游 FileScanTaskGen 发送消息,告诉 FileScanTaskGen 已经完成了一次 commit。
图 13
FileScanTaskGen 里面会有相关的逻辑,能够根据用户的配置或者当前磁盘的特性来进行文件合并任务的生成操作。FileScanTaskGen 发送到 DataFileRewitre 的内容其实就是在 FileScanTaskGen 里面生成的需要合并的文件的列表。同理,因为合并文件是需要一定的耗时操作,所以需要将其进行异步的操作分发到不同的 task rewrite operator 中。上面讲过的 Iceberg 是有 commit 操作,对于 rewrite 之后的文件需要有一个新的 snapshot 。这里对 Iceberg 来说,也是一个 commit 操作,所以采用一个单并发的像 commit 操作一样的事件。整条链路下来,小文件的合并目前采用的是 commit 操作,如果 commit 操作后面阻塞了,会影响前面的写入操作,这块我们后面会持续优化。现在我们也在 Iceberg 社区开了一个 design doc 文档在推进,跟社区讨论进行合并的相关工作。
1.近实时的数据接入
前面介绍了 Iceberg 既支持读写分离,又支持并发读、增量读、小文件合并,还可以支持秒级到分钟级的延迟,基于这些优势我们尝试采用 Iceberg 这些功能来构建基于 Flink 的实时全链路批流一体化的实时数仓架构。如下图所示,Iceberg 每次的 commit 操作,都是对数据的可见性的改变,比如说让数据从不可见变成可见,在这个过程中,就可以实现近实时的数据记录。
图 14
此前需要先进行数据接入,比如用 Spark 的离线调度任务去跑一些数据,拉取,抽取最后再写入到 Hive 表里面,这个过程的延时比较大。有了 Iceberg 的表结构,可以中间使用 Flink,或者 spark streaming,完成近实时的数据接入。
基于以上功能,我们再来回顾一下前面讨论的 Kappa 架构,Kappa 架构的痛点上面已经描述过,Iceberg 既然能够作为一个优秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考虑将 Kafka 替换成 Iceberg?Iceberg 底层依赖的存储是像 HDFS 或 S3 这样的廉价存储,而且 Iceberg 是支持 parquet、orc、Avro 这样的列式存储。有列式存储的支持,就可以对 OLAP 分析进行基本的优化,在中间层直接进行计算。例如谓词下推最基本的 OLAP 优化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把离线任务天级别到小时级别的延迟大大的降低,改造成一个近实时的数据湖分析系统。
图 15
在中间处理层,可以用 presto 进行一些简单的查询,因为 Iceberg 支持 Streaming read,所以在系统的中间层也可以直接接入 Flink,直接在中间层用 Flink 做一些批处理或者流式计算的任务,把中间结果做进一步计算后输出到下游。
总的来说,Iceberg 替换 Kafka 的优势主要包括:
实现存储层的流批统一
中间层支持 OLAP 分析
完美支持高效回溯
存储成本降低
数据延迟从实时变成近实时
对接其他数据系统需要额外开发工作
图 16
由于 Iceberg 本身是将数据文件全部存储在 HDFS 上的,HDFS 读写这块对于秒级分析的场景,还是不能够完全满足我们的需求,所以接下去我们会在 Iceberg 底层支持 Alluxio 这样一个缓存,借助于缓存的能力可以实现数据湖的加速。这块的架构也在我们未来的一个规划和建设中。
图 17
如图 18 所示,腾讯内部已经实现了 Iceberg 的完全 SQL 化,其实我们在 table properties 里面可以设置一些小文件合并的参数,例如 snapshot 达到多少进行一次合并,一共有多少个 snapshot 时进行合并等,这样底层就可以直接通过一条 insert 语句启动 Flink 入湖任务,整个任务就可以持续运行,后台数据的 datafiles 文件也会在后台自动完成合并的操作。
图 18
下面这张图就是 Iceberg 中数据文件和数据文件对应的 meta 文件的信息,因为现在社区开源的 IceberFlinkSink 还没有文件合并的功能,可以尝试打开一个比较小的流处理任务,然后在自己电脑上跑一下,可以看到 Flink 任务运行之后,一段时间后,对应目录的文件数就会暴涨。
图 19
利用了 Iceberg 的实时合并小文件功能之后,可以看到文件数其实是可以控制在一个比较稳定的数量。实现实时数据的增量读取,可以将其配置到 Iceberg 的 table properties 参数里面,并且可以指定从哪个 snapshot 开始消费。如果指定了从哪个 snapshot 消费之后,每次 Flink 任务启动,就只会读取当前最新 snapshot 里面新增的数据。
图 20
在本实例中,开启了小文件合并的功能,最后用 SQL 启动了一个 Flink sink 的入湖任务。当前用户非常希望所有的任务都用 SQL 来解决,小文件合并的功能其实只适用于在线上跑的一些 Flink 任务,相较于离线任务来说,每一次 commit 周期内它所生成的文件数量或者文件大小都不会特别大。但当用户的任务跑了比较长的时间,底层的文件可能已经成千上万个了,这个时候直接在线上用实时的任务去做合并显然是不合适的,并可能会影响到线上实时任务的时效性,我们可以通过使用 SQL extension 来处理小文件合并,或者是删除遗留的文件,或者是过期 snapshot。我们内部其实已经实现了通过用 SQL extension 的方式来管理 Iceberg 在磁盘上的数据和数据元信息的文件,后面我们会持续的往 SQL extension 增加更多的功能,来完善 Iceberg 的可用性,提升用户体验。
图 21
图 22
- Row-level delete 功能。在用 Iceberg 构建整个数据链路的过程中,如果有数据的更新怎么办?Iceberg 当前只支持 copy on write 的 update 的能力,copy on write 对写是有一个放大的作用,如果要真正的在整个链路上构建一个实时数据处理过程,还是需要一个高效的 merge on read 的 update 能力。这是非常重要的,后面我们也会再继续跟社区合作,腾讯内部也会去做一些实践,去完善 Row-level delete 的功能。
- SQL Extension 能力完善。我们会更加完善 SQL Extension 的能力。
- 建立统一索引加速数据检索。Iceberg 现在并没有统一的索引来加速数据检索,现在我们也在跟社区合作,社区也提出了一个 Bloom Filter 的索引能力,通过构建统一的索引,可以加速 iceberg 检索文件的能力。
在 Iceberg 的内核提升方面,我们主要是希望先能够把这些功能给完善。
首先,自动 Schema 识别抽取建表。希望能够自动的根据前端的数据 Schema 信息,能够自动的将这个表给创建出来,更方便用户去使用整个数据入湖的一个流程。
其次,更便捷的数据元信息管理。Iceberg 现在的元信息其实都是裸的,都是直接放在 hive metastore 上的,如果用户需要查看数据元信息,其实还需要去跑 SQL,我们希望在平台化的建设中把它给继续的完善。
第三,基于 Alluxio 打造数据加速层。希望用 Alluxio 打造一个数据湖加速层功能,以方便上层更加好的去实现一个秒级分析的能力。
第四,与内部各系统打通。其实我们内部还有很多像实时离线分析的各个系统,我们也是需要将我们整个平台跟内部的各个系统之间进行一个打通串联的工作。
▼ 关注「 数仓中文社区」,获取更多技术干货 ▼