尘锋信息基于 Apache Paimon 的流批一体湖仓实践
The following article is from Apache Paimon Author 代欣雨
1. 整库入湖,TB 级数据近实时入湖 2. 基于 Flink + Paimon 的数仓 批 ETL 建设 3. 基于 Flink + Paimon 的数仓 流 ETL 建设 4. 数仓 OLAP 与数据地图
01
尘锋信息介绍
02
选型背景
2.1 老架构
离线数仓:
TiDB + HDFS + Yarn + Apache Hive + Apache Spark + Apache Doris
痛点:
1. 离线数仓延迟过高,且批量从业务库拉取数据同步容易影响业务。
2. 基于 Hive 的离线数仓对于 CDC 采集 和 更新场景 治理建模有较大的侵入性,开发成本较高。
3. HDFS 相比于云厂商提供的对象存储,成本依旧很高。
实时数仓:
Apache Kafka + Apache Flink + StarRocks + K8S
痛点:
1. 实时链路 SR 虽然有较好的流写能力,但不支持流读,不便于数仓依赖复用,每层之间使用Apache Kakfa对接,又造成较大的 开发维护成本。
2. 实时链路使用SR微批调度处理 会导致非常高的资源占用导致 OLAP 慢查 甚至稳定性问题。
3. SR 不支持 Overwrite 等批处理能力。
2.2 新架构需求
结合以上的痛点,我们决定 Q1 进行数仓架构调整,我们的业务需求主要有以下几点:
1. 支持 T+1 、小时级的批处理离线统计。
2. 准实时需求 ,延迟可以在分钟级 (要求入湖端到端延迟控制在 1 分钟左右)。
3. 秒级延迟的 实时需求 ,延迟要求在秒级。
4. 存储成本低,存大量埋点和历史数据不肉疼。
5. 兼容私有化 (整个环境不依赖 Hadoop 、Hive 等比较重的组件,降低部署运维成本)。
结合业务需求,所以我们对存储和计算引擎的需求如下:
1. 较高的 CDC 摄入 及 更新能力。
2. 支持 批写 、批读。
3. 支持 流写 、流读。
4. 端到端延迟 能够 在秒级。
5. 支持 OSS 、S3、COS 等文件系统。
6. 支持 OLAP 引擎。
2.3 为什么选择 Paimon
对 Paimon 进行了深入的调研和验证,发现 Paimon 非常满足我们的需求:
1. 基于 LSM ,具有很高的更新能力,默认的 Changelog 模型可以处理 CDC 采集的变更数据(实测入湖端到端延迟能控制在 1 分钟左右)。另外 Paimon 支持 Append Only 模型,可以覆盖没有更新的日志场景,该模型在写入和读取时不用耗费资源处理更新,可以带来更高的读写性能和更低的资源消耗。
2. 支持 批写 、批读 ,并且支持 (Flink、Spark、Hive 等多种批处理引擎)。
3. 支持 流写、流读 (结合 Flink 的批处理,我们希望后期能够建设流批一体的数据仓库)。
4. Paimon 支持将一张表同时写入 Log System(如 kafka) 和 Lake Store (如 OSS 对象存储),结合 Log System 可以覆盖秒级延迟的业务场景,并且解决了 Kafka 不可查询分析的问题。
5. 支持 OSS 、S3、COS 等文件系统 ,且支持 FileSystem catalog ,可以完全与 Hadoop 、Hive 解耦。
6. 支持 Trino OLAP 引擎,实测 分组分析 5 亿 200GB 数据,30 个 Bucket,能够在 10 秒内出结果 (和社区沟通,还有优化空间),但满足目前需求。另外,Apache Doris 已经开始对接 Paimon格式,相信不久之后 Paimon的OLAP 生态会更加丰富。
03
整库入湖
3.1 实现步骤
■ 3.1.1 Unisync 采集平台
基于 GO 语言开发,自研 Unisync 采集平台, 功能如下:
1. 支持 CDC 增量采集多业务数据库(MongoDB 、TiDB 、MySQL),将不同类型的数据库日志格式进行统一,便于下游使用。
2. 支持 Batch 并行全量读取,且支持故障恢复,避免过程中失败而重新拉取浪费时间。
3. 支持全量 和 增量采集自动切换 ,支持动态加表,加表时可指定是否增量。
4. 支持直接 Sink StarRocks 、Doris 、TiDB 等数据库。
■ 3.1.2 Flink 采样程序
基于 Flink DatasSream API 开发 ,并通过 StreamPark 部署,功能如下:
1. 消费Kafka ,将Kafka 中的半结构化数据(MongoDB) ,进行解析,并将字段 - 类型保存至 State。
2. 有新增的字段自动加入 State 中,并将该条消息补齐字段和类型,发送至下游算子。
3. 自动生成 逻辑 Kafka Table (见上图详解)。
4. 自动生成 Paimon Table 及 入湖 Flink SQL (依赖 Kafka Table 元数据信息,见上图详解)。
5. 入湖 Flink SQL 会将 Kafka Table 中的所有字段列出形成别名,自动使用 UDF 处理 dt 分区字段等等。
■ 3.1.3 Flink + Paimon 入湖程序
1. 每个Flink Job 可以配置读取多个 Kafka Topic ,并设置起始时间 或者 Offset。
2. 程序内部根据 Kafka Topic 查询 MySQL ,获取 Kafka Table 元数据信息。
通过 Flink sql 不仅可以在入湖时做 Map Flatmap 甚至可以多流 Join 、State计算等。
4. 启动时 使用 Paimon 的 Flink Catalog API 根据MySQL 中的Paimon建表语句创建表。
3.2 入湖实践结论
■ 3.2.1 性能
1. 全量整库入湖 80+ 表,近 2TB ,全量写入阶段不处理更新,可以将 checkpoint 设置4分钟左右
2. 对于全量重刷一张大表的情况,需要更新非常多的分区和 bucket ,建议将表Drop后再全量写入
■ 3.2.2 稳定性
分别对一张 Append Only 日志表 和 Change Log 维度表进行增量稳定性测试 (数据量适中);
资源配比都是是 1个 TM 4GB 内存 2 slot;
04
流批一体的数仓 ETL Pipeline
4.1 需求
1. 满足 T+1 / 小时级 的离线数据批处理需求。
2. 满足 分钟级 的 准实时需求。
3. 满足 秒级的 实时需求。
4. 以上三种情况,业务 SQL 不应该做过多侵入,而只需要修改参数和资源占用,就可以进行升降级。
4.2 批
尘锋批处理主要用于覆盖 T+1 和 小时级的业务需求:
1. 存储侧选择 Paimon ,因为 Paimon 支持 Append-only 和 changelog 两种模式,支持 insert overwrite insert into 两种写入方式 。
■ 4.2.1 Paimon 批处理场景
Paimon 支持 Append only 模型 ,配合批覆盖写、批读 ,性能表现表现不亚于 Iceberg 。由于我们的更新场景较多,所以我们更加关注 Changelog 模型的读写:
1. 如上图,通过 Flink + Paimon 测试批读 Changelog 模型(MOR) 220GB 、 一亿左右数据 、 20 并发 ,需要 3 分钟左右,每个 TM 1 slot ,内存分配 2GB 左右。
(注意:由于我们使用测试的服务器是内存型 8C 64GB,所以该项测试数据并不是 Paimon 的最佳性能,理论 CPU 计算型服务器会更加出色,提供以上数据供大家参考)
2. ChangeLog 写入性能可以参考入湖侧。另外对于 Append only 不用处理更新,表现会更加出色,非常适合 insert overwrite 等批覆盖场景。
■ 4.2.2 Flink sql gateway
选择使用 flink sql gateway 进行批处理任务提交和管理的原因如下:
1. sql gateway 具有交互式开发的能力,可以利用 Flink 生态丰富的 connector,非常方便的读取 和 写入
Paimon 、SR、Doris、MySQL、TiDB 、Kafka 等, 甚至可以覆盖部分 OLAP 场景。用于数据开发场景,可以极大的降低 Flink sql 的使用门槛 ,提升开发调试效率 和 降低维护成本
当前我们生产使用基于 Flink 1.16 版本的 sql gateway还有一些不足,于是为了更好的和 dbt 数据构建工具整合,我们基于官方 hiveserver2 endpoint 实现 了 dustess_hiveserver2 endpoint ,增强功能如下:
1. 支持配置式内嵌多种 Catalog ,如 Paimon 、TiDB、SR、Doris、MySQL 等。
2. 支持配置式内嵌多种 Module ,主要是我们内部实现的 UDF 和 UDTF。
3. 修改默认语法为 Default (Flink)。
■ 4.2.3 dbt
我们选用 dbt 作为数据构建工具的原因如下:
1. 可以完全用编写工程代码 (如 Java 、Go 等语言)的方式去构建数据仓库,所有的模型统一在 git 仓库,可以review 、PR 、发布等流程控制,极大的提高模型复用率和避免烟囱开发 。
2. 数据开发只需要开发 select 语句,dbt 可以自动生成结果表结构,以及基于 yml 的模型注释,极大的提高了开发效率 。并且 dbt 支持非常多的 宏 语句,可以将非常多的重复工作复用,并且统一和收敛口径。
4.3 流
4.4 效果
ODS 的数据是使用 Flink 流式准实时写入,湖仓中 DWD 和 DWS 主要的治理需求为:
1. Map、flatmap 转换(对于此场景,流和批的SQL完全一致,只需要做提交 sql 的模式配置)。
2. join 形成宽表 (join 在流场景下复杂度要高于批,Paimon 提供了带有相同 key 的部分列更新,lookup join 等降低复杂度和成本,在 sql 层面和批是一致的)。
05
OLAP
06
数据地图
07
未来规划
7.1 sql gateway 升级
1. 支持 application mode
目前使用批处理任务使用 dbt 通过 flink sql gateway 提交作业。
目前 Flink sql gateway 支持 yarn session 和 yarn per job 两种部署模式,目前有以下问题:
yarn session 启动需要静态指定 JobManger 和 TaskManger 的内存 ,不能根据提交的 SQL 做针对性调优,存在稳定性不佳 或 资源利用率不高的问题。
yarn per job 可以在向 sql gateway 提交时通过 set 语法设置各项内存值,但是 per job 已经过时,且存在单点问题容易导致 sql gateway 不稳定。
2. 支持流任务生命周期维护和管理
目前我们的流任务,虽然可以通过 dbt 编写 sql ,且通过 sql gateway 提交至集群运行 (通过 set 'execution.runtime-mode'='streaming' )。
7.2 Log system 生产结合使用
08
总结
09
Paimon 信息
Apache Paimon 官网:https://paimon.apache.org/
Apache Piamon Github:https://github.com/apache/incubator-paimon
讲师介绍
代欣雨
尘锋信息大数据研发工程师,专注于流/批计算 ,目前对大数据存储也充满兴趣和激情
往期精选