查看原文
其他

Apache Hudi在腾讯的落地与应用

ForwardXu 腾讯 ApacheHudi 2023-04-18

Apache Hudi核心概念

Apache Hudi是一个基于数据库内核的流式数据湖平台,支持流式工作负载,事务,并发控制,Schema演进与约束;同时支持Spark/Presto/Trino/HIve等生态对接,在数据库内核侧支持可插拔索引的更新,删除,同时会自动管理文件大小,数据Clustering,Compaction,Cleanning等

可以基于云存储/HDFS构建基于Hudi的Lakehouse,Hudi支持Parquet、ORC、HFile、avro格式,同时提供了非常丰富的API,如Spark DF、RDD、FlinkSQL、Flink DataStream API,利用这些API可以非常方便地对Hudi表进行操作,同时Hudi也集成了其他生态,如MPP引擎Starrocks,doris等

Hudi的基本概念由Timeline和File Layout组成

  • • Timeline由一个个commit组成,commit包含delta_commit,commit,clean,rollback,replacecommit等,同时每个commit都包含对应的状态,如requested,inflight,completed三种状态,分别代表请求开始处理,正在处理,处理完成。

  • • File Layout主要由FileGroup构成,FileGroup由FileSlice组成,每个FileSlice相当于一个版本,包含一个Base文件和多个Log文件

Hudi支持MOR和COW两种类型,MOR表对流式写入更友好,延迟更低,对于更新的log文件支持同步和异步两种模式进行Compaction生成新的Base文件,以加速查询,支持Snapshot,Read Optimized,Incremental读取

而对于COW表,每次写入需要重写文件,写放大相对严重,延迟相对MOR较高,更适合写少读多的场景。

为了加速数据的更新,Hudi支持多种索引,如分区级别的索引以及全表索引,分区级别的索引可以保证数据在分区内的唯一性,全表索引保证数据在表级的唯一性(开销较大)。Hudi支持了多种类型的索引实现,典型的如BLOOM、BUCKET索引,以及自定义索引等方式。

另外一个核心的概念是Hudi的Table Service,包含Compaction操作,Compaction针对FileSlice进行操作,会将Base文件和其对应的Log文件进行合并,产生新的Base文件;可以通过指定NUM_COMMITS或TIME_ELAPSED两种策略调度执行Compaction,对于调度执行而言,Hudi为不影响主链路的写入,支持了异步调度与执行,以及同步调度与执行,同步调度异步执行方式,满足不同的需求。

另外一个Table Service是Clean,Clean用于删除过期的文件,同样与Compaction类型也提供了多种策略以及调度执行策略,值得注意的是对于做了Savepoint的时间点,其对应的文件不会被删除。

接下来分析对于COW表的不同查询的实现,如在instant 0 时刻写入一部分数据(ABCDE),在instant 1时刻更新A -> A',D -> D',在instant 2时刻更新A' -> A'',E -> E',并插入F 那么对于快照查询(Snapshot Query)每次都是读取的最新的FileSlice,增量查询(Incremental Query)读取指定commit之间的Parquet文件,然后再将时间范围下推至Parquet文件进行过滤,只读取符合条件的变更的数据。

对于MOR表,快照查询(SNAPSHOT Query)读取的是Base文件与Log合并后的最新结果;而增量查询读取指定commit之间的Parquet以及Log文件,然后再对Log文件进行Block级别的过滤(根据Commit时间),合并重复key后返回结果。

CDC数据入湖

这个场景主要是DB数据入湖入仓把原来T + 1的数据新鲜度提升到分钟级别。数据新鲜度通过目前比较火的以Debezium、Maxwell为代表的CDC(change Data Capture)技术实现。以Streaming近实时的方式同步到数仓里面。在传统的Hive数仓中想保证实时是非常困难的,尤其是文件更新,湖表实时写入更新,基本不可能实现。

CDC技术对数仓本身存储是有要求的,首先是更新效率得足够高,能够支持以Streaming方式写入,并且能够非常高效的更新。尤其是CDC log在更新过程还可能会乱序,如何保证这种乱序更新的ACID语义,是有很高要求的,当前能满足乱序更新的湖格式只有Hudi能做到,而且Hudi还考虑到了更新的效率问题,是目前来说比较先进的架构。

