A Flink-CDC-Based Data Synchronization Solution
Preface
When processing business data, we regularly run into the need for real-time data synchronization between business modules or storage systems: for example, a reporting module that updates incrementally from order data, or a search engine that must stay in sync with business data. For scenarios like these, we currently use a Flink-CDC-based synchronization solution.
Flink-CDC (CDC is short for Change Data Capture) is a family of data source connectors for the Apache Flink® ecosystem that embeds the Debezium engine. Its goal is to monitor and capture data changes so that downstream services can react to them accordingly. Other mature solutions for CDC scenarios include Maxwell and Canal.
Solution Comparison
Comparing the candidate solutions, Flink-CDC supports a noticeably richer set of data sources than Maxwell or Canal, and, being built on the Apache Flink® ecosystem, it also provides rich operator support such as Map, Filter, and Union for processing synchronization tasks.
How It Works
Taking MySQL as an example, Flink-CDC works by reading the database's binlog; the embedded Debezium engine then parses the binlog into row-level change records. Flink-CDC currently supports two implementations for CDC monitoring, the DataStream-API and the SQL-API. A brief SQL-API sketch is given below for reference; the examples in the rest of this article use the DataStream-API.
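A minimal SQL-API sketch (assumptions: the mysql-cdc SQL connector is on the classpath, and the column list and connection parameters are illustrative placeholders):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlCdcSqlTask {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register a CDC-backed table over the source MySQL table.
        tableEnv.executeSql(
                "CREATE TABLE order_info (" +
                "  order_id STRING," +
                "  shop_id STRING," +
                "  order_status STRING," +
                "  total_amount DECIMAL(18, 6)" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'host_name'," +
                "  'port' = '3306'," +
                "  'username' = 'user_name'," +
                "  'password' = 'user_pass'," +
                "  'database-name' = 'order_infos'," +
                "  'table-name' = 'order_info_0'" +
                ")");
        // Every insert/update/delete on the source table now flows into this query.
        tableEnv.executeSql("SELECT * FROM order_info").print();
    }
}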
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogTask {
    public static void main(String[] args) throws Exception {
        // Define the CDC source (connection parameters are placeholders).
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("host_name")
                .port(3306)
                .databaseList("order_infos")
                .tableList("order_infos.order_info_0", "order_infos.order_info_2")
                .username("user_name")
                .password("user_pass")
                // initial(): take a full snapshot first, then switch to reading the binlog.
                .startupOptions(StartupOptions.initial())
                // Deserialize Debezium change records into JSON strings.
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01");
        // Emit the change stream downstream (printed to stdout here).
        dataStreamSource.print();
        env.execute();
    }
}
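Running the job above, every row-level change on the monitored tables is emitted as a Debezium-style JSON record. An INSERT produces a create event ("op":"c"):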
{
    # Create event: no old row image
    "before":null,
    # New Row Data
    "after":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"CANCELLED",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    # Operation type: "c" = create (INSERT)
    "op":"c",
    "ts_ms":"1649384718067"
}
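An UPDATE produces an update event ("op":"u") carrying both the old and the new row image: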
{
    # Old Row Data
    "before":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"SHIPPING",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # New Row Data
    "after":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"CANCELLED",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    # Operation type: "u" = update
    "op":"u",
    "ts_ms":"1649384718067"
}
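A DELETE produces a delete event ("op":"d") carrying only the old row image: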
{
    # Old Row Data
    "before":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"SHIPPING",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Delete event: no new row image
    "after":null,
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    # Operation type: "d" = delete
    "op":"d",
    "ts_ms":"1649384718067"
}
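Downstream consumers usually branch on the op field of these records. Below is a minimal dispatch sketch, assuming Jackson is available on the classpath; the handler methods are hypothetical placeholders:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventDispatcher {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void dispatch(String record) throws Exception {
        JsonNode event = MAPPER.readTree(record);
        switch (event.get("op").asText()) {
            case "c":  // create: only "after" is populated
                handleInsert(event.get("after"));
                break;
            case "u":  // update: both "before" and "after" are populated
                handleUpdate(event.get("before"), event.get("after"));
                break;
            case "d":  // delete: only "before" is populated
                handleDelete(event.get("before"));
                break;
            default:   // e.g. "r" for rows read during the initial snapshot
                break;
        }
    }

    // Hypothetical handlers; a real job would write to the downstream store.
    private static void handleInsert(JsonNode after) { }
    private static void handleUpdate(JsonNode before, JsonNode after) { }
    private static void handleDelete(JsonNode before) { }
}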
In Practice
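In production, business data is often spread across several database instances or shards. The job below builds one CDC source per instance and merges them into a single change stream with the union operator: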
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogTask {
    public static void main(String[] args) throws Exception {
        // Define one CDC source per database instance.
        SourceFunction<String> sourceFunction01 = initMySQLSource(1);
        SourceFunction<String> sourceFunction02 = initMySQLSource(2);
        SourceFunction<String> sourceFunction03 = initMySQLSource(3);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01");
        DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02");
        DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03");
        // Merge the three streams into a single change stream.
        DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03);
        // Emit the merged stream downstream (printed to stdout here).
        dataStreams.print();
        env.execute();
    }

    // Illustrative helper: builds a CDC source for the instance/shard with the given index.
    private static SourceFunction<String> initMySQLSource(int index) {
        return MySQLSource.<String>builder()
                .hostname("host_name_" + index)
                .port(3306)
                .databaseList("order_infos")
                .tableList("order_infos.order_info_" + index)
                .username("user_name")
                .password("user_pass")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
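On the resulting stream, Flink's built-in operators can be chained to clean and transform records before they are written downstream, for example: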
// Filter out abnormal (null) records.
dataStreamSource.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value != null;
    }
});

// Transform each record (here: trim surrounding whitespace).
dataStreamSource.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.trim();
    }
});
Finally, add an addSink call to write the data stream to the Kafka service:
// Kafka sink configuration ("host:9092" stands in for the real broker list).
String sinkTopicName = "order_infos_topic";
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", "host:9092");

// Write the change stream to the Kafka topic.
dataStreamSource.addSink(new FlinkKafkaProducer<String>(
        sinkTopicName, new SimpleStringSchema(), sinkProperties))
        .name("write to kafka topic: " + sinkTopicName);
Summary
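With Flink-CDC we capture row-level changes from business databases without intruding on the business side: the embedded Debezium engine turns binlog entries into structured change events, Flink operators such as union, filter, and map shape the stream, and sinks such as Kafka deliver it to downstream consumers. This covers our current cross-module real-time synchronization needs.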