涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)
ODPS(Open Data Platform and Service)是阿里云自研的一体化大数据计算平台和数据仓库产品,在集团内部离线作为离线数据处理和存储的产品。离线计算任务节点叫做Odps节点,存储的离线表叫做Odps表;
Flink: 实时计算引擎,本文代码开发和测试均基于集团内部实时计算平台,代码细节可能会和Flink 官方社区文档有些许不同,假如用于生产环境测试,参考Apache Flink 官方文档为准,但是技术方案是通用的哈;
https://flink.apache.org/posts/
现有业务需求是 “根据用户注册以来的累计跑步里程,给用户发放勋章”,需要实时的计算出用户【历史~此时刻】的累计跑步数据。
比如说,某个用户20210101首次上传跑步记录,之后又多次上传跑步记录,我们需要实时的计算出,在20210101~当前时刻 期间,该用户累计跑了多少公里,累计跑了多少次等指标。上述指标的计算涉及用户历史至今的所有数据(2018~至今该用户所有数据),考虑使用批流结合的方式进行统计。参考批流结合的常用 lambda 方案:
我们将其拆分到“实时+离线”两条链路分别计算,离线链路计算用户历史至昨日的累计数据data1,实时链路计算当日实时累计数据data2。然后在对两条链路的数据进行汇总,data1+data2即为用户历史至今日此时刻的累计数据。
这里,离线链路使用odps来做,实时计算使用Flink来做,数据存储涉及 hbase、odps,所用消息中间件是MQ。
3.1 方案描述
离线链路计算目的:为了计算出全量用户【历史至昨日】的累计数据。
任务初始化时,先将历史的存量数据全量计算一次,得到存量累计值;以后每日计算用户昨日的新增数据,即新增累计值 ;两者相加即为用户历史至昨日的累计数据;循环往复,即可每日更新历史累计数据。
对应的数据链路应该长这样:
离线链路计算流程如下:
step1:用户历史数据初始化。假设该计算任务发布的时间为20231010,首先要对用户 历史~20231009 期间的历史数据进行汇总,得到一个 历史存量累计数据 history_data;
step2:从20231010起,对用户每日的增量跑步数据进行汇总,得到该日的增量累计数据 day_data;
step3:将每日的增量累计数据day_data 与 历史存量累计数据history_data 进行求和,作为新的历史存量累计数据 history_data(T-1) = day_data(T-1) + history_data(T-2) ;
step4:重复 step2 和step3 ,每日更新历史存量累计数据 history_data 。
该方案的优点是,历史全量数据只用计算一次,每日只需计算增量部分后再与存量合并即可,节省计算资源。
实时链路计算目的:实时计算出用户【当日零点至此刻】的累计数据
实时链路的计算逻辑比较简单,对应的计算链路示意图如下:
实时链路计算流程如下:
step1:用户新增的跑步记录通过MQ发送给Flink任务;
step2:Flink节点1对数据去重;
step3:Flink节点2对实时汇总统计 当日零点至此刻 用户的跑步累计数据;step4:将计算结果输出给下游。
实时离线链路融合目的:实时得到用户历史至此时刻的汇总数据
从上述的离线、实时链路中,我们分别得到了用户【历史~昨日】累计数据,和【当日凌晨~此刻】累计数据,只需将两者相加即可实时得到用户【历史~此刻】的累计数据:
ODPS 计算出用户 [非当日的历史累计数据],为使用方便,会每天更新全量用户历史累计数据;
使用Flink节点1 实时计算用户当日上传的跑步累计数据;
使用 Flink节点2 实时的将离线数据和实时数据汇总起来;
将汇总结果写入Hbase结果表,同时发送个MQ消息给下游业务方。
这里需要有两点需要注意:
1、根据业务特点,这里将离线计算结果作为维表使用:
Flink任务的下游业务方更关注当日上传过跑步记录的用户的数据更新情况,ODPS结果表作为维表用,Flink任务只对当日上传跑步记录的用户进行查询,得到“非当日历史统计数据”,在与“当日新增跑步数据”相加,即可得到该历史至今的最终的统计数据(更新hbase结果表),符合需求;
我们的跑步用户中大部分的用户不会每天都上传跑步记录,这些人的结果数据不会发生改变。若将ODPS表作为源表,则依旧会为这些用户更新数据,浪费计算资源。
【优化】odps表作为维表,不适合大数据量的情况,大数据量使用hbase表作为维表比较合适。这里将odps表数据同步到hbase表中,再拿该hbase表作为维表。
2、初始化下游结果表:在整个任务跑起来前,需要先使用ODPS表的bizdate分区数据初始化hbase结果表,然后再由实时任务对结果表进行更新;
最终的方案示意图如下:
3.2 存在的问题
上面的lambda方案有个问题,每日凌晨零点过后,实时任务已开始计算新的一天数据,而离线任务计算尚未结束,这时会出现一个离线数据缺失的窗口期。重点分析一下框图中“实时数据+离线数据”的部分:
当一个用户在T日实时上传了自己的跑步记录,Flink节点1会计算出其 [当日0点起至此刻] 的跑步累计数据data1,Flink节点2会根据该用户id取hbase维表里查询其 [历史~T-1日] 的累计数据 data2 (hbase表里数据由odps每日更新,即T-1日的存量累计汇总数据),将data1和data2二者汇总,就可得到 用户历史至此时刻的汇总数据;
在凌晨(比如说,在00:00~00:30),ODPS正在计算最新分区数据(T-1日的数据)的期间,新的分区还没生成完,或者ODPS计算已经完成,但odps表同步base表同步任务还未完成,此时若发生了查询,会发生什么?
会使用老分区的数据(T-2日的数据,而不是期望的T-1日数据),导致数据不准。
【问题描述】
在凌晨时分,ODPS计算T-1日数据期间,如果发生了对T-1日的数据查询,则无法获取到期望的T-1日数据,会继续使用T-2日的数据
这里“无法获取正确数据”的时间长度 = ODPS计算时间 + ODPS同步数据到Hbase的时间
【原因】
Flink查询维表时 使用维表当前的数据快照,本次查询完成后再发生的维表更新不会对已有查询造成影响。
【举例】
case1(ODPS计算未完成):
27号,Flink任务计算27号当天的用户累计数据,同时查询odps维表的 26号分区 中该用户的历史累计数据,两者相加,得到27号的实时累计结果;
28号凌晨,ODPS正在计算27号分区的数据,任务还未结束,27号分区数据尚不可用;而Flink任务已经开始计算28号当天的用户累计数据,此刻发生了一次维表查询,期望从维表中查到该用户27号统计的历史累计数据,然而由于27号数据未准备好,则维表会返回26号的历史累计数据,这会导致数据计算错误,相当于丢失了该用户27号的数据。
case2(ODPS计算完成,但odps表同步habse表任务未完成):
28号凌晨,ODPS的计算已完成,odps表正在同步数据到hbase表期间,如果Flink发生了查询,期望获取用户27号的最新数据,但由于还没有更新完成,还是会用26号的数据,会造成类似的错误结果。
上面所述问题是批流融合的 lambda 框架常会遇到的问题,因此必须思考优化方案来解决上述问题。优化方案将在下一篇文章展现,敬请期待!
涤生大数据往期精彩推荐
8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!