图中方案3相比上面的方案,比较适合目前体量比较大(每天增量能达到亿级别地)、数据平台比较健全的公司,中间有一套统一的数据同步方案(汇总不同源表数据同步至消息队列),消息队列承担了数据的容错、容灾、缓存功能。同时,这套方案的扩展性也更加好。通过kafka的topic subscribe方式,可以比较灵活地分发数据。通过以上三种方式入湖hudi,以某数据中台为例已经有6000多张源表写入Hudi日增几十亿数据入湖。

分钟级实时数仓

第二个场景是构造分钟级别的实时数仓,分钟级别的端到端数据新鲜度,同时又非常开放的OLAP查询引擎可以适配。其实是对kappa架构或者是原先Streaming数仓架构的一套新解法。在没有这套架构之前,实时分析会跳过Hudi直接把数据双写到OLAP系统中,比如ClickHouse、ES、MongoDB等。当仓存储已经可以支持高效率分级别更新,能够对接OLAP引擎,那么这套架构就被大大简化,首先不用双写,一份数据就可以保证only one truth语义,避免双写带来数据完整性的问题。其次因为湖格式本身是非常开放的,在查询端引擎可以有更多选择,比如Hudi就支持Presto、trino、Spark、Starrocks、以及云厂商的redshift引擎,会有非常高的灵活度。多层数层数据可见性也从T+1 小时或天缩短到分钟级别。

流式计算PV/UV

Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。为了实现pv/uv计算,我们实现了 RecordCountAvroPayload ,它可以在对数据去重的时候,将重复数据的数量记录下来,这里的重复指的是HoodieKey(primary key + partition path)相同。以往处理方式是通过flink + window 聚合实现,该方式有延迟数据丢弃和state爆掉风险,Hudi Payload机制则没有这些风险。

多流拼接(大宽表)

上图是一个典型的非常复杂的业务落地, 消息流1由kafka写入hudi商品销售明细表,消息流2由kafka写入hudi用户基本属性表,然后结合hudi商品标签表和hive用户扩展属性表进行实时和离线拼接大宽表。

在实现多流拼接功能前有三个前置条件需要满足:

  1. 1. 基于乐观锁的Timeline

  2. 2. 基于marker的早期冲突检测

  3. 3. 启用occ(乐观并发控制)

这里主要描述基于时间线服务器的标记机制,该机制优化了存储标记的相关延迟。Hudi 中的时间线服务器用作提供文件系统和时间线视图。如下图所示,新的基于时间线服务器的标记机制将标记创建和其他标记相关操作从各个执行器委托给时间线服务器进行集中处理。时间线服务器在内存中为相应的标记请求维护创建的标记,时间线服务器通过定期将内存标记刷新到存储中有限数量的底层文件来实现一致性。通过这种方式,即使数据文件数量庞大,也可以显着减少与标记相关的实际文件操作次数和延迟,从而提高写入性能。

实现的原理基本上就是通过自定义的 Payload class 来实现相同 key 不同源数据的合并逻辑,写端会在批次内做多源的合并并写入 log,读端在读时合并时也会调用相同的逻辑来处理跨批次的情况。这里需要注意的是乱序和迟到数据(out-of-order and late events)的问题。如果不做处理,在下游经常会导致旧数据覆盖新数据,或者列更新不完整的情况。针对乱序和迟到数据,我们对 Hudi 做了 Multiple ordering value 的增强,保证每个源只能更新属于自己那部分列的数据,并且可以根据设置的 event time (ordering value) 列,确保只会让新数据覆盖旧数据。最后结合 lock less multiple writers 来实现多 Job 多源的并发写入。

