实时数仓在有赞的实践
点击关注“有赞coder”
获取更多技术干货哦~
部门:技术中台/数据中台
前言
随着实时技术的不断发展和商家实时应用场景的不断丰富,有赞在实时数仓建设方面做了大量的尝试和实践。本文主要分享有赞在建设实时数仓过程中所沉淀的经验,内容包括以下五个部分:
建设背景
应用场景
方案设计
项目应用
未来展望
一、建设背景
实时需求日趋迫切
实时技术日趋成熟
二、应用场景
实时BI看板
通过有赞BI工具基于实时数仓创建实时数据集,使用数据集配置柱状图、线图、饼图等图表来呈现实时汇总数据。目前BI工具所支持接入的实时数据源有Druid、MySQL。
实时OLAP
实时数据服务
通过有赞统一数据服务平台OneServices将实时数仓沉淀的实时指标以通用接口的方式提供给业务方使用。目前OneServices所支持接入的实时数据源有MySQL、Kylin、Druid、ClickHouse、Tidb。
实时监控告警
实时推荐
三、方案设计
该部分主要介绍实时数仓的方案设计,内容包括:分层设计、ETL设计、数据验证、数据恢复。
3.1 分层设计
实时数仓分层架构如下图所示 :
ODS(实时数据接入层)
ODS层表命名规范:部门名称.应用名称.数仓层级_主题域前缀_数据库名/消息名
例如:接入业务库的Binlog
实时数仓表命名:deptname.appname.ods_subjectname_tablename
例如:接入业务方的NSQ消息
实时数仓表命名:deptname.appname.ods_subjectname_msgname
DWS(实时明细中间层)
DWS层表命名规范:部门名称.应用名称.数仓层级_主题域前缀_数仓表命名
例如:实时事件A的中间层
实时数仓表命名:deptname.appname.dws_subjectname_tablename_eventnameA
例如:实时事件B的中间层
实时数仓表命名:deptname.appname.dws_subjectname_tablename_eventnameB
DIM(实时维表层)
DIM层表命名规范:应用名称_数仓层级_主题域前缀_数仓表命名
例如:HBase存储,实时维度表
实时数仓表命名:appname_dim_tablename
DWA(实时汇总层)
DWA层表命名规范:应用名称_数仓层级_主题域前缀_聚合粒度_数据范围
例如:HBase存储,某域当日某粒度实时汇总表
实时数仓表命名:appname_dwa_subjectname_aggname_daily
APP(实时应用层)
3.2 实时ETL
实时数仓ETL处理过程所涉及的组件比较多,接下来盘点构建实时数仓所需要的组件以及每个组件的应用场景。如下图所示:
具体实时ETL处理流程如下图所示:
3.2.1 维度补全
创建调用Duboo接口的UDF函数在实时流里补全维度是最便捷的使用方式,但如果请求量过大,对Duboo接口压力会过大。在实际应用场景补全维度首选还是关联维度表,但关联也存在一定概率的丢失问题,为了弥补这种丢失,可以采用Duboo接口调用兜底的方式来补全。伪代码如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case
when cast( b.column as bigint) is not null
then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'
,'serviceName'
,'methodName'
,cast(concat('[',cast(a.column as varchar),']') as varchar)
,'key'
)
,'rootId')
as bigint)
,a.column)
as bigint) end
3.2.2 幂等处理
实时任务在运行过程中难免会遇到执行异常的情况,当任务异常重启的时候会导致部分消息重新发送和消费,从而引发下游实时统计数据不准确,为了有效避免这种情况,可以选择对实时消息流做幂等处理,当消费完一条消息,将这条消息的Key存入KV,如果任务异常重启导致消息重新发送的时候,先从KV判断该消息是否已被消费,如果已消费就不再往下发送。伪代码如下:
create function idempotenc as 'XXXXXXX';
insert into table
select
order_no
from
(
select
a.orderNo as order_no
, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid
from
table1
) t
where
t.rid = 0;
3.3 数据验证
由于实时数仓的数据是无边界的流,相比于离线数仓固定不变的数据更难验收。基于不同的场景,我们提供了2种验证方式,分别是:抽样验证与全量验证。如图3.3所示
抽样验证方案
全量验证方案
3.4 数据恢复
实时任务一旦上线就要求持续不断的提供准确、稳定的服务。区别于离线任务按天调度,如果离线任务出现Bug,会有充足的时间去修复。如果实时任务出现Bug,必须按照提前制定好的流程,严格按照步骤执行,否则极易出现问题。造成Bug的情况有非常多,比如代码Bug、异常数据Bug、实时集群Bug,如下图展示了修复实时任务Bug并恢复数据的流程。
四、项目应用
项目一:视频号直播间实时数据统计
微信视频号的用户数已超过2亿,是微信生态内较大的流量入口,视频号直播成为一个重要的线上消费场景。目前,有赞已接入了小程序交易组件,直播时借助小程序的商品交易能力挂载有赞商品进行直播卖货。商家在直播过程中以及直播结束后都需要实时的关注直播的交易数据,根据直播间的实时数据反馈来动态调整经营策略。
由于离线分区数据在凌晨跑批更新的时候存在执行和替换的时间,如何确保每时每刻服务都能查到最全的明细数据,这是实时数据+离线数据方案设计的关键。我们基于ClickHouse设计两种类型的分区:一种是实时分区,realtime_yyyymmdd;另一种是离线分区,offline_yyyymmdd
当现在是T日时,见Step1
最新全量数据 = T日实时数据 + (T-1)日离线数据
当现在是T+1日时 (T日离线数据正在执行中还未产出),见Step2
最新全量数据 = T日实时数据 + (T+1)日实时数据 + (T-1)日离线数据
当现在是T+1日时 (T日离线数据已产出),见Step3
最新全量数据 = (T+1)日实时数据 + T日离线数据
具体过程如下图,何时从Step2切换到Step3,我们通过服务层代码判断,使用此方案能确保应用端接口在每时每刻的查询都是最新的全量数据。对于实时分区和离线分区分别都配置4天有效期,过期失效的历史分区会自动清理。如图下图所示
关于新老客户数,这类指标在计算前涉及与历史数据进行比较,所以势必要维护一张动态更新的维度表,在计算新老客户数前,流入进来的消息先与动态维表进行关联,当消息的支付时间比维表的支付时间大则为老客数据;当消息无法关联上维表里的数据时则为新客数据,随后立即更新HBase维度表,当该用户再次支付所产生的消息则统计为老客数。伪代码如下:
insert into table
select
...
, case
when b.rowkey is not null
then b.paid_order_first_time
else a.paid_order_time
end
from
table1 a
left join
table2 for SYSTEM_TIME as of a.proctime as b
on
a.column1 = b.rowkey
具体过程如图下图所示:
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图:
项目二:销售员实时分析
本次项目涉及实时指标主要有今日新增销售员数、今日新增下级销售员数、今日销售员成交金额、今日成交客户数、今日支出佣金等。以上指标都是当日实时指标,不用考虑离线场景,所以使用纯实时数据部分就能支撑。结合业务数据量级我们选型使用MySQL存储实时预聚合的数据,后续业务方通过OneServices提供的接口来获取数据进行实时展示。
部分代码如下图所示:
下面这张图有赞OneServices所提供的能力,通过配置的方式,将底层在线存储引擎映射成Dubbo接口,提供上层应用访问。
public class Request implements Serializable {
//请求唯一标识,填写uuid即可
String requestId;
//接入应用
String caller;
//秘钥
String secret;
//API名称
String apiName;
//请求参数
Map<String, Object> params;
//是否开启debug模式,如果开启,将返回更多的信息
Boolean debugMode = false;
//系统调用相关补充信息,可不填
Map<String, Object> additional;
}
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图:
项目三:消费者机器人服务实时统计
有赞消费者机器人服务占整体服务量的比重已较高,但目前只能查看离线数据,较为滞后,无法针对随时爆发的线上问题及时发现并布防,因此需要开发实时BI看板,以便于实时监控各个来源的流入以及转人工情况,便于发现异常点,及时止损,降低转人工率。
本次项目涉及的实时指标主要有问题流入量、机器人会话量、机器人转人工会话量、直连人工会话量等。实时数据源涉及客户问题入口记录表和机器人会话表,通过监听Binlog的方式实时采集数据,在机器人会话中存储了问题编码,但通过问题编码无法确定问题来源类型,因此必须要将以上两个数据流通过双流JOIN的方式连接在一起,才能区分哪些是消费者机器人所服务的问题以及会话。
流A是客户问题流入的消息流,流B是基于客户问题机器人进行自动回复的会话消息流。客户问题产生会先于机器人服务会话,会话存在有效期,超过有效期问题未结束重算一次新的服务会话。问题会话量的统计周期是一天,超过一天即使问题未结束也不再计算会话量。以上需求是一个明显的双流JOIN场景,如图4.3.1所示,我们采用FlinkSQL提供的Interval JOIN功能。首先A流的服务ID必须等于B流的服务ID,其次A流的创建时间满足以下表达式 a.created_time between b.created_time and b.created_time + INTERVAL '1' DAY
,伪代码如下:
from
(
select
key
, column
, created_time
from
table1
) a
inner join
(
select
key
, created_time
from
table2
) b
on
a.column = b.column
where
a.created_time between b.created_time and b.created_time + INTERVAL '1' DAY
出于性能和存储的考虑,要将过期的数据清除,通过所配置的INTERVAL参数,将过期的STATE自动清除。
在机器人会话表中如果客户多次关闭会话接着再次打开会话(在会话有效期内),在数据库层面会生成多条记录,但是会话编码不变。如果不对数据去重会导致实时统计结果偏高,但是精确去重对性能消耗较大,会影响到数据产出的时效性,与需求方沟通后,决定采用非精确去重,在使用Druid接入Kafka后,对去重字段使用hyperUnique算法处理,损失一定精度的准确性,确保实时汇总数据产出的时效性。
如下图所示:
以上是在项目开发过程中涉及的部分技术方案,项目上线后的效果如下图所示:
五、未来展望
在实时数仓方面,我们未来有几个重点事情:
实时数仓主题域建设覆盖面更广,支撑更多的业务应用
建立实时数仓价值评估体系,量化投入与产出
推进在线平台能力的完善,优化实时任务血缘、简化参数配置、本地代码调试等提效的功能
最后,有赞数据中台,长期招聘 数据仓库、数据产品、数据算法、基础组件、平台研发等方面的人才。欢迎加入我们,一起enjoy~ 简历投递邮箱:liuyijun@youzan.com。
招募优秀的你加入👇