华为基于Hudi构建的实时数据湖架构与实践
导读 本文将介绍华为基于Hudi构建数据湖的整体方案架构,以及在项目中遇到的一些问题和实践经验。
文章分为以下三部分:
1. 需求场景分析&技术选型
2. 技术架构介绍
3. 后续改进点
分享嘉宾|受春柏 华为云 大数据架构师
编辑整理|张了了 聚水潭
出品社区|DataFun
需求场景分析&技术选型
华为当前实践类型是供应、采购、付款、发货等 ERP 业务,属于典型的传统数据业务。该项目有以下特点和要求:
数据源:
① 数据源都是关系型数据库;
② 大部分数据存在更新和删除操作,少部分是日志、流水类数据;
③ 数据关系之间会有大量表的关联;
④ 流量波动会非常大,会受到结算等的影响。
数据加工:
① 从数据库写到 Hudi 的时延不能超过 20s;
② 数据集成且经过 ETL,到最后的数据展示不能超过 5 分钟;
③ 技术栈要求批流一体,简化整体架构;
④ 支持数据回溯、数据更新同步及数据结构变更等场景;
⑤ 低成本开发。
数据结果查询:
① 报表类查询和 BI 分析分别要达到亚秒级和秒级响应;
② 支持多表关联查询;
③ TB 级数据量秒级查询。
高可靠:
① 支持全链路监控;
② 支持指标监控与告警,这一能力至关重要,只有这样才能及时发现实时任务的异常;
③ 具备跨机房、跨区域的容灾能力。
以上就是整体的业务特点分析,整体要求最高优先级的作业数据端到端严格一致,低时延以及高可用。
02
技术架构选型
1. 数据湖技术选型
技术选型时主要参考了 Iceberg 和 Hudi 两个组件,从上图可以看到一些功能的对比,Hudi 的实时性、更新能力、ACID 以及数据保序的能力对于业务场景中构建端到端的实时性是非常重要的几个核心要素。
实时性:数据表的读写性能直接影响到数据加工处理的性能,尤其是 Hudi MOR 的增量读写能力,极大的提升了表的读写性能。
更新能力:按传统数据湖的实现方案,数据更新是很复杂的,一般要构建拉链表,再通过关联处理找出要删除和更新的数据来进行操作,表的量级决定了执行的耗时,因此从数据入湖到处理完成性能会非常差,而支持数据更新能力后就会大大提升这块的性能。
ACID:传统多任务会存在完整的依赖关系,上游任务执行完毕下游才会执行,从整体来看,各个业务内部以及跨业务之间作业依赖关系会很复杂,端到端的性能也会比较差。有了 ACID 能力后,就可以保证读到的数据一定是成功写入的完整数据,这样整个调度的策略就会发生变化。不再依赖于 Pipeline 的调度关系,上游有数据过来,下游立马可以获取处理,上下游作业由串行调整为并行;
数据保序:在数据写入过程中,尤其是基于 CDC 技术采集业务数据库数据时,时序性是非常关键的,当数据写入的顺序被打乱后数据的准确性就很难保证;另外数据源的业务系统出现异常时,数据可能会重新发送,如果没有保序的能力计算结果的正确性也是得不到保证的。
从功能上来看 Hudi 更能满足项目要求。同时从后续演进来看,Hudi 的社区活跃度更高,基于可以持续演进的考虑,将 Hudi 做为底层的架构组件。
2. 实时数据湖整体架构
在整体方案中,相较于传统方案,有两个核心的调整:
(1)ETL 到 ELT 的转变
传统方式是 ETL,在入仓过程中进行数据的转换处理;而在大数据尤其是在实时数据湖场景中,采用 ELT 模式进行处理,先将数据加载到湖中,构建贴源数据层后在进行数据的处理,好处是充分利用大数据的算力,而且入湖时延低。
(2)批量模式转为增量批处理与流式处理
基于 Hudi 构建的流式处理,通过使用 Hudi 来替换 Kafka 的存储,从而实现了实时链路中所有数据的持久化存储,这也是和传统的 Kappa 架构较大的区别。
而增量批处理也是基于 Hudi 的一种新的加工方式。增量批模式和传统批量模式主要体现在几个方面,第一个增量批模式的调度周期可以做到分钟级别,而传统的批量模式大部分是小时级到天级;另一个是数据读取模式,基于 Hudi 构建的 Timeline 可以快速读取增量数据,而批量模式需要通过分区或者条件过滤实现增量数据的读取;第三个区别是处理数据量的变化,根据调度周期和增量读取获得的数据会少很多,因此大表关联可能会变成小表关联或大小表关联,整体的处理性能和资源消耗会降低。
基于流式或者增量的方式,可将计算分布到全天 24 小时,而传统批处理都是凌晨集中数据处理,集群资源利用率会得到提升。
从架构图中可以看到数据集成有两个通道,分别是批量及实时:
① 批量通道:负责系统上线的数据初始化入湖,以及异常情况的批量数据恢复。利用批量的高吞吐快速完成系统上线。
② 实时通道:为了提升数据时效采用的实时入湖通道,以及将湖内加工的结果数据快速同步到数据集市层,共下游业务快速消费数据。
流批一体的处理模式,流批一体在四个方面得到了体现:
① 数据一体:基于 Hudi 可以实现流批数据的统一存储,传统 Lambda 架构的数据不一致问题。
② 元数据一体:统一了数据湖的元数据,各个引擎统一复用,规避了元数据定义不一致带来的数据问题。
③ 引擎一体:基于 Flink 的引擎实现了流式处理与批量处理的计算引擎一致。
④ 代码一体:FlinkSQL on Hudi 的加工模式,处理逻辑的 SQL 一致,支持流批的运行模式。
通过数据集成,将数据源、数据湖、数据集市无缝打通,实现了数据的快速流转。
3. 数据实时接入
在数据实时接入的目标是低时延、数据一致性及低成本,遇到了以下挑战:
① 项目初期使用布隆索引,Hudi 的写入性能无法满足要求;
② 要满足大吞吐的数据传输和写入,同时保证性能,分布式怎么做到严格保序;
③ 要保证幂等写入,开启 precombine 之后,对应的字段怎么选择;
④ 在数据采集及写入过程中,怎么实现断点续传能力;
⑤ 怎么实现故障的自恢复及数据完整性保证;
⑥ 流量波动时,怎么实现过载保护,保证稳定运行。
针对以上挑战,我们采取了如下解决方案:
① 存储模型:选择分区表 +MOR+Bucket 索引,Bucket 索引通过主键 Hash 来实现唯一性判断,相较于布隆索引性能更高、稳定性很好;通过日期分区来规避随着表数据增加单桶数据过大问题,以及增量读写桶数过多带来的资源消耗大问题。
② 性能提升:首先是 Bucekt 索引的选择是提升性能的一个关键,其他的优化点包括文件 list 优化;compaction、clean、archive 的异步化 & 以及参数的优化。
③ 数据一致性:采用数据库事务的提交时间做为 precombine 字段;数据传输按照表的主键 Hash 后分布式传输,保证了各主键数据有序性;数据完整性由心跳表机制保证。
④ 成本控制:在实时数据同步过程中,基于各类业务数据发生时间错开的情况,实现多表之间资源复用以及动态资源伸缩的方式来降低资源消耗成本;同时通过可视化无代码的方式和自动化的方式来降低开发成本。
⑤ 可持续性:通过运维监控保证集成任务的稳定运行,在实时场景中增加数据积压、任务状态及数据时延等指标;业务系统是不断演进的,Hudi 的 Schema 演进能力保证数据湖的数据能够低成本的跟随业务系统进行演进。
4. 数据实时处理
数据存储是按照数仓的分层模型进行存储的,分为贴源层 -> 明细层 -> 汇总层 -> 应用层,在实时场景中个别业务可以根据实际情况进行跳层计算,例如:贴源层的数据质量比较高,可以直接实现由贴源层到汇总层的加工。在整体的加工模式中,以 FlinkSQL 的流式模式处理,少量采用批模式;月度和年度汇总统计采用 SparkSQL 跑批。
在这个过程中主要遇到了以下挑战:
① 流量波动的情况下,怎么保证流的稳定性;
② 多流计算及跨数据中心之间的数据对齐问题;
③ 新增流量和历史流量的关联以及技术上大维表的挑战;
④ 数据初始化通过批量去跑,异常时通过 Spark 去跑,如何保证 Spark 和 Flink 写入的一致性;
⑤ Flink On Hudi 方案中怎么避免数据重复。
针对以上问题,我们主要做了以下调整:
① 业务逻辑复杂,需要大量表进行关联的场景,通过分层建模方式,采用多迭代的方式,降低单一作业的复杂度;
② 流量突增场景,采用记录级限流,避免因 OOM 造成宕机;作业资源配置模型进行优化;
③ 维表关联场景,Hudi 中的表可以直接作为维表,针对中小型的维表,可以直接加载到内存做 lookup,性能会有明显提升;
④ MOR 表读优化,根据 clean 的事件的 committime 来进行优先级判断,优先读取 Log 文件数据,仅在 Log 文件被清理的场景才会读取 Parquet 文件;
⑤ 多引擎混写一致性问题,Spark 和 Flink 混写导致索引和数据内容的不一致的问题;
⑥ 状态后端优化,将作业级别的 TTL 调整为表级别 TTL,降低状态后端的数据存储。
同时以满足各个业务线的协同开发,后续相继完善了数据存储模型指导、数据开发规范等指导方法,保证线上整体的可持续运行。最终实现的效果是目前单层作业耗时在 10-30s 内,单作业主表能达到万级 TPS。
5. 数据实时查询
数据实时查询主要是用于结果数据的 BI 与报表分析,另外就是湖内数据的探索分析。
① BI 报表,采用数据集成,将结果数据实时同步到 Clickhouse 中,通过 Clickhouse 的实时写入能力和实时查询能力,与数据业务系统对接。
② 数据探索,采用 HetuEngine 查询 Hudi 的数据,完成数据业务探索分析和对结果数据核对验证。
HetuEngine on Hudi 做了性能优化,包括动态过滤、计算下推、CBO 优化等;基于 Clickhouse 实现亿级别的亚秒级查询
6. 项目中的经验
在索引上,一开始我们选择了 bloomfilter 索引,写入数据量多了之后,出现 filegroup 过多和假阳性问题,导致写入性能不断恶化,写入时间在分钟级以上不满足生产需求,因此引入了 Bucket 索引;Bucket 桶在非分区表场景,导致数据太多,写入资源消耗大,因此按照数据量及增量数据特点,表重建为分区表;同时在分区键的选择上,可以按照更新和插入数据的特点,设为月或年级;在数据管理上将 Compaction 及 clean 服务独立出来,避免过大导致的资源耗时,同时 Compaction、archive 及 clean 都采用异步执行,独立进程负责。
7. 典型的开发模式
以上是实际开发中两种比较典型的开发模型,第一种是流式加工,多流之间对齐周期比较长及部分表数据 TTL 过期的场景在这个模型中我们通过另一条流检测来进行修复,如果发现数据异常,增加一个补数的临时表,在主流程中通过 union 补数表的方式,将正确数据写入到目标表。另外一种是增量批实现事实表和维度表关联的场景,在主流程中事实表的增量数据与维度表的全量数据进行关联;辅助流程发生在维度表更新后,通过维度表的增量数据与事实表的全量关联。主流程是业务的通常的业务发生流程,辅助流程是为解决维度表更新对历史数据带来的影响。
03
后续改进点
后续改进点主要在性能和成本两个方面。
1. 性能方面
① 提升 Hudi 表读写性能,目标读写时延万级 TPS 秒级;
② 在数据存储模型、索引构建、统计信息等维度持续加强,进一步提升查询性能。
2. 成本方面
① 提升 Hudi 表的自管理能力,降低维护成本;
② 弹性伸缩能力,优化资源消耗。
04
问答环节Q1:Hudi 写入的表是下游依赖,需要保证端到端一致性,是否可以通过心跳表机制来保证?
A1:心跳表机制主要是保证数据的完整性,而下游数据的正确性需要通过保序的能力及幂等性来保证。心跳表机制是在 TP 库里面做一个定时任务,定时发心跳,心跳随 CDC 采集数据时一并采集过来,因为心跳数据的产生和原有业务的产生在数据库的日志中是保序的,因为可以根据心跳的时间窗去判断数据的完整性。
Q2:Hudi 表实时对外服务的接口是什么?
A2:Hudi 本身是底层的存储结构,不是一个独立的服务,对外提供能力主要依靠计算引擎,Hetu、Spark 等去查询,或者实时性要求更高的场景中,我们会将数据同步至下游的集市,比如 Clickhouse 或者 ES 等去解决多场景的查询能力。
Q3:Bucket Index 中 Bucket 的数量是怎么估算的,资源有什么建议?A3:单个桶的数据量影响了单个读取任务的资源消耗,一般我们建议单个桶的数据量 2GB 比较合适。具体资源消耗与计算逻辑相关,这个要根据实际情况进行评估。
今天的分享就到这里,谢谢大家。
推荐阅读
日增数据超10PB!揭秘沃尔玛Lakehouse架构选型之路
使用 Bucket Index 加速Apache Hudi 写入