哔哩哔哩⼤数据建设之路—实时DQC篇
本期作者
韩志华
哔哩哔哩大数据平台工具负责人
冯益峰
哔哩哔哩资深开发工程师
王鼎
哔哩哔哩高级开发工程师
背景
数据质量是基于大数据衍生的应用有效与否的重要的前提和保障之一。B站现在高速发展的业务需求以及未来能够依靠大数据孵化出更有深度和竞争力应用的愿景,都要求我们数据平台能够提供实时的、准确的、可以被各个业务方所信赖的数据。可以说,可信赖的数据,是大数据平台核心竞争力的体现。因此,在B站的大数据平台的建设过程中,数据质量平台成为了不可或缺的一环,因为它的使命便是为大数据平台的数据质量保驾护航。
质量平台
平台组件
DQC简介
功能图
DQC主要的工作链路与普通的监控系统如prometheus基本相似,过程都包括数据采集、数据检查、告警通知,都需要尽量减少对监控对象正常工作的影响,同时对监控对象的异常做到及时告警。
功能上有离线和实时之分
离线DQC:主要用来保障由离线任务生产出来的数据,触发规则主要是由离线调度任务完成后来通知DQC做采集,然后再根据阈值进行检查及告警。通常用户配置检查规则,一般会在下一次任务完成后生效。 实时DQC:主要针对Kafka数据源。Kafka是大数据平台中一个非常重要的基础组件,特别是在实时数仓中,使用的则更加频繁。一般是根据规则在固定的检查周期对在一定窗口数据进行采集及检查。通常用户配置检查规则,一般会在下一个数据周期后生效。
实时DQC
第一版方案
公司内部流计算的主流框架是Flink,所有实时DQC也是采用Flink计算框架。
实时DQC 相对离线DQC复杂不少,因为它的数据采集任务是一直运行着的,向一个已经运行的任务中去更新规则,或者新增采集对象,有一定的难度。
因此在第一版方案中,用户每新增一个采集对象或一个规则,DQC服务会为用户重新生成一个新的Flink任务来完成采集,并将采集结果写入到MySQL中。同时定时触发质量规则检查,将检查结果写入到MySQL中,并根据检查结果判断是否做告警通知。
这个方案架构简单,但是仍存在以下弊端:
资源利用率低: 由于一个检查规则一个数据采集任务,导致Flink任务大多数时间都比较空闲,特别在一些流量小的Topic更加明显。
网络带宽消耗大: 对一个Topic不同的规则,需要多个任务去消费。如果Topic流量很大,对网络带宽会有很大冲击。
稳定性差: 规则的修改需要重启数据采集任务,集群资源紧张时可能因为申请不到YARN/K8s资源而导致启动失败。
监控程序本身占用资源大,在降本增效为主流的现在,是不合时宜的。因此,我们迫切需要对实时DQC的方案进行改造,在资源使用上有下面三个要求:
一次启动,不再重启。避免资源竞争,提高系统可用性。
一个任务,多Topic共用。实际应用中,一些Topic流量很小,导致Flink任务大多数时间都比较空闲。如果一个Flink可以多消费几个这样Topic,相同的资源做更多的事,资源利用率更高。
一次消费,多规则校验。避免多任务重复消费一个Topic,减少网络带宽消耗。
新的方案
设计目标:
实时DQC采集程序的准确性;
规则检查的及时性;
最大限度的减低实时DQC的资源消耗;
最大限度的减低Kafka正常作业的影响;
总体架构
目前线上Topic 有7000+,为了便于管理,主要根据Influxdb能接受的QPS、及Flink任务利用率,实时DQC把Topic划分为大Topic、中Topic、小Topic。小Topic: 消息量 <1000/s,中Topic: 消息量 (1000/s—10w/s),大Topic: 消息量 > 10w/s。
方案分析:
大中小Topic分开管理,走不同的路径来做数据采集,中小Topic中的数据全量导入到全量表中(字段会根据检查规则做过滤),然后利用Influxdb CQ功能做汇聚存入CQ表完成数据采集,而大Topic则在Flink任务中直接算出最终的汇聚数据存入CQ表;
Influxdb 全量表:中小Topic使用,保存Topic中的全量数据(字段会根据检查规则做过滤),一般有效时间为一个小时;
Influxdb CQ表:所有Topic都会使用, 保存了Topic的汇聚数据,用于规则检查,一般有效时间为二个星期;
Topic动态化:运行时可新增或删除监控的Topic,无需重启Fllink任务;
规则动态化:运行时可以新增、删除、修改Topic的监控规则,无需重启Fllink任务;
DQC资源管理:这里包括两个部分,一个Flink任务和Influxdb资源,合理利用这两个资源,对Topic或者Topic的规则进行动态分配管理;
中小Topic方案
数据全量导入到全量表中(字段会根据检查规则做过滤)
Topic、Mapper支持动态化:
Topic动态化,Topic列表维护在配置中心,KafkaConsumer可以感知配置的变化从而达到动态的增删Topic; Mapper动态化,根据消费Topic 列表、Topic内部的数据格式、及对应DQC规则,动态形成Mapper的处理逻辑,推到Mapper Warpper;
中小Topic方案—kafkaconsumer
解决方案:因为开发人力、时间的原因,没有完全开发一个新的支持动态Topic的Kafka Consumer,而是扩展了FlinkKafkaConsumer,KafkaFetcher && KafkaConsumerThread采用hack方式处理。
中小Topic方案—Mapper
解决方案:根据TopicList、Topic内容、DQC规则形成Mapper的bytecode,以Base64方式存入配置中心,Mapper Wrapper动态获取,形成新的Mapper,来替换老的Mapper。
大Topic方案
由于大型Topic流量很大,如果还和中小型Topic使用相同的方案,不管是对网络带宽,还是对底层存储,都会造成非常大的压力。因此我们选择了另一种方案------单Topic单任务 + 规则动态。
每一个Flink任务只消费一个Topic,在这个任务中,我们需要能够动态感知针对该Topic的检查规则的变化,并根据检查规则对消费到的实时数据流进行打标,对打标后的结果按窗口进行汇聚。
动态逻辑:规则动态化,规则元信息维护在配置中心,FlatMap可以感知规则变化,将规则命中的记录向后输出进行聚合
规则动态
为了降低用户的学习成本,在该方案中,我们与离线DQC配置保持一致,用户可以通过SQL自定义where子句对数据进行过滤,实现更加精细的质量检查。规则解析器会对SQL中的where子句部分进行词法解析,生成对应的过滤器,用于数据过滤,对符合规则的记录,打上规则ID后,根据规则ID向后输出后由聚合算子进行局部的聚合。
数据膨胀
在规则打标后,输出到窗口进行聚合的过程中,可能会产生数据膨胀。
如上图所示,当数据进入FlatMap后,FlatMap会根据规则过滤器对数据打上规则ID标签,再将数据向后发送给聚合算子,聚合算子根据规则进行聚合。如果一条记录没有命中任何规则,则不会向后输出。从上图可以看出,输入条数为1,输出条数为4,数据膨胀4倍。这是由于我们向后输出的数据格式为,使用规则ID作为分组条件。一条记录命中多个规则,则会输出多条记录。由于该方案本身是针对大Topic的,流量本身就很大,在经过规则过滤器放大,带宽压力很重,不符合设计初衷。
针对这种情况,我们做了两个优化:
FlatMap数据输出格式调整为 <分组Key,RuleIdList,Data>。将数据命中的所有规则ID合并在一起,向后输出。聚合算子收到数据后,根据RuleIdList进行业务计算。
使用map -> reduce -> reduce架构,对规则打标后输出的结果,会先进行一次局部聚合,最后再进行全局规约数据汇总。
但是,这个设计并不能彻底解决问题。
规则 | 处理逻辑 | 分组key |
表行数类规则 | 局部累加,汇总时将局部计算结果累加得到汇总值 | 数据hash |
汇总值类规则 | 局部累加,汇总时将局部计算结果累加得到汇总值 | 数据hash |
最大值/最小值类规则 | 局部取最大/小值,汇总时将局部计算结果取最大/小值得到全局的最大/小值 | 数据hash |
平均数类规则 | 局部计算<字段值累加值, 条数累加值>,汇总时将局部计算结果合并计算全局的<字段值累加值, 条数累加值>得到全局的<字段值累加值, 条数累加值> | 数据hash |
字段去重(Distinct)类规则 | 局部对字段值去重后向后输出一遍,汇总时累加字段数量 | 字段值 |
经过对实时DQC质量规则分析,除了字段去重(Distinct)类规则外,其他所有规则,对分组的key的具体取值是完全不依赖,因为局部聚合与最终汇总时的业务逻辑是一致的。然而字段去重(Distinct)类规则是在局部进行去重,最终汇总时则是将计算去重后的记录数量,因此在局部聚合时分组key必须使用字段值。对此,我们调整了第一步的优化方案:
当Topic没有字段去重(Distinct)类规则,将所有规则合并输出,输出格式为
当Topic有一个字段去重(Distinct)类规则,则将所有规则合并输出,输出格式为 <字段值,ruleIDList,Data>
当Topic有多个字段去重(Distinct)类规则,且使用不同的字段时。将其中一个与其他所有非去重类规则合并输出。剩余的数值字段去重(Distinct)类规则单独输出。
最终形态如下图所示:
在优化之前,由于FlatMap的数据膨胀率很高,聚合算子经常会出现背压情况,导致消费性能下降。
在优化之后,数据膨胀率大大降低,但是仍然存在。而膨胀率取决于字段去重(Distinct)类规则类规则的数量,膨胀率是可预知且可控的。目前在B站大数据平台中,除了特殊业务外,基本没用使用该类规则的情况,但仍需要通过监控该规则使用情况,适时增加资源或规则下线方式保障任务稳定性。
Influxdb proxy方案
为了更好的适配和处理读写请求和Influxdb之间的联系,我们引入了Influxdb proxy代理服务。
后端的Influxdb集群包含多组实例,每一组实例中的数据都不分片,只互为备份,最终一致性由proxy双写来保证。
双写过程中,如有写入失败,Influxdb proxy将失败请求内容,记录到本地文件中并进行重试,直至成功写入。
每一个Influxdb实例节点包含分配到该实例的完整的全量表和CQ表数据。
优化读请求
查询时,proxy会选择后端的最优Influxdb节点(数据完整性、节点性能)来查询。
如果后端的Influxdb节点都有问题情况时,那么这次查询会降级。
优化写请求
由于实时DQC写入数据量大且极为频繁,所以为了减少网络IO流量开销,Influxdb proxy使用gzip压缩、批次写入的方式提升写入性能。
在Influxdb proxy上线后,由于实际运行时,实时写入的数据量十分大,导致网络IO一直处于峰值状态。经过排查,发现原本数据输入都是一次请求包含5000+的完整Influxdb数据插入语句,且大多都是相同db和相同measurement,同时每条插入语句有太多的无效tag。随即我们优化的数据写入的接口协议,每次请求只能写入同一个db,且把相同的measurement抽出来,同时删除插入语句中无效tag。
在上述改造之后,网络IO流量开销有了明显的改善和下降。
运维保障
在Influxdb proxy中,引入了Prometheus监控,实时监控读写请求的qps、写入的db,measurement数量和分布情况等,更好的加强相关资源的利用。
支持对后端的Influxdb集群管理,包括节点数据同步,新节点加入、删除,节点数据恢复等。
Influxdb方案
全量表
在中小型Topic的处理上,我们选择落库全部数据,每个Topic都会单独存储为一张Influxdb的measurement。measurement的结构如下所示:
time:数据时间
subtask:业务默认字段,消费的flink子任务号
sinknum:业务默认字段,子任务消费顺序
扩展tag:topic字段,做索引用
record_num:业务默认字段,值为1,用于计算行数类规则
扩展field:topic字段
业务默认字段:在全量表的设计中,我们额外增加了两个tag字段:subtask和sinknum。由于Influxdb的特性,当两条记录的时间和tag完全相同时,此时后写入的记录会覆盖前一条。当计算表行数规则时,这种情况可能导致误告。因此,为了保障数据可以写入,我们增加了subtask、sinknum两个字段,subtask使用的flink上下文中的子任务号,sinknum我们使用环形数组实现,取值范围是1-200,用于确保每条记录都是独一无二的,不会发生覆盖的问题。
TTL时间:全量表的作用,主要是为CQ表做准备。Influxdb会定时读取全量表,根据质量规则进行数据聚合,并将聚合结果写入到CQ表。而实时DQC的特性决定了已经被计算进入CQ表的全量表数据便已经过期了。因此,为了避免数据条数、序列数等资源无限制增长,全量表的TTL时间我们设计为1小时,超过1小时的全量表记录会被Influxdb删除。
Tag和Field的关系:tag在Influxdb中是被作为索引使用的,通常用来作为查询条件使用,而field则是在某个时间点产生的事件的具体值。在Influxdb中,一次查询如果没有查询任何一个field字段,那么这次查询是没有意义的,不会返回任何结果。因为只有field才能反应那个时间点的信息。因此我们业务默认保留一个record_num字段作为field,表行数类规则会优先使用该字段。
什么字段会被存储到Influxdb?
一个Topic的字段数量无法确定,几个或上百个,都有可能。起初对这些字段,我们并没有筛选,选择全部落库,在运行过程中发现调用Influxdb-proxy服务屡屡发生异常。在问题定位中发现这样的异常现象,Influxdb-proxy机器网络带宽消耗大,CPU使用率高。最终结论是,Influxdb-proxy写入Influxdb时会将数据压缩,减少网络带宽消耗,提高写入性能,但由于写入数据的冗余信息过多,压缩过程导致CPU负载非常高。Influxdb-porxy机器由于负载过高,无法继续向外提供服务。
因此,去除冗余数据使我们需要优化的点。经过业务分析,我们任务一个Topic中大部分字段,都是不需要落库的。以直播业务某TopicA(40个字段)为例,计算某个时间窗口下的质量规则:
规则ID | 规则 | 业务SOL | 需要存储的字段 |
1 | 表行数 | select count(1) from TopicA | 无 |
2 | 来自IOS平台的记录数 | select count(1) from TopicA where platform = 'IOS' | platform |
3 | 来自安卓平台的不同mid数 | select count(distinct mid) from TopicA where platform = 'android' | platform,mid |
根据规则发现,我们实际使用的字段只有platform和mid字段,其余字段对当前业务而言,都是冗余的。针对目前实时质量规则使用的现状,去除掉冗余字段,只存储必须的信息,网络带宽消耗可以减少90%以上,Influxdb-proxy的使用率一直维持在一个稳定的变化范围。在后续的使用中,该类异常再也未曾复现。
什么字段会被应用为tag?
tag在Influxdb中是作为索引被使用的,一条记录所有的tag被称为序列Series,根据机器性能的不同,每台机器可以承载的序列数量是有限的,序列数膨胀会造成Influxdb的读写性能急剧下降。
刚上线初期,由于接入Topic时未进行严格的SOP流程,发生过这样一起事故。某Topic中包含XxxId字段,运维人员误认为该字段是某字典信息,取值范围是有限的。因此直接作为Tag字段进行接入,结果该字段是高基数的,短时间造成Influxdb序列数急速膨胀到300W,Influxdb几乎不可对外提供服务。在总结会议上,为避免此类情况再次发生,我们制定了Topic接入SOP,对Tag字段进行严格校验。
由于Tag字段的选取对Influxdb的稳定性十分重要。我们是如何判断什么字段会被应用为tag?
我们认为tag字段的使用需要满足以下两点:
where子句中被用作过滤条件,如 where platform = 'ios' 子句中,platform字段会被选中继续进行筛选
字段取值范围可枚举,不存在高基,如 platform 字段的取值范围在 ios、android、web中,符合tag字段的选取特征。
在上面的例子中,platform字段会被应用为tag,而mid字段由于存在高基,会被应用为field。实际应用中,不能作为tag又需要使用的字段,都会被存储为field。
新增规则,字段如何扩展?
类似mongodb这种非结构化数据库,Influxdb中的measurement也不需要预先定义schema,拥有良好的扩展性。当新增字段时,只需要在写入语句中指定新增字段与字段值即可完成字段新增。
CQ表
CQ表存储的是实时数据根据业务规则聚合后的结果,它的结构简单且固定,如下图所示:
time:计算窗口开始时间
rule_id:质量规则ID
value:时间窗口内质量规则的计算结果
CQ的表数据来源有两种:
全量表定时汇聚写入:依靠Influxdb自身提供的基础能力Continuous Query实现,依靠该能力,Influxdb能够对实时数据自动周期运行查询,并将查询结果写入指定的CQ表中,目前Continuous Query运行 99分位耗时<30ms.
Flink任务(大Topic)实时汇聚写入:大Topic消费任务会在时间窗口内进行质量规则计算,并将计算结果写入到CQ表中
CQ表的TTL时间:CQ表是按分钟聚合的结果,它的TTL时间我们是根据业务来设计的。由于存在日同比、周同比等波动率比值类型的规则,因此CQ表的TTL时间目前设置的是14天,避免由于数据过期删除导致误告警。
Influxdb水位
Influxdb是整个方案的核心,保障Influxdb的稳定性是非常重要的一环。因此,对Influxdb的监控便是重中之重。经过对我们实际使用机器的测试,目前单机写入瓶颈在150W/s,序列数量峰值在200W左右,超出数值可能会导致写入性能下降。
对此我们采取了以下措施:
Influxdb支持水平扩展,根据Topic元数据与Influxdb监控信息判断,如果某台Influxdb当前承载的Topic流量或Influxdb实时写入量已经达瓶颈的80%,新接入的Topic会往新的Influxdb写入。
序列数量增速监控,由于数据ttl时间的限制,序列的数量应该是呈周期性的变化,并且峰值稳定保持在一个数据范围内。但若是tag字段选取不当,选择了高基字段,就会造成序列数量急速增长。当监控到这种情况,需要及时处理相关Topic,保障服务稳定。
运维&&异常情况
为了保障实时DQC新架构的稳定性,我们主要考虑了以下异常情况。
流量突增,消息堆积
流量突增,消息堆积主要分两种情况:
业务方可预知的增长情况:这类情况会比较多,比如某些赛事直播活动,相关的Topic流量突增,类似于最近的英雄联盟S12总决赛直播
业务方不可预知的增长情况:这类情况会比较少,UP主上传某个视频成了爆款,这种现象是不可预知的。
经过评估,当出现消息堆积时,此时的告警都可能是不准确,因此我们需要极力避免出现消息堆积的情况。
针对可预知的增长情况,我们会提前增加资源,提高任务的消费能力。
针对不可预知的增长情况或即使增加资源仍旧导致消息堆积时,我们认为此时任务是以非正常状态运行的,对其进行降级处理,关闭任务触发的相关告警,P0等高优级Topic会发送通知给到用户,告知其影响面。
程序Crash
使用Flink的checkpoint机制,保障任务异常恢复。但是仍然存在以下问题。
中小Topic:
该方案中数据需要写入Influxdb,为提高吞吐量,在sink阶段接收的数据不会每次都写入存储,而是先放入缓冲区,当达到数据长度或等待时间达到阈值后,再写入Influxdb。因此,若程序Crash,可能导致最近一次缓冲区中的数据丢失,该情况可能导致漏告与误告。
目前我们设置的时间阈值是10秒,记录数量是1000条,考虑到把缓存中的数据在每次checkpoint时保存到文件系统所耗费的成本,我们认为这样的损失是可以接受的,在产品的宣发上也会与用户说明。
大Topic:
使用Flink的checkpoint机制,可以很好的保障任务从上次状态恢复。在写入Influxdb时,会根据时间和tag信息覆盖聚合结果,避免结果重复写入。
需要考虑的点是,这个方案针对的Topic都是大流量的,如果恢复时间过长,可能导致数据堆积。需要考虑数据堆积的处理方案。
重复消费
多个任务重复消费相同的Topic,导致写入Influxdb数据增多,可能导致误告。
需要注意:kafkaconsumer里面配置的group.id已经不具备消费组的特性,因为kafkaconsumer使用了底层API来分配TP。
解决方案:
任务与Topic强绑定,分配Topic给消费任务时,注册任务与Topic到数据库中,若已存在相同的Topic,则启动失败,同时发出告警。
结合以上异常情况,为了保障新架构的稳定性,在运维方面我们主要做了以下措施:
Flink断流与堆积监控
Influxdb集群状态监控
Influxdb序列数监控
借助这些措施,帮助我们及时发现问题并解决问题。
实时DQC后续工作
工程化
新的方案虽然目前已经上线实施,但是,由于新的架构较之前稍显复杂,在自动工程化方面还有所欠缺。存量的DQC规则都是依靠开发人员人工移植,后续的规则新增也需开发人员参与。
另一方面,分级保障也是我们工程化的一个发力点。P0级别的Topic是在任何情况下,都是需要我们优先保障的。P0与P1的Topic不会再一个任务中合并消费,当启动P0任务如果失败,根据失败异常信息,会选择停止一个正在运行中的P1级别任务,释放资源,优先用来启动P0任务。当P1任务停止时,会发出告警通知到用户与运维人员,进行人工判断是否需要增加队列资源。
下一阶段,我们会在自动工程化和分级保障方面持续发力,将更好的使用体验呈现给用户。
Flink任务管理
在降本增效主流的现在,依靠Topic动态与规则动态两种设计,节约大量集群资源消耗,但仍有优化的空间,那就是多Flink任务更加精细化的管理。比如,Topic动态方案中,一个Topic是否应该向已经存在的任务重动态加入,还是新启动一个任务进行消费?这需要结合任务当前负载进行判断。
还有Influxdb单机处理瓶颈问题,在我们的规划中,Influxdb可以水平扩展,Flink任务启动时,需要结合Influxdb集群负载,与当前任务信息,选择最优Influxdb节点写入数据。
对正常流的影响
对Topic的质量监控,不应该影响到线上正常作业的任务。目前质量任务所使用的队列仍然是和其他任务混合使用的,可能会造成资源竞争、占用机器资源影响到机器上其他任务等问题,后续会根据资源规划进行调整。
以上是今天的分享内容,如果你有什么想法或疑问,欢迎大家在留言区与我们互动,如果喜欢本期内容的话,欢迎点个“在看”吧!