流利说基于Apache Hudi构建实时数仓的实践
The following article is from 流利说技术团队 Author Bruce, Ibson
本文目录
背景
实时数仓可以为我们带来什么
技术方案选型
数据存储方案选型
Flink 开发平台选型
维表存储方案选型
Connector 开发
实时数仓整体架构
数据开发案例
数据接入
数据处理
展望
背景
流利说最开始的实时数仓是基于 Flink 1.9 搭建的,彼时Flink SQL 的生态也不够成熟,我们投入了大量的精力,使用 Yarn 做资源管理,修改了 SQL-Client 等组件,优化提交方式来提高开发效率,但是随着 Flink 社区的快速发展,此时 Flink 1.13 版本已经相对稳定,Flink SQL connector 也更加完善,可以将 Flink on K8s 资源管理应用于生产环境,所以我们将实时数仓中 Flink 的版本往前推进了一大步,并随即带动了其他生态组件的引入或升级。
实时数仓可以为我们带来什么
得益于开源社区对实时计算的推动,针对我们当下的数据平台的情况,实时数仓能够为我们解决的是
数据同步 秒级别实时数据处理 打通湖上数据达到实时离线一体化
技术方案选型
数据存储方案选型
现在主流的数据糊存储方案主要有 Delta、Hudi 和 Iceberg,Delta 与 Spark 是深度融合的,但由于我们已经选定了 Flink 作为实时计算引擎,故数据湖存储方案会在 Hudi 和 Iceberg 中进行比较,比较的详情如下。
比较维度 | Hudi | Iceberg |
---|---|---|
云厂商支持程度 | 阿里云、AWS | 阿里云 |
小文件合并 | 自动处理 | V1 版本表支持,需要手动处理 |
稳定性 | 稳定 | 稳定 |
是否支持sql | 支持 | 支持 |
更新删除支持情况 | 支持 | V2 版本表支持 |
主流执行引擎支持 | Flink、Spark、Trino、Presto | Flink、Spark、Trino、Presto |
语言支持程度 | Java、Scala、Python 及 SQL | Java、Scala、Python 及 SQL |
由于 Iceberg V1 版本表不支持 Upsert ,Iceberg V2版本支持 Upsert 但是在小文件合并时存在问题,且 Iceberg 需要通过额外任务来进行小文件合并,最终选择 Hudi 作为数据湖存储方案。
Flink 开发平台选型
当前流利说大数据体系依托于 Aliyun EMR 集成 Hadoop 环境,维护实时数据运维平台成本较高,所以调研之后选择了阿里云实时计算 Flink (VVP)作为我们的开发平台。
我们关注 VVP 具备的能力主要有如下几个方面:
基于 K8s 进行部署,支持 Session 和 Appliction 模式提交任务,支持弹性伸缩 Flink SQL 开发界面,交互式的 Debug 数据输出,代码版本控制 集群高可用 元数据管理,支持接入独立的 Hive Metastore 自定义 Connector 和 UDF Checkpoint 支持读写 OSS 任务监控支持外部的 Prometheus 多版本任务
VVP 提高了我们的开发效率,让我们更加专注于实现数据架构。
维表存储方案选型
Flink 官方提供 Connector 的支持情况见官网 Supported Connectors,根据业务情况整理如下几种类型。
Name | Version | Source | Sink |
---|---|---|---|
JDBC | Bounded Scan, Lookup | Streaming Sink, Batch Sink | |
HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
从上表中可以看出,Flink 官方已经可以使用 JDBC 和 HBase 作为维表进行 Lookup Join,特性如下。
Name | 优点 | 缺点 |
---|---|---|
JDBC | 官方支持,可直接使用业务库作为维表,无需数据同步 | 业务高峰时对数据库性能有较大损耗,可能对业务造成影响 |
HBase | 官方支持,分布式服务,可支持高QPS,通过 RowKey 可进行快速检索 | 仅支持通过 RowKey 进行关联,Flink 1.13.x 对删除操作存在一定争议 |
Elasticsearch | 分布式服务,可支持高 QPS,可通过唯一 Id 进行检索,也可通过其他字段进行关联 | 官方不支持,维表过大,效率较 HBase 低 |
综上,因历史数据中存在需要通过多个字段进行关联的情况,需要通过非唯一 id 进行关联,如果使用 HBase 需将一份数据存储为多份数据,会造成一定资源浪费,且会导致系统比较复杂,不利于后续的维护和管理。对于 Elasticsearch 检索较慢,可以在业务允许的范围内,通过 Cache 来进行一定程度优化。
Connector 开发
基于官方提供的文档、接口和我们的需求,我们开发了 Flink-Elasticsearch-connector 并同时支持 Source (Scan, Lookup)和 Sink(Streaming Sink, Batch Sink),即读取和写入数据。
实时数仓整体架构
实时数仓平台架构图如上,存储托管于 Kafka 和 OSS,OSS 中的表主要是 Hudi 类型,VVP 是阿里云托管的实时计算平台,主要提供开发,运维和资源管理等功能,实现 Flink 数据开发运维一体化,提高业务开发效率。
实时计算数据流图如上,几个重要的关键点:
a. 业务数据库和日志数据分别通过 CDC 和 Flume 的方式采集到 Kafka 作为源数据;
b. 维表数据通过 Flink 连接器同步存入 Elasticsearch;
c. Flink Join 维表数据计算存储到 Kafka, 这里采用 Kafka 的原因主要是 User Profile Platform(用户画像系统)对实时性要求较高;
d. Flink 从源数据中同步存入 Hudi 并进行分层处理计算;
e. Hudi 表的所有源数据会自动同步到流利说独立部署的 Hive Metastore;
f. 使用 Trino 引擎对数据做实时在线分析,Superset 报表可以自动生成 Trino SQL 来获取结果,SQL Buffet(流利说数据分析平台)提交自定义 SQL 交互式分析;
g. Hudi 的数据也可以被其他系统所使用,例如数据质量,数据指标等。
数据开发案例
数据接入
首先通过 CDC 将 Mysql 的 Binlog 实时同步到 Kafka 对应 Topic 中,相应数据格式为 Debezium Json。
Hudi 表接入开发流程
使用阿里云实时计算 Flink 作为流式任务调度和任务管理平台
通过 FlinkSQL 将历史数据同步到 Hudi 表中 FlinkSQL 语句
-- Hudi 表 DDL 语句
CREATE TABLE IF NOT EXISTS `catalog`.`db_name`.`table_name` (
id BIGINT,
xxx STRING,
`data_date` VARCHAR(20),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE',
'write.tasks' = '2',
'index.global.enabled' = 'true',
'index.bootstrap.enabled' = 'true',
'read.streaming.enabled' = 'true',
'oss.endpoint' = 'xxx',
'accessKeyId' = 'xxx',
'accessKeySecret' = 'xxx',
'path' = 'oss://xxx',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.db' = 'hudi',
'hive_sync.metastore.uris' = 'xxx',
'hive_sync.table' = 'xxx',
'compaction.tasks' = '1',
'hoodie.cleaner.commits.retained' = '10',
'hoodie.datasource.write.precombine.field' = 'updated_at',
'hoodie.datasource.write.recordkey.field' = 'id'
);
-- DML 语句
INSERT INTO `catalog`.`db_name`.`table_name`
SELECT id,
xxx,
`data_date`
FROM kafka_streaming_table
WHERE id != null;
通过 Flink SQL 任务实时同步数据到 Hudi (使用 COPY_ON_WRITE 模式),时效性在分钟级别(Checkpoint 时间有关) Hudi 表中已经有历史数据时,需要支持按照 'PRIMARY KEY' 进行去重时 首次接入流式数据需设置 index.bootstrap.enabled=true
,保证数据的唯一性,后续可直接通过状态进行恢复即可要支持更新和删除必须在 DDL 中指定 PRIMARY KEY,或者在 Table Properties 中设置 hoodie.datasource.write.recordkey.field=${YouPrimaryKey}
需要指定更新的 Precombine Key 须在 Table Properties 中设置 hoodie.datasource.write.precombine.field=${YouPrecombineKey}
表使用 COPY_ON_WRITE 对读友好,通过 hoodie.cleaner.commits.retained
来设置保留的数据历史版本数量,对历史记录进行清理同步元数据到 Hive Metastore 需设置如下参数
参数 | 类型 | 备注 |
---|---|---|
hive_sync.enable | Boolean | 是否允许进行元数据同步到 Hive |
hive_sync.mode | String | 指定元数据同步采用 hms(Hive Metastore) |
hive_sync.metastore.uris | String | Hive Metastore 地址 |
hive_sync.db | String | 同步到 Hive Metastore 中对应的数据库名 |
hive_sync.table | String | 同步到 Hive Metastore 中对应的表名 |
业务方可通过 Trino 直接对 Hudi 表进行交互式在线查询,需注意版本要支持对 Hudi 表的读取,我们使用的对应 Trino 版本为 359。
ES 维表接入
将开发好的自定义 Flink-Elasticsearch-connector 打包,上传到阿里云实时平台进行注册。
通过 FlinkSQL 读取 Kafka 中的数据,写入到 Elasticsearch 中,Flink SQL语句如下
-- ES 表 DDL语句
DROP TABLE IF EXISTS es_dimension01;
CREATE TABLE IF NOT EXISTS es_dimension01(
id STRING,
xxx STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch6-lls',
'format' = 'json',
'hosts' = 'http://xxx:9200',
'index' = 'es_dimension01',
'username' = 'xxx',
'password' = 'xxx'
);
-- DML 读取 Kafka 写出到 Elasticsearch
INSERT INTO es_dimension01
SELECT id,
xxx,
FROM kafka_xxx_dimension;
其中 kafka_xxx_dimension 中为同步 binlog 数据,Flink 解析其中 Debezium Format 数据转换为 RowData(其中包含 RowKind — 标识该行数据的操作类型) Elasticsearch-connector 支持 upsert,通过设置 es_dimension01 表设定 PRIMARY KEY 对应 Elasticsearch 中的唯一 Id 在进行 upsert 操作时通过 Id 对数据进行修改和删除操作
数据处理
使用一个标签任务来说明使用流程,该标签任务通过 Flink 消费 Kafka 对应 Topic 中数据,通过关联 Elasticsearch维表进行维度补全,最后将结果输出到指定的 Kafka topic 中,DML 语句如下:
INSERT INTO kafka_sink
SELECT cast (a.id as BIGINT) as id,
'lable0' AS lable,
CASE
WHEN f.id is NOT null THEN f.start_sec * 1000 ELSE e.start_sec * 1000
END AS sourceTime,
CASE
WHEN f.id is NOT null THEN f.end_sec * 1000 ELSE e.end_sec * 1000
END AS expiredAt,
a.updated_at AS updated,
'dmp' as type
FROM
kafka_stream_source a
JOIN es_llspay_order_items FOR SYSTEM_TIME AS OF a.proctime AS b
ON a.id = b.id
JOIN es_dimension01 FOR SYSTEM_TIME AS OF a.proctime AS c
ON b.id = c.id
JOIN es_dimension02 FOR SYSTEM_TIME AS OF a.proctime AS d
ON a.id = d.id LEFT
JOIN es_dimension03 FOR SYSTEM_TIME AS OF a.proctime AS e
ON e.id = d.id LEFT
JOIN kafka_stream_dimension01 AS f
ON CAST (f.id AS STRING) = d.id
AND a.proctime between f.proctime - interval '1' second
and f.proctime + interval '5' second
WHERE
c.name
LIKE '%label1%'
OR c.name
LIKE '%label2%'
AND (
e.id IS NOT NULL
OR f.id IS NOT NULL
);
上面标签任务中,kafka_stream_source 为流表,对应的为 Kafka 中的 topic,通过 Lookup Join 维表 es_dimension01、es_dimension02 和 es_dimension03 进行维度补全,因为 es_dimension03 对应的表更新较频繁,为避免数据未 Join 成功,在这里通过 Interval Join 流表 kafka_stream_dimension01 来进行补偿,当流表 kafka_stream_dimension01 和 es_dimension03 中数据同时 Join 到时,优先使用 kafka_stream_dimension01 中数据,尽可能保证数据的准确性,该标签任务的整体的延迟在秒级,可保证数据的时效性。
展望
实时数仓目前还不能解决所有的计算和存储问题,只能根据业务需求场景来选择离线计算还是实时计算,当前流利说整体数据架构采用的是 Lambda 架构,且离线计算以 Spark 2.4.5 为主(目前 Hive 任务数量已经清零,离线计算全面拥抱 Spark),从最初 Hive 2.X 到 Spark 2.X 一步步升级上来,Hudi 的引入让我们可以方便的查询实时数据,我们希望未来升级完 Spark 3.X 的时候,Hudi 可以继续作为离线批处理的存储,进而实现在流利说离线和实时数仓存储方面的统一。
推荐阅读