介绍多流拼接场景下 Snapshot Query 的核心过程,即先对 LogFile 进行去重合并,然后再合并 BaseFile 和 去重后的 LogFile 中的数据。下图显示了整个数据合并的过程,具体可以拆分成以下 两个过程:

  • • Merge LogFile

    • • Hudi 现有逻辑是将 LogFile 中的数据读出来存放在 Map 中,对于 LogFile 中每条 Record,如果 Key 不存在 Map 中,则直接放入 Map,如果 Key 已经存在于 Map 中,则需要更新操作。

    • • 在多流拼接中,因为 LogFile 中存在不同数据流写入的数据,即每条数据的列可能不相同,所以在更新的时候需要判断相同 Key 的两个 Record 是否来自同一个流,是则做更新,不是则做拼接。如图 3 所示,读到 LogFile2 中的主键是 key1 的 Record 时,key1 对应的 Record 在 Map 中已经存在,但这两个 Record 来自不同流,则需要拼接形成一条新的 Record (key1,b0_new,c0_new,d0_new) 放入 Map 中。

  • • Merge BaseFile and LogFile

    • • Hudi 现有默认逻辑是对于每一条存在于 BaseFile 中的 Record,查看 Map 中是否存在 key 相同的 Record,如果存在,则用 Map 中的 Record 覆盖 BaseFile 中的 Record。在多流拼接中,Map 中的 Record 不会完整覆盖 BaseFile 中对应的 Record,可能只会更新部分列的值,即 Map 中的 Record 对应的列。

如上图所示,以最简单的覆盖逻辑为例,当读到 BaseFile 中的主键是 key1 的 Record 时,发现 key1 在 Map 中已经存在并且对应的 Record 有 BCD 三列的值,则更新 BaseFile 中的 BCD 列,得到新的 Record(key1,b0_new,c0_new,d0_new,e0),注意 E 列没有被更新,所以保持原来的值 e0。对于新增的 Key 如 Key3 对应的 Record,则需要将 BCE 三列补上默认值形成一条完整的 Record。

批流探索-广告归因

广告归因是指在用户在广告行为链路中,使用科学的匹配模型两两匹配各环节的行为数据点,可用于判断用户从何渠道下载应用(或打开小程序),通过匹配用户广告行为,分析是何原因促使用户产生转化。广告归因的数据结果是衡量广告效果、评估渠道质量的重要依据,可帮助广告主合理优化广告素材,高效开展拉新、促活营销推广,而实时广告归因则能更及时的应用到优化广告投放的过程中。

在增长买量业务场景中,买量团队在快手、百度、字节等渠道上投放广告,比如某云游戏广告素材,吸引潜在用户点击广告,进入业务开始玩云游戏,也可以下载游戏的APK安装包,从而实现将用户转化成业务新增用户和游戏新增用户的目的。如下图所示,渠道方可以获取用户的点击数据,业务可以获取新增用户的数据,在点击归因链路中,就是将业务新增用户匹配到用户在某渠道上近N天的最后一次广告点击,在正常的业务过程中,先有用户点击广告数据,后有业务新增用户数据,根据离线数据统计经验,点击转化成新增用户的窗口时间最长不超过3天,也就是N=3。


  • • 数据流一,Flink SQL消费点击数据,并通过Upsert方式(row-level update)写入数据湖Hudi点击表,MOR特性取最后一次点击数据。

  • • 数据流二,Flink SQL消费应用宝新增数据,通过Append方式写入数据湖Hudi新增表。

  • • 批处理三,Super SQL读Hudi新增表(当日)、Hudi点击表(近N天)关联,通过Merge Into语法(row-level update)写入归因结果Hudi表。Super SQL底层计算引擎是Spark3,该任务通过US系统每10分钟调度一次。

  • • 数据流四,Flink SQL通过snapshot-id方式(流式读取)将归因结果表实时出湖到CDMQ,保持数据应用接口和方案一一致。

基于Hudi方案优势如下

  • • 准实时和离线数据统一存储,归因率和T+1保持一致,Hudi归因率从原来的80%提升至 85%。

  • • Flink SQL、Super SQL开发简化了编码过程,降低了开发成本。

  • • 稳定性高,一般情况的数据延迟通过US在下个定时周期周期自动修复,维护成本低。

  • • 时效性是10分钟调度+3分钟运行 <15分钟。

批流探索-流转批

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

上左图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

上右图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

如何解决乱序到来问题, 我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

未来规划

感谢


推荐阅读
医疗在线OLAP场景下基于Apache Hudi 模式演变的改造与应用
强强联合!StarRocks 支持 Apache Hudi
Flink SQL操作Apache Hudi并同步Hive使用总结
Lakehouse架构指南
从 Apache Kudu 迁移到 Apache Hudi

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存