查看原文
其他

基于Flink-CDC数据同步⽅案

徐奕 领创集团Advance Group
2024-09-11

前言

在业务数据处理过程中,我们时常会遇到不同业务模块 / 存储系统间实时数据同步需求。比如, 报表模块依赖订单模块数据进行增量更新,检索引擎依赖业务数据进行实时同步等。针对这类场景,我们目前采用了Flink-CDC的技术方案用于数据同步。

Flink-CDC(CDC,全称是 Change Data Capture),是基于Apache Flink® 生态的数据源连接器,该连接器集成了Debezium引擎。其目标是为了用于监控和捕获数据变更,以便下游服务针对变更进行相应处理。基于CDC场景,比较成熟的解决方案还包括 Maxwell、Canal等 。



方案对比

Flink - CDC(Debezium)/ Maxwell / Canal

以上是我们进行的解决方案对比,可以看到相较于Maxwell、Canal,Flink-CDC在数据源支持上更为丰富,同时基于Apache Flink®生态,在数据同步任务的处理过程中,还提供了诸如Map、Filter、Union的丰富算子支持。

Flink-CDC ⽀持数据源、数据下游⽀持:



原理

以Mysql为例,Flink-CDC的底层原理实际上是基于读取数据库的Binlog日志,同时内置集成的Debezium引擎,会将Binlog日志解析为行级变更的数据结构。目前Flink-CDC支持 DataStream-API / SQL-API 两种实现进行CDC监控,以下我们主要以DataStream-API实现举例。

3.1 配置代码
DataStream API实现:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception { # 定义数据源 SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("host_name") .port("port") .databaseList("order_infos") .tableList(["order_infos.order_info_0", "order_infos.order_info_2"]) .username("user_name") .password("user_pass") .startupOptions(StartupOptions.initial()) .deserializer(new JsonDebeziumDeserializationSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01"); # 数据下游输出 dataStreamSource.print(); env.execute(); }}
到这⾥Flink-CDC的基础配置已经完成,启动后如果order_infos.order_info_0数据变更,那么程序就 可以监听到对应的变更信息了。⽰例代码的下游配置为标准输出流,实际在线上业务场景,可通过dataStreamSource.addSink()的⽅式对接下游服务,⽐如Kafka / RabbitMq / Elasticsearch等。
3.2 数据结构
基本操作 Debezium 格式化后的数据结构,包含 before、after、source、op、ts_ms 5个字段,含义 如下:

INSERT⽰例:
{ # New Row Data "after":{ "order_id":"mock_oid_01", "shop_id":"mock_sid_01", "order_status":"CANCELLED", "payment_method":"Wallet V2", "total_amount":"8881.000000", "create_datetime":"2020-09-03T03:03:36Z", "update_datetime":"2022-04-08T02:23:12Z", "transaction_id":"a154111f857514b0" }, # Metadata "source":{ "version":"1.4.1.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":"1649384718000", "db":"order_infos", "table":"order_info_01", "server_id":"225", "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002", "file":"mysql-bin.000025", "pos":"45950793", "row":"0", "thread":"27273" }, "op":"c", "ts_ms":"1649384718067"}
UPDATE⽰例:
{ # Old Row Data "before":{ "order_id":"mock_oid_01", "shop_id":"mock_sid_01", "order_status":"SHIPPING", "payment_method":"Wallet V2", "total_amount":"8881.000000", "create_datetime":"2020-09-03T03:03:36Z", "update_datetime":"2022-04-08T02:23:12Z", "transaction_id":"a154111f857514b0" }, # New Row Data "after":{ "order_id":"mock_oid_01", "shop_id":"mock_sid_01", "order_status":"CANCELLED", "payment_method":"Wallet V2", "total_amount":"8881.000000", "create_datetime":"2020-09-03T03:03:36Z", "update_datetime":"2022-04-08T02:23:12Z", "transaction_id":"a154111f857514b0" }, # Metadata "source":{ "version":"1.4.1.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":"1649384718000", "db":"order_infos", "table":"order_info_01", "server_id":"225", "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002", "file":"mysql-bin.000025", "pos":"45950793", "row":"0", "thread":"27273" }, "op":"u", "ts_ms":"1649384718067"}

DELETE⽰例:
{ # Old Row Data "before":{ "order_id":"mock_oid_01", "shop_id":"mock_sid_01", "order_status":"SHIPPING", "payment_method":"Wallet V2", "total_amount":"8881.000000", "create_datetime":"2020-09-03T03:03:36Z", "update_datetime":"2022-04-08T02:23:12Z", "transaction_id":"a154111f857514b0" }, # Metadata "source":{ "version":"1.4.1.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":"1649384718000", "db":"order_infos", "table":"order_info_01", "server_id":"225", "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002", "file":"mysql-bin.000025", "pos":"45950793", "row":"0", "thread":"27273" }, "op":"d", "ts_ms":"1649384718067"}


实践

4.1多数据源
在多个数据源同步场景下,Flink提供了union算⼦⽅便进⾏多数据流的合并。
拓扑结构:

示例代码:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySqlBinlogTask {
public static void main(String[] args) throws Exception { # 定义多个数据源 SourceFunction<String> sourceFunction01 = initMySQLSource(1) SourceFunction<String> sourceFunction02 = initMySQLSource(2) SourceFunction<String> sourceFunction03 = initMySQLSource(3) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01"); DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02"); DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03"); # 多数据流合并 DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03); # 数据下游输出 dataStreams.print(); env.execute(); }}

4.2数据过滤&转换
增加Filter算子进行异常数据过滤、增加Map算子进行数据格式转换。
拓扑结构:
示例代码:
# 过滤异常数据dataStreamSource.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { if (value == null) { return false; } return true; }});# 数据转换dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.trim(); }});
4.3数据流写入Kafka

增加addSink调用,配置数据流写入Kafka服务。

拓扑结构:
示例代码:
# Kafka配置String sinkTopicName = "order_infos_topic";Properties sinkProperties = new Properties();sinkProperties.setProperty("bootstrap.servers", bsServersSB.toString());
# 数据流写入KafkadataStreamSource.addSink(new FlinkKafkaProducer<String>( sinkTopicName, new SimpleStringSchema(), sinkProperties)).name("write to kafka topic: " + sinkTopicName );


总结

到这里一个基本的Flink-CDC的数据同步逻辑就实现了。Flink-CDC方案,目前已落地生产环境并得到有效验证,日均千万级的数据同步,业务检索系统可达到秒级同步,报表数据可达到分钟级同步。
当然,这其中也包含了基于生产环境更多因素优化。比如Flink任务基于窗口的数据合并,任务并行度配置等。
后续,随着业务数据的增长,数据同步仍然会面临很多挑战,我们会持续优化并完善数据同步方案,也欢迎对数据同步 / ETL感兴趣的同学,可以提出您的建议共同学习交流。


参考资料

https://github.com/ververica/flink-cdc-connectors
https://github.com/zendesk/maxwell
https://github.com/alibaba/canal
https://debezium.io/

感谢阅读「技术创想」第42期文章
领创集团正在春季招聘中
期待你的加入
点击文末
阅读原文
获取更多
招聘信息

关于领创集团

(Advance Intelligence Group)
领创集团成立于 2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021年 9月,领创集团宣布完成超4亿美元 D 轮融资,融资完成后领创集团估值已超 20亿美元,成为新加坡最大的独立科技创业公司之一。
继续滑动看下一个
领创集团Advance Group
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存