其他
StarRocks跨集群迁移最佳实践|得物技术
目录
一、背景
二、方案流程
1. 方案可行性评估口径
2. 方案设计
3. 方案规划
4. 方案实施
5. 方案验证&验收
三、方案成果
四、方案展望
一
引言
二
方案流程
方案可行性评估口径
资源成本
稳定性成本
方案设计
方案一:StarRocks外表
集群间的数据同步; 读写分离。向源集群中写入数据,并且源集群的数据变更同步至目标集群,目标集群提供查询服务。
数据量较小(200G以内); 无三方平台可用; 数据迁移无需考虑稳定性成本; 测试场景快速验证; 存在hll、bitmap类型字段,但是又没有底表数据进行数据重建(hll/bitmap类型字段借助三方组件进行迁移的方案可参考官方文档flink导入至-bitmap-列、flink导入导入至-hll-列等); Array/Map/Row等复杂类型的迁移。
方案二:Flink Connector
数据量较大; 有三方平台可用; 稳定性要求高,期望控制稳定性成本; 有24h持续同步需求。
方案规划
方式一
方式二
注意
准确的待迁移数据量评估,依赖数据时间范围的确认。对于新旧集群双写场景,同步的最晚时间是完全双写介入的那一天(包含)。 预期同步最大速率(MB/s),需要兼顾集群当前流量和预估可承受的最大流量,避免因数据同步给集群造成预期外的压力,影响线上服务稳定性。
方案实施
方案一:外表
CREATE EXTERNAL TABLE external_db.external_t
(
k1 DATE,
k2 INT,
k3 SMALLINT,
k4 VARCHAR(2048),
k5 DATETIME
)
ENGINE=olap
DUPLICATE KEY(`timestamp`)
PARTITION BY RANGE(`timestamp`)
(PARTITION p20231016 VALUES [("2023-10-16 00:00:00"), ("2023-10-17 00:00:00")),
PARTITION p20231017 VALUES [("2023-10-17 00:00:00"), ("2023-10-18 00:00:00")))
DISTRIBUTED BY HASH(k1) BUCKETS 10
PROPERTIES
(
"host" = "127.0.0.x",
"port" = "9020",
"user" = "${user}",
"password" = "${passwd}",
"database" = "test_db",
"table" = "t"
);
insert into external_db.external_t select * from db.other_table;
方案二 Flink SQL
定义数据来源表
StarRocks与Flink SQL的数据类型映射; Flink scan参数设置,尤其是超时(time-out)类字段的设置,建议往大了设置; 考虑到数据迁移的源端和目标端的库、表均同名,在定义时需要对源表和输出表的表名做区分,以免混淆错乱。比如源表命名为{table名}_source,输出表命名为{table名}_sink 。
CREATE TABLE rule_script_etl_source (
`timestamp` TIMESTAMP,
`identity_id` STRING,
`app` STRING,
`cost` BIGINT,
`name` STRING,
`error` STRING,
`script` STRING,
`rule_id` STRING
) WITH (
'connector'='du-starrocks-1.27', --具体值以官方组件或自研组件定义为准
'jdbc-url'='jdbc:mysql://1.1.1.1:9030?useSSL=false&rewriteBatchedStatements=true',
'scan-url'='1.1.1.1:8030',
"user" = "${user}",
"password" = "${passwd}",
'database-name'='test_db',
'table-name'='rule_script_etl',
'scan.max-retries'='3',
'scan.connect.timeout-ms'='600000',
'scan.params.keep-alive-min'='1440',
'scan.params.query-timeout-s'='86400',
'scan.params.mem-limit-byte'='1073741824'
);
定义数据输出表
StarRocks与Flink SQL的数据类型映射; Flink sink参数设置,尤其是超时(time-out)类字段的设置,建议往大了设置; 尽量进行攒批,减小对StarRocks的导入压力; 考虑到数据迁移的源端和目标端的库、表均同名,在定义时需要对源表和输出表的表名做区分,以免混淆错乱。比如源表命名为{table名}_source,输出表命名为{table名}_sink ; 如果输出表是主键模型,表定义中字段列表后需要加上PRIMARY KEY ({primary_key}) NOT ENFORCED。
CREATE TABLE rule_script_etl_sink (
`timestamp` TIMESTAMP,
`identity_id` STRING,
`app` STRING,
`rule_id` STRING,
`uid` BIGINT,
`cost` BIGINT,
`name` STRING,
`error` BIGINT,
`script` STRING,
`sink_time` TIMESTAMP,
PRIMARY KEY (`identity_id`) NOT ENFORCED # 仅适用主键模型
) WITH (
'connector'='du-starrocks-1.27',
'jdbc-url'='jdbc:mysql://1.1.1.2:9030?useSSL=false&rewriteBatchedStatements=true',
'load-url'='1.1.1.2:8030',
"user" = "${user}",
"password" = "${passwd}",
'database-name'='test_db',
'table-name'='rule_script_etl',
'sink.buffer-flush.max-rows'='400000',
'sink.buffer-flush.max-bytes'='94371840',
'sink.buffer-flush.interval-ms'='30000',
'sink.connect.timeout-ms'='60000',
'sink.wait-for-continue.timeout-ms'='60000'
);
定义同步ETL
有映射关系的非同名字段,添加as,提升可阅读性; 前后字段类型不一样的,需要使用case as进行显式类型转换; 如果是仅输出表包含的字段,也需要在select子句中显式指出,并使用case null as {dataType}的形式进行类型转换; 部分String/VARCHAR(n)类型字段中,可能存在StarRocks Flink Connector使用的默认列分隔符(参数sink.properties.column_separator,默认\t)、行分隔符(参数sink.properties.row_delimiter,默认\n),导致导入是报“errorLog:Error:Value count does not match column count. Expect xx, but got xx. Row:xxx”错误,需要替换为自定义的分隔符; select子句尽量添加filter信息,一般是分区字段,以便Flink根据同步任务设置的并行度,拆分任务,生成合适的执行计划。
insert into rule_script_etl_sink
select
`timestamp`,
`identity_id`,
`app`,
`rule_id`,
cast(null as BIGINT) `uid`,
`cost`,
`name`,
cast(`error` as BIGINT) `error`,
`script`,
`timestamp` as `sink_time`
from rule_script_etl_source
where `timestamp` >='2023-08-20 00:00:00' and `timestamp` < '2023-09-20 00:00:00';
CREATE TABLE rule_script_etl_source (
`timestamp` TIMESTAMP,
`identity_id` STRING,
`app` STRING,
`cost` BIGINT,
`name` STRING,
`error` STRING,
`script` STRING,
`rule_id` STRING
) WITH (
'connector'='du-starrocks-1.27',
'jdbc-url'='jdbc:mysql://1.1.1.1:9030?useSSL=false&rewriteBatchedStatements=true',
'scan-url'='1.1.1.1:8030',
"user" = "${user}",
"password" = "${passwd}",
'database-name'='test_db',
'table-name'='rule_script_etl',
'scan.max-retries'='3',
'scan.connect.timeout-ms'='600000',
'scan.params.keep-alive-min'='1440',
'scan.params.query-timeout-s'='86400',
'scan.params.mem-limit-byte'='1073741824'
);
CREATE TABLE rule_script_etl_sink (
`timestamp` TIMESTAMP,
`identity_id` STRING,
`app` STRING,
`rule_id` STRING,
`uid` BIGINT,
`cost` BIGINT,
`name` STRING,
`error` BIGINT,
`script` STRING,
`sink_time` TIMESTAMP,
PRIMARY KEY (`identity_id`) NOT ENFORCED # 仅适用主键模型
) WITH (
'connector'='du-starrocks-1.27',
'jdbc-url'='jdbc:mysql://1.1.1.2:9030?useSSL=false&rewriteBatchedStatements=true',
'load-url'='1.1.1.2:8030',
"user" = "${user}",
"password" = "${passwd}",
'database-name'='test_db',
'table-name'='rule_script_etl',
'sink.buffer-flush.max-rows'='400000',
'sink.buffer-flush.max-bytes'='94371840',
'sink.buffer-flush.interval-ms'='30000',
'sink.connect.timeout-ms'='60000',
'sink.wait-for-continue.timeout-ms'='60000',
'sink.properties.column_separator'='#=#', -- 自定义列分隔符
'sink.properties.row_delimiter'='@=@' -- 自定义行分隔符
);
insert into rule_script_etl_sink
select
`timestamp`,
`identity_id`,
`app`,
`rule_id`,
cast(null as BIGINT) `uid`, -- sinl表才有的字段
`cost`,
`name`,
cast(`error` as BIGINT) `error`,
`script`,
`timestamp` as `sink_time`
from rule_script_etl_source
where `timestamp` >='2023-08-20 00:00:00' and `timestamp` < '2023-09-20 00:00:00';
方案验证&验收
验证
验收
数据行数校验
数据质量校验 针对维度表,可参考分区及或表级行数校验结果; 针对事实表,可以在分区级别做指标列的SUM/MAX/MIN/AVG值校验; 研发也可以结合业务自定义更多的校验方式。
三
方案成果
四
方案展望
方案的不足
未来规划
往期回顾
文 / 管虎
关注得物技术,每周一、三、五更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
“
扫码添加小助手微信
如有任何疑问,或想要了解更多技术资讯,请添加小助手微信:
线下活动推荐