查看原文
其他

基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践

涤生-莫哥 涤生大数据
2024-12-05
1.业务场景


线上买券,线下核销是O2O的一个典型业务场景,在实际业务场景中,线上买券走营销链路,线下核销走当面付。

在针对营销效果数据分析的过程中存在需要将两个业务过程关联起来的情况,这两个业务过程的特点是买券和核销的时间可能相差很久,离线计算由于数据源是有边界的历史全量数据,可以很方便的通过离线任务直接算出,然而该特点导致在实时计算中关联这两部分数据成为了一个难点,主要原因如下:

1.双流join需要保存state,随着时间推移会导致state越来越大,性能下降;

2.平台的state保存都有TTL限制,过期后会清空导致数据关联不上;

3.任务修改逻辑可能导致state不可用,重置会导致state数据被清空,不可追溯,任务维护成本大;

针对上面的原因分析发现,核心是在于state的管理是实时平台自己做的,目前对用户不开放,而要做到两个业务时间间隔很长的数据关联需要永久保存state,这个需要我们自己利用外部存储实现。

2.技术方案


1. Flink作业A,向Hbase表:tydt_test_ri 中插入发券数据;

create table tydt_test_ri( rowkey varchar, voucher_id varchar, tb_userid varchar, alipay_userid varchar, gmt_send varchar, PRIMARY KEY(rowkey)) with ( type='alihbase', diamondKey='hbase.diamond.xxx.xxx.xxx', diamondGroup='hbase-diamond-xxx-xxx-xxx', columnFamily='f', tableName='tydt_test_ri');

insert into tydt_test_riselect concat_ws('#', substring(md5(voucher_id),1,4), voucher_id ) as rowkey, voucher_id, tb_userid, alipay_userid, gmt_created as gmt_sendfrom dwd_alsc_mkt_vcr_base_ri;

2.Flink作业B,向Hbase表:tydt_test_ri中插入核销数据;

create table tydt_test_ri( rowkey varchar, gmt_use varchar, use_shop_id varchar, PRIMARY KEY(rowkey)) with ( type='alihbase', diamondKey='hbase.diamond.xxx.xxx.xxx', diamondGroup='hbase-diamond-xxx-xxx-xxx', columnFamily='f', tableName='tydt_test_ri');

insert into tydt_test_riselect concat_ws('#', substring(md5(voucher_id),1,4), voucher_id ) as rowkey, gmt_create as gmt_use, shop_id as use_shop_idfrom dwd_alsc_mkt_vcr_suborder_ri;

3. DataHub平台上配置TT采集Hbase表:tydt_test_ri 的Hlog数据,如下配置:

创建完TT topic后,将TT topic,accesskey,hbase表名提供给hbase值班同学,人工接入一下才能生效。

4.Flink作业C,Hlog tt流反查hbase表补全字段。注意点是hlog的日志格式会把一条put按照key拆成多条日志,如果字段特别多的情况下会导致产生特别多相同rowkey的日志,因此可以根据rowkey做一下去重,减小关联hbase的次数。

-- hlog tt 流CREATE TABLE tydt_test_ri_tt( operation VARCHAR, rowkey VARCHAR, ts timestamp, ts_date VARCHAR, hbase_key VARCHAR, hbase_value VARCHAR, WATERMARK wm FOR ts as withOffset(ts, 2000) -- 允许2秒延迟) WITH( accessId='xxx', accessKey='xxx', type='tt', lineDelimiter='\n', topic='tydt_test_ri', fieldDelimiter='\u0001', encoding='utf-8');-- hbase 表CREATE TABLE dim_tydt_test_ri( rowkey VARCHAR, voucher_id varchar, tb_userid varchar, alipay_userid varchar, gmt_send varchar, gmt_use varchar, use_shop_id varchar, PRIMARY KEY(rowkey)) WITH( zkQuorum='hbase-nn-xxxx.xxx.xxx.net,hbase-nn-xxxx.xxx.xxx.net,hbase-nn-xxxx.xxx.xxx.net', zkNodeParent='/group-xxxx-cell01-ssd', type='alihbase', cache='None', columnFamily='f', tableName='tydt_test_ri' --async='true');
-- sink create table sls_test( rowkey VARCHAR, voucher_id varchar, tb_userid varchar, alipay_userid varchar, gmt_send varchar, gmt_use varchar, use_shop_id varchar) WITH( type='sls', logStore='test', accessId='xxx', accessKey='xxx', endPoint='http://cn-shanghai-xxxx-xxx.log.aliyuncs.com', project='xxx-xxxx-test');
--去重,每分钟输出create view v_distinct (rowkey) as select rowkeyfrom tydt_test_ri_ttGROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), rowkey;
--反查获取全字段insert into sls_testSELECt a.rowkey, b.voucher_id, b.tb_userid, b.alipay_userid, b.gmt_send, b.gmt_use, b.use_shop_idfrom v_distinct a join dim_tydt_test_ri FOR SYSTEM_TIME AS OF PROCTIME() AS bon a.rowkey=b.rowkey;


3.方案优势


1.使用Hbase作为外部存储保存全量数据,解决了使用内部state无法长时间保存,以及join性能差的问题。

2.Hbase在线沉淀了明细数据,可支持在线排查问题,也可以通过数据服务对外提供查询分析。

3.维护成本大大降低,不用担心任务报错,重置等导致内部state丢失的问题。

涤生大数据往期精彩推荐

1.企业数仓DQC数据质量管理实践篇

2.企业数据治理实战总结--数仓面试必备

3.OneData理论案例实战—企业级数仓业务过程

4.中大厂数仓模型规范与度量指标有哪些?

5.手把手教你搭建用户画像系统(入门篇上)

6.手把手教你搭建用户画像系统(入门篇下)

7.SQL优化之诊断篇:快速定位生产性能问题实践

8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!

9.新能源趋势下一个简单的数仓项目,助力理解数仓模型

继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存