基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践
线上买券,线下核销是O2O的一个典型业务场景,在实际业务场景中,线上买券走营销链路,线下核销走当面付。
在针对营销效果数据分析的过程中存在需要将两个业务过程关联起来的情况,这两个业务过程的特点是买券和核销的时间可能相差很久,离线计算由于数据源是有边界的历史全量数据,可以很方便的通过离线任务直接算出,然而该特点导致在实时计算中关联这两部分数据成为了一个难点,主要原因如下:
1.双流join需要保存state,随着时间推移会导致state越来越大,性能下降;
2.平台的state保存都有TTL限制,过期后会清空导致数据关联不上;
3.任务修改逻辑可能导致state不可用,重置会导致state数据被清空,不可追溯,任务维护成本大;
针对上面的原因分析发现,核心是在于state的管理是实时平台自己做的,目前对用户不开放,而要做到两个业务时间间隔很长的数据关联需要永久保存state,这个需要我们自己利用外部存储实现。
1. Flink作业A,向Hbase表:tydt_test_ri 中插入发券数据;
create table tydt_test_ri(
rowkey varchar,
voucher_id varchar,
tb_userid varchar,
alipay_userid varchar,
gmt_send varchar,
PRIMARY KEY(rowkey)
) with (
type='alihbase',
diamondKey='hbase.diamond.xxx.xxx.xxx',
diamondGroup='hbase-diamond-xxx-xxx-xxx',
columnFamily='f',
tableName='tydt_test_ri'
);
insert into tydt_test_ri
select concat_ws('#',
substring(md5(voucher_id),1,4),
voucher_id
) as rowkey,
voucher_id,
tb_userid,
alipay_userid,
gmt_created as gmt_send
from dwd_alsc_mkt_vcr_base_ri
;
2.Flink作业B,向Hbase表:tydt_test_ri中插入核销数据;
create table tydt_test_ri(
rowkey varchar,
gmt_use varchar,
use_shop_id varchar,
PRIMARY KEY(rowkey)
) with (
type='alihbase',
diamondKey='hbase.diamond.xxx.xxx.xxx',
diamondGroup='hbase-diamond-xxx-xxx-xxx',
columnFamily='f',
tableName='tydt_test_ri'
);
insert into tydt_test_ri
select concat_ws('#',
substring(md5(voucher_id),1,4),
voucher_id
) as rowkey,
gmt_create as gmt_use,
shop_id as use_shop_id
from dwd_alsc_mkt_vcr_suborder_ri
;
3. DataHub平台上配置TT采集Hbase表:tydt_test_ri 的Hlog数据,如下配置:
创建完TT topic后,将TT topic,accesskey,hbase表名提供给hbase值班同学,人工接入一下才能生效。
4.Flink作业C,Hlog tt流反查hbase表补全字段。注意点是hlog的日志格式会把一条put按照key拆成多条日志,如果字段特别多的情况下会导致产生特别多相同rowkey的日志,因此可以根据rowkey做一下去重,减小关联hbase的次数。
-- hlog tt 流
CREATE TABLE tydt_test_ri_tt(
operation VARCHAR,
rowkey VARCHAR,
ts timestamp,
ts_date VARCHAR,
hbase_key VARCHAR,
hbase_value VARCHAR,
WATERMARK wm FOR ts as withOffset(ts, 2000) -- 允许2秒延迟
) WITH(
accessId='xxx',
accessKey='xxx',
type='tt',
lineDelimiter='\n',
topic='tydt_test_ri',
fieldDelimiter='\u0001',
encoding='utf-8'
);
-- hbase 表
CREATE TABLE dim_tydt_test_ri(
rowkey VARCHAR,
voucher_id varchar,
tb_userid varchar,
alipay_userid varchar,
gmt_send varchar,
gmt_use varchar,
use_shop_id varchar,
PRIMARY KEY(rowkey)
) WITH(
zkQuorum='hbase-nn-xxxx.xxx.xxx.net,hbase-nn-xxxx.xxx.xxx.net,hbase-nn-xxxx.xxx.xxx.net',
zkNodeParent='/group-xxxx-cell01-ssd',
type='alihbase',
cache='None',
columnFamily='f',
tableName='tydt_test_ri'
--async='true'
);
-- sink
create table sls_test(
rowkey VARCHAR,
voucher_id varchar,
tb_userid varchar,
alipay_userid varchar,
gmt_send varchar,
gmt_use varchar,
use_shop_id varchar
) WITH(
type='sls',
logStore='test',
accessId='xxx',
accessKey='xxx',
endPoint='http://cn-shanghai-xxxx-xxx.log.aliyuncs.com',
project='xxx-xxxx-test'
);
--去重,每分钟输出
create view v_distinct (
rowkey
) as
select rowkey
from tydt_test_ri_tt
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), rowkey;
--反查获取全字段
insert into sls_test
SELECt a.rowkey,
b.voucher_id,
b.tb_userid,
b.alipay_userid,
b.gmt_send,
b.gmt_use,
b.use_shop_id
from v_distinct a
join dim_tydt_test_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
on a.rowkey=b.rowkey
;
1.使用Hbase作为外部存储保存全量数据,解决了使用内部state无法长时间保存,以及join性能差的问题。
2.Hbase在线沉淀了明细数据,可支持在线排查问题,也可以通过数据服务对外提供查询分析。
3.维护成本大大降低,不用担心任务报错,重置等导致内部state丢失的问题。
涤生大数据往期精彩推荐