摘要:本文整理自 Apache Flink Committer、Flink CDC Maintainer、阿里巴巴高级开发工程师徐榜江(雪尽)在 5 月 21 日 Flink CDC Meetup 的演讲。主要内容包括:
- 基于 Flink CDC 的海量数据的实时同步和转换
Tips:点击「阅读原文」获取演讲PDF~
CDC 是 Change Data Capture 的缩写,是一种捕获变更数据的技术,CDC 技术很早就存在,发展至今,业界的 CDC 技术方案众多,从原理上可以分为两大类:
上图为常见开源 CDC 的方案对比。可以看到 Flink CDC 的机制以及在增量同步、断点续传、全量同步的表现都很好,也支持全增量一体化同步,而很多其他开源方案无法支持全增量一体化同步。Flink CDC 是分布式架构,可以满足海量数据同步的业务场景。依靠 Flink 的生态优势,它提供了 DataStream API 以及 SQL API,这些 API 提供了非常强大的 transformation 能力。此外,Flink CDC 社区和 Flink 社区的开源生态非常完善,吸引了很多社区用户和公司在社区开发共建。Flink CDC 支持全增量一体化同步,为用户提供实时一致性快照。比如一张表里有历史的全量数据,也有新增的实时变更数据,增量数据不断地往 Binlog 日志文件里写,Flink CDC 会先同步全量历史数据,再无缝切换到同步增量数据,增量同步时,如果是新增的插入数据(上图中蓝色小块),会追加到实时一致性快照中;如果是更新的数据(上图中黄色小块),则会在已有历史数据里做更新。Flink CDC 相当于提供了实时物化视图,为用户提供数据库中表的实时一致性快照,用于可以对这些数据做进一步加工,比如清洗、聚合、过滤等,然后再写入下游。
上图为传统数据入仓架构 1.0,主要使用 DataX 或 Sqoop 全量同步到 HDFS,再围绕 Hive 做数仓。此方案存在诸多缺陷:容易影响业务稳定性,因为每天都需要从业务表里查询数据;天级别的产出导致时效性差,延迟高;如果将调度间隔调成几分钟一次,则会对源库造成非常大的压力;扩展性差,业务规模扩大后极易出现性能瓶颈。
上图为传统数据入仓 2.0 架构。分为实时和离线两条链路,实时链路做增量同步,比如通过 Canal 同步到 Kafka 后再做实时回流;全量同步一般只做一次,与每天的增量在 HDFS 上做定时合并,最后导入到 Hive 数仓里。此方式只做一次全量同步,因此基本不影响业务稳定性,但是增量同步有定时回流,一般只能保持在小时和天级别,因此它的时效性也比较低。同时,全量与增量两条链路是割裂的,意味着链路多,需要维护的组件也多,系统的可维护性会比较差。
上图为传统 CDC ETL 分析架构。通过 Debezium、Canal 等工具采集 CDC 数据后,写入消息队列,再使用计算引擎做计算清洗,最终传输到下游存储,完成实时数仓、数据湖的构建。
传统 CDC ETL 分析里引入了很多组件比如 Debezium、Canal,都需要部署和维护, Kafka 消息队列集群也需要维护。Debezium 的缺陷在于它虽然支持全量加增量,但它的单并发模型无法很好地应对海量数据场景。而 Canal 只能读增量,需要 DataX 与 Sqoop 配合才能读取全量,相当于需要两条链路,需要维护的组件也增加。因此,传统 CDC ETL 分析的痛点是单并发性能差,全量增量割裂,依赖的组件较多。03
基于 Flink CDC 的
海量数据的实时同步和转换
Flink CDC 的方案能够给海量数据的实时同步和转换带来什么改善?
Flink CDC 2.0 在 MySQL CDC 上实现了增量快照读取算法,在最新的 2.2 版本里 Flink CDC 社区 将增量快照算法抽象成框架,使得其他数据源也能复用增量快照算法。增量快照算法解决了全增量一体化同步里的一些痛点。比如 Debezium 早期版本在实现全增量一体化同步时会使用锁,并且且是单并发模型,失败重做机制,无法在全量阶段实现断点续传。增量快照算法使用了无锁算法,对业务库非常友好;支持了并发读取,解决了海量数据的处理问题;支持了断点续传,避免失败重做,能够极大地提高数据同步的效率与用户体验。
上图为全增量一体化的框架。整个框架简单来讲就是将数据库里的表按 PK 或 UK 切分成 一个个 chunk ,然后分给多个 task 做并行读取,即在全量阶段实现了并行读取。全量和增量能够自动切换,切换时通过无锁算法来做无锁一致性的切换。切换到增量阶段后,只需要单独的 task 去负责增量部分的数据解析,以此实现了全增量一体化读取。进入增量阶段后,作业不再需要的资源,用户可以修改作业并发将其释放。
我们将全增量一体化框架与 Debezium 1.6 版本做 简单的 TPC-DS 读取测试对比,customer 单表数据量 6500 万,在 Flink CDC 用 8 个并发的情况下,吞吐提升了 6.8 倍,耗时仅 13 分钟,得益于并发读取的支持,如果用户需要更快的读取速度,用户可以增加并发实现。
Flink CDC 在设计时,也考虑了面向存储友好的写入设计。在 Flink CDC 1.x 版本中,如果想实现 exactly-once 同步,需要配合 Flink 提供的 checkpoint 机制,全量阶段没有做切片,则只能在一个 checkpoint 里完成,这会导致一个问题:每个 checkpoint 中间要将这张表的全量数据吐给下游的 writer,writer 会将这张表的全量数据混存在内存中,会对其内存造成非常大的压力,作业稳定性也特别差。Flink CDC 2.0 提出了增量快照算法后,通过切片能够将 checkpoint 粒度降至 chunk, 并且 chunk 大小是用户可配置的,默认是 8096 条,用户可以将其调至更小,减轻 writer 的压力,减少内存资源的使用,提升下游写入存储时的稳定性。
全增量一体化之后, Flink CDC 的入湖架构变得非常简单,且不会影响业务的稳定性;能够做到分钟级的产出,也就意味着可以实现近实时或实时分析;并发读取实现了更高的吞吐,在海量数据场景下有着不俗的表现;链路短,组件少,运维友好。
有了 Flink CDC 之后,传统 CDC ETL 分析的痛点也得到了极大改善,不再需要 Canal、Kafka 消息队列等组件,只需要依赖 Flink,实现了全增量一体化同步和实时 ETL 加工的能力,且支持并发读取,整个架构链路短,组件少,易于维护。
依托于 Flink DataStream API 以及易用的 SQL API ,Flink CDC 还提供了非常强大完善的 transformation 能力,且在 transformation 过程中能够保证 changelog 语义。在传统方案里,在 changelog 上做 transformation 并保证 changelog 语义是非常难以实现的。
海量数据的实时同步和转换示例 1:Flink CDC 实现异构数据源的集成
这个业务场景是业务表比如产品表和订单表在 MySQL 数据库里,物流表存在 PG 数据库里,要实现异构数据源的集成,并且在集成过程做打宽。需要将产品表、订单表与物流表做 Streaming Join 之后再将结果表写入库里。借助 Flink CDC,整个过程只需要用 5 行 Flink SQL 就能够实现。这里使用的下游存储是 Hudi,整个链路可以得到分钟级甚至更低的产出,使围绕 Hudi 做近实时的分析成为了可能。
海量数据的实时同步和转换示例 2:Flink CDC 实现分库分表集成
Flink CDC 对分库分表做了非常完善的支持,在声明 CDC 表时支持使用正则表达式匹配库名和表名,正则表达式意味着可以匹配多个库以及这多个库下的多张表。同时提供了 metadata column 的支持,可以知道数据来自于哪个 数据库、来自于哪张表,写入下游 Hudi 时,可以带上 metadata 声明的两个列,将 database_name、table_name 以及原始表中的 主键(例子中为 id 列)作为新的主键,只需三行 Flink SQL 即可实现分库分表数据的实时集成,非常简单。
依托于 Flink 丰富的生态,能够实现很多上下游的扩展,Flink 自身就有丰富的 connector 生态。Flink CDC 加入之后,上游有了更丰富的源可以摄取,下游也有丰富的目的端可以写入。
海量数据的实时同步和转换示例 3:三行 SQL 实现单品累计销量实时排行榜
这个 Demo 演示在无需任何依赖的前提下,通过 3 行 SQL 实现商品的实时排行榜。首先在 Docker 里添加 MySQL 和 ElasticSearch 镜像, ElasticSearch 是目的端。将 Docker 拉起后,下载 Flink 包以及 MySQL CDC 和 ElasticSearch 的两个 SQL Connector jar。拉起 Flink 集群和 SQL Client。在 MySQL 内建库建表,灌入数据,更新后再用 Flink SQL 做一些实时加工和分析,写入 ES。在 MySQL 的数据库里构造一张订单表并插入数据。
上图第一行 SQL 是创建订单表,第二行是创建结果表,第三行是做 group by 的查询实现实时排行榜功能,再写入到第二行 SQL 创建的 ElasticSearch 表中。
我们在 ElasticSearch 里做了可视化呈现,可以查看到随着 MySQL 中订单源源不断地更新,ElasticSearch 的排行榜会实时刷新。
在过去的一年多时间,社区发了 4 个大版本, contributor 和 commits数量在不断增长,社区也越来越活跃。我们一直坚持将核心的 feature 全部提供给社区版,比如 MySQL 的百亿级超大表、增量快照框架、MySQL 动态加表等高级功能。
最新的 2.2 版本中同样新增了很多功能。首先,数据源方面,支持了 OceanBase、PolarDB-X、SqlServer、TiDB。此外,不断丰富了 Flink CDC 的生态,兼容了 Flink 1.13 和 1.14 集群,提供了增量快照读取框架。另外,支持了 MySQL CDC 动态加表以及对 MongoDB 做了完善,比如支持指定的集合,通过正则表达式使其更加灵活友好。
除此之外,文档也是社区特别重要的一部分。我们提供了独立的版本化社区网站,在网站里不同版本对应不同版本的文档,提供了丰富的 demo 以及中英文的 FAQ,帮助新手快速入门。
在社区的多个关键指标,比如创建的 issue 数,合并的 PR 数,Github Star 数上,Flink CDC 社区的表现都非常不错。
Flink CDC 社区的未来规划主要包含以下三个方面:- 框架完善:增量快照框架目前只支持 MySQL CDC ,Oracle、PG 和 MongoDB 正在对接中,希望未来所有数据库都能够对接到更好的框架上;针对 Schema Evolution 和整库同步做了一些探索性的工作,成熟后将向社区提供。
- 生态集成:提供更多 DB 和更多版本;数据湖集成方面希望链路更通畅;提供一些端到端的方案,用户无须关心 Hudi 和 Flink CDC 的参数。
提问
Qustions
&
解答
Answers
Q1
CDC 什么时候能够支持整库同步以及 DDL 的同步?
正在设计中,因为它需要考虑到 Flink 引擎侧的支持与配合,不是单独在 Flink CDC 社区内开发就可以实现的,需要与 Flink 社区联动。
Q2
什么时候支持 Flink 1.15?
目前生产上的 Flink 集群还是以 1.13、1.14 为主。社区计划在 2.3 版本中支持 Flink 1.15,可以关注 issue:https://github.com/ververica/flink-cdc-connectors/issues/1363,也欢迎贡献。
Q3
有 CDC 结果表写入 Oracle 的实践吗?
1.14 版本的 Flink 暂不支持,这个是因为 Sink 端的 JDBC Connector 不支持 Oracle dialect,Flink 1.15 版本的 JDBC Connector 已经支持了 Oracle dialect,1.15 版本的 Flink 集群可以支持。
Q4
下个版本能否支持读取 ES?
还需要考察 transactional log 机制以及它是否适合作为 CDC 的数据源。
Q5
能做到单 job 监控多表 sink 多表吗?
可以实现单作业监控多表 sink 到多个下游表;但如果是 sink 到多表,需要 DataStream 进行分流,不同的流写到不同的表。
Q6
Binlog 日志只有最近两个月的数据,能否支持先全量后增量读取?
默认支持的就是先全量后增量,一般 binlog 保存七天或两三天都可以。
Q7
2.2 版本 MySQL 没有主键,全量如何同步?
可以回退到不用增量快照框架;在增量快照框架上,社区已有组件的 issue,预计将在社区 2.3 版本提供支持。
▼ 关注「Apache Flink」,获取更多技术干货 ▼更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~