干货!一文看Doris在作业帮实时数仓中的应用&实践
数据驱动未来。在大数据生态中,数据分析系统在数据创造价值过程中起着非常关键的作用,直接影响业务决策效率以及决策质量。Apache Doris作为一款支持对海量大数据进行快速分析的MPP数据库,在数据分析领域有着简单易用、高性能等优点。
9月20日,Apache Doris组织了一场线上Meetup,作业帮受邀参加,并带来了一场《Doris在作业帮实时数仓中的应用&实践》的主题分享。
现场分享精华
大家下午好。下面我来介绍下Doris在作业帮实时数仓中的应用与实践。
这次的分享主要分三个主题
1、首先是所在团队的业务与背景介绍
2、其次会介绍下基于Doris,作业帮的查询系统是如何构建的,以及主要解决的问题
3、未来的规划
我所在团队是作业帮大数据团队,主要负责建设公司级数仓,向各个产品线提供面向业务的数据信息,如到课时长、答题情况等业务数据以及如pv、uv、活跃等流量类数据,服务于拉新、教学、BI等多个重要业务线。
在数仓体系中,大数据团队主要负责到ODS-DWS的建设,从DWS到ADS一般是数仓系统和业务线系统的边界。
在过去,由于缺失有效、统一的查询系统,我们探索了很多模式来支持各个业务线发展。
有些业务线对大数据相关技术比较了解,熟悉spark等计算系统,可以自己处理计算。因此会选用kafka 接收数据后使用spark计算的模式来对接大数据团队;但是其他业务线不一定熟悉这套技术栈,因此这种方案的主要问题无法复制到其他业务线。且Spark集群跨越多个业务线使用,本身就给业务线带来了额外的维护成本。
既然Kafka+Spark的模式无法大范围推广,我们又探索了基于ES的方案,即大数据将数据写入ES中,然后业务先直接访问ES来获取数据,但是发现一方面高性能的使用ES,本身就具有很高的成本,对ES得非常熟悉,这对于业务线来说很难有精力去做,其次,由于使用ES的系统质量参差不齐,偶会还发生将ES集群打垮的问题,稳定性也不可控,最后ES-Sql语法完备性不足,如不支持join、多列group by(6.3版本)等。
因此我们又探索开发API接口,希望在稳定性上可以有更好的解决方案。虽然API可以可控,但是由于API不提供Sql功能,基于需求场景不断case by case的API开发反而成了影响交付效率的主要瓶颈点。
上述多是支持查询明细数据,一旦涉及到大规模的流量类查询,如pv、uv,只好引入druid类系统,但是duird的接口和其他系统的接口不一致,用户往往又得学习,且Druid不支持明细,一旦需要明细,就需要到ES去查询,由于涉及两套系统,有时候还得处理明细数据和聚合数据不一致的问题。
随着需求越来越多,系统也越来越难以维护,交付效率也特别低,需求排队非常严重。
因此,提供有效而统一的查询系统,对于实时数仓建设在提高业务支持效率、降低维护成本上都具有非常重大的意义。
经过过去数月的探索与实践,我们确立了以Doris为基础的实时查询系统。同时也对整个实时数仓的数据计算系统做了一次大的重构,最终整体的架构图如下:
如图所示(从下到上),原始业务层日志经数据摄入系统进入数仓,在数据清洗计算层,我们将原来基Spark系统升级到了Flink,并且基于Flink-Sql提供了统一的数据开发框架,从原有的代码开发升级到Sql开发来提升数据的研发效率。
其后查询系统将Kafka的数据实时同步到查询引擎内,并通过OpenAPI的统一接口对外提供查询服务。
基于Doris的查询系统上线后,我们面对一个需求,不用像过去一样做方案调研、开发接口、联调测试,现在只要把数据写入,业务层就可以基于sql自己完成数据查询、业务开发,交付效率(数据计算好到提供可读服务)从过去的数人周加快到小时级。
在性能方面,过去基于ES或者mysql来做,当查询的数据量较大时,我们只能忍受数十个小时到数分钟的延迟,基于Doris的方案,加快到分钟级甚至秒级。
Doris的整体架构非常简单,不依赖任何第三方组件,社区支持度也非常好,从上线到今,我们只需做一些轻量级的运维规范,即可保证高稳定性。
所以说,通过引入Doris,解决了作业帮内实时数仓查询交付慢、查询慢的痛点问题,对于后续数仓的系统发展起到了非常关键的作用。
接下来,重点讲下查询系统的工作。
分两部分:查询系统的架构选型以及原理,以及应用&实践
在讲查询引擎之前,先讲下业务场景。
作业帮内,业务场景主要分两种
一种是 传统的流量类,比如算pv、uv、活跃……,作业帮内很多时候还需要看进一步的明细。
比如 作业帮主App 在每天各个小时的活跃用户数,还要看 作业帮主App每个小时内各个版本的活跃用户数。
第二种是 面向我们业务线的工作台,比如教学的老师。
比如我们的老师上完课后,会看下自己班内的同学们的出勤数据、课堂测验数据等。
这两种场景下,这块考虑到调研成本、团队技术生态、维护成本等多种因素,我们最后选择了Doris 作为我们的查询引擎。主要是Doris可在上述两种场景下都可以统一的满足业务的需求。
首先介绍下Doris。
Doris是 mpp架构的查询引擎。
整体架构非常简单,只有FE、BE两个服务,FE负责Sql解析、规划以及元数据存储,BE负责Sql-Plan的执行以及数据的存储,整体运行不依赖任何第三方系统,功能也非常丰富如支持丰富的数据更新模型、Mysql协议、智能路由等。对于业务线部署运维到使用都非常友好。
接下来讲下用Doris如何解决我们前面提到的业务场景下的问题。
Doris有多种数据模型,流量类场景常用的是聚合模型。比如对于前面提到的场景,我们会吧作业帮主App各个版本的明细数据存到base表中,如果直接从base表中读取跨天级的聚合数据,由于数据行比较多,可能会出现查询延迟的问题,因此我们会对常用的天级数据做一次rollup,这样通过预聚合,来减少查询的数据量,可以加快查询的延迟。
要高效的使用Doris的聚合模型,前提都是基于key列做数据行筛选,如果使用value列,Doris需要把相关的行全部聚合计算后方可决策是否属于结果集,因此效率比较低。
而对于教研工作台,前面提到的都是基于value的筛选,因此使用了Doris on ES的模型。主要是考虑到 可以发挥ES的任意列检索的能力,来加快查询速度。
在我们的实践中,发现Doris on ES相比直接裸用ES或社区的其他方案如Presto on ES在性能上有很大的提升,接下来介绍下Doris on ES高性能的设计原理。
Doris on ES整体的架构如图,FE负责查询ES的元数据信息如location、shard等,BE负责从ES数据节点扫描数据。
Doris on ES高性能,相比裸用ES,有几个优化点:
裸用ES时,ES采用的是Query then Fetch的模式,比如请求1000条文档,ES有10个分片,这时候每个分片都会给协调返回1000个doc id,然后 协调节点其实拿到了10 * 1000个doc id,然后选择1000个。这样其实每个分片多返回了900个.
Doris on ES则绕过了协调节点直接去操作datanode。它会在每个datanode上查询符合预期的docid,这样不会有过多的docid返回。
其次,Doris从ES扫描数据时,也做了很多优化。比如在扫描速度上,采用了顺序扫描、列存优化、谓词下推等,在数据从ES传输到Doris时,采用就近原则如BE会优先访问本机的datanode、source filter来过滤不用的字段等来加速传输速度。
在我们的调研中,Doris on ES的性能,比Presto on ES快了有数十倍。
在作业帮内,除了上面介绍的基于Doris的数据模型做的基础应用,要完整的支持业务、保证稳定性、提高效率,还需要其他周边的系统建设。
接下来介绍下基于Doris,作业帮查询系统架构的整体设计以及工作模式。
这是作业帮查询系统的总体架构。
从上往下,首先是我们平台,包括各个报表平台、元数据管理平台等,主要来提高各个场景的人效。
其下红色部分为我们统一的api接口层,这里我们主要是制定了api的规范比如请求响应方式、返回码等,来减少系统之间对接的成本。
基于api除了提供了主要的读写接口外,也包含了周边的服务建设,比如元数据管理、调度系统等。
接下来就基于一个完整的流程来介绍下各部分系统。
首先是元数据。Doris基于mysql语法建表,已经有元数据,我们这里做元数据,有几个额外的考虑:
首先是保障查询性能方面:如果一个表在建表时配置写错,那么查询性能会非常差,比如ES的index mapping中关闭了docvalue,或者Doris表未启动列存模式,那么查询就会退化成行存模式,性能会比较低,因此为了最大化性能,就需要将建表的过程全部自动化且规范化。这是其一。
Doris自身存储是有强Schema约束的,比如一个字符串的长度。但是ES并没有明确的长度约束,对于一个keyword类型的字段,写入128B或者256B都可以成功,但这会导致一个问题,当把一张es表同步到Doris表时,同步的成功率无法保障。另外,一旦Doris表声明的类型(如bigint)和ES index的类型不一致(如keyword)时,也会导致Sql运行失败。因此需要构建统一的数据模型来避免这类问题。
第三:使用效率。我们在使用过程中,建表、删除表、修改表是一个常见的操作,为了让各个业务线的同学(不管是否了解Doris)都可以快速的建表,这也是要做统一元数据、统一模型的基础。
最后,前面也提到了我们整个计算系统也在重构为flink-sql。flink-sql则会强依赖元数据,比如table on kafka、table on redis……
要统一元数据,统一数据模型,就得抽象整个数据表的结构,来管理好不同存储上的表,我们基于env、db、table为基本单位来管理表,database、table大家相对熟悉,env是我们引入的新namespace,主要用于提供不同集群/业务线的定义,如百度云的数仓集群、腾讯云的数仓集群,表单元下主要包含field(列类型、值域)、index(如rollup、bitmap索引等)、storage(存储属性)。
关于列属性,主要是规范化类型系统,考虑到json-schema由于其校验规则丰富、描述能力强,因此对于列值的约束统一使用json-schema来做。
对于数据类型,我们设计了公共数据类型以及私有数据类型。公共类如varchar、int等,这些在不同的存储系统都有对应的实现,也支持私有类型如Doris::bitmap,方便私有系统的兼容和扩展。通过这个模式可以将基于各个存储系统的表做了统一的管理
这是我们线上的真实的一张表。里面包含了列信息以及对应的存储配置。
左图中的纵向红框是json-schema的描述,来规范化值域。横向红框为ES表的一些meta字段,比如docid、数据更新时间。这些字段可以方便追查数据问题、以及用作数据筛选。
因为我们统一了数据模型,因此可以很方便的对所有表统一设置要增加这些meta字段。
通过元数据的统一管理,构建的表质量都非常高。所有的表都在最大化性能的提供查询服务,且由于数据导致的查询不可用case为0。且对于任何业务线的同学,不管是否了解Doris,都可以分钟级构建出这样一张高质量的表。
建好表后,就是数据的写以及读。统一基于openapi来做。
做api接口其实本质上也是为了在提供系统能力的前提下,进一步保障系统的稳定性和易用性。
比如要控制业务线的误用(如连接数打满),提供统一的入口方便写es、Doris,且控制数据质量……。
首先介绍下数据写接口。
由于统一了表模型,因此可以很方便的提供统一的写入接口协议。用户也无须关注实际表的存储是es还是Doris以及处理异构系统的系统。
第二,统一了写接口,就可以统一的对写入的数据会做校验检查,如数据的大小、类型等,这样可以保证数据写入的质量与准确性。这样对于数据的二次加工非常重要。
第三,接入协议中还增加了关键词,如数据的版本。可以解决数据的乱序问题,以及建立统一的写入监控。如下图是我们整个写入数据流的qps以及端到端(数据写入存储时间以及数据生产时间)的延迟分位值,这样可以让系统提高可观测性、白盒化。
接下来讲一个具体的场景,写入端是如何解决乱序问题的。
常态下我们的实时数据流是经过flink或spark计算后写入kafka,然后由查询系统同步到Doris/es中。
当需要修数时,如果直接写入,会导致同一个key的数据被互相覆盖,因此为了避免数据被乱序覆盖,就得必须停掉实时流,这个会导致数据时效性式受损。
因此我们基于写入端做了改进,实时数据流、离线修复数据流各自写入不同的topic,同步服务对每个topic做限速消费,如实时流时效性要求高,可以配额调的大些,保证配额,离线时效性则允许配额小点,或者在业务低峰期将配额调大,并基于数据key&列版本存储做了过滤。这样可以保证时效性的前提下,修数也可以按照预期进行。
最后是读的部分。
在提供sql能力的前提下,我们也做了一些额外的方案,比如缓存、统一的系统配置。对于系统延迟、稳定性提升都有很大的改进。并且由于统一了读接口,上述的这些改造,对于业务线来说都是透明的。
除了常规下面向低延迟的读,还有一类场景面向吞吐的读。
介绍下场景,比如 要统计统计某个学部下(各个老师)的学生上课情况:上课人数、上课时长等。
在过去,我们是基于spark/flink来处理这类问题,如spark消费kafka中的课中数据,对于每一条数据,会去redis中查询教师信息来补全维度。
常态下,当课中数据到达的时候,教师信息是就绪的,因此没有什么问题。可是在异常下,如维度流迟到、存储查询失败等,会导致课中流到达时,无法获取对应的教师信息,也就无法计算相关维度如学部的统计。
过去面临这种情况时,只能遇到这种异常,如重试如果无法解决,只能丢弃或者紧急人工干预,比如在尾标就绪后再重新回刷课中表,一旦遇到上游kafka数据过期就只能从ods层或者离线修复,效率特别低,用户体验也非常差。
基于Doris模式下,我们使用微批调度的模式。
调度系统会定期(分钟级)执行一个调度任务,基于sql join完成数据的选取。这样哪怕在异常下,课中流查不到教师数据,这样join的结果只是包含了可以查到教师数据的信息,
待教师数据就绪后,即可自动补全这部分课中数据的维度。整个过程全部自动化来容错。效率非常高。
因此这个模式的主要好处
业务端延迟可控、稳定性好。整个过程主要取决于调度的周期和Sql执行时长。调度周期可控,且由于Doris on ES的高性能,Sql执行时长几乎都可以在分钟内完成。
数据修复成本低、维护方便。一旦数据有异常,可以自动触发对应的数据窗口进行重新计算。
最后,讲下其他方面的建议实践,这些相对简单,但是在实际的应用中非常容易忽视。
ADS层表,尤其是面向平台侧的应用,慎用join。Doris的join策略比较多,如broadcast、shuffer等,如果使用需要了解原理,属于高级用户的使用范畴。对于强调快速迭代的场景下,可以使用微批模式来略降低数据更新的延迟,提高数据查询的效率。
使用Doris on ES时,尤其是在ES集群负载很高的情况下,在延迟允许的情况下建议将es的扫描超时时间设置大一点,如30s甚至更久。
Batch size,不是越大越好。我们实践中发现4096下最好,可以最高达到每秒30w的扫描速度。
Doris使用bitmap做精确去重时,有时候会发现Sql延迟比较高,但是系统cpu利用率低,可以通过调大fragment_instance_num的值。
运维Doris时,建议使用supervisor,可以帮助避免很多服务异常挂掉的问题;机器全部开启ulimit –c,避免出core时无法高效定位
当前我们在使用master版本,主要是考虑到bugfix很及时,但是也要避免新代码、feature的bug引入,因此我们会关注社区的issue、并做好case回归、固化使用模式等一系列手段来保障master在实际生产中的稳定性。
最后,讲下规划。
Doris 在作业帮实时数仓的建设中发挥了很关键的作用。
在实际的应用中,我们也发现了一些当前的一些不足。
如Doris on ES在面对大表的join查询时,目前延迟还比较大,因此需要进一步的优化解决;
Doris自身的olap表可以做动态分区,对于ES表目前可控性还不足;
其次,当ES修改表后,如增加字段,只能删除Doris表重建,可能会有短暂的表不可用,需要自动化同步或者支持在线热修改;
最后Doris on ES可以支持更多的谓词下推,如count等。
我们也希望可以和社区一起,把Doris建设的越来越好。
好的。我的分享到此结束。谢谢大家。
精彩问答Q&A
问题1:Doris on ES V.S. sparksql on ES,在功能上和性能上咱们调研过吗?对于使用哪个您这边有什么建议吗?
答:SparkSql on ES和Doris on ES 虽然都是Sql,但是在实际的生产环境中使用差异还是比较大的。
功能上来说,SparkSql和Doris-Sql需要考虑语法的兼容性问题,毕竟是两个系统,语法兼容其实很难。一旦不一致就需要用户端面向不同的系统做适配。
性能上,SparkSql或者Doris on ES,虽然访问ES的原理都差不多,但是实现上可能会有diff,这些diff会导致性能上差异比较大,如SparkSql的connector是不支持列存模式的。
场景上,如果使用SparkSql建议可以使用在流计算场景,更多的是解决吞吐的问题,类似的系统应该是Flink-Sql。可以吧数据按照行扫出来后,基于Spark的分布式计算能力、yarn的资源管理走流计算的模式。Doris on ES更适合走低延迟的场景。
问题2:Doris 支持Hive Metastore,和Flink SQL是什么关系?刚才讲的太快,有点没听懂
答:Doris其实是不支持Hive MetaStore的。只是可以从HDFS上load文件,然后在Doris的load语法中指定对应的列。
FlinkSql和这块关系不大。不过我理解你说的应该是我们的元数据,这部分背景是因为Flink-Sql运行时需要设置ddl语句,比如一张基于redis的表都有哪些列,类型是什么,这些需要统一的管理起来,目前是存储到了我们的元数据系统中。通过接口和Flink系统完成对接。
问题3:_version字段是一个内部字段?需要用户端写入的时候指定,还是系统自动创建?和HBase的version的应用场景有区别吗?
答:_version是我们数据流的一个内置协议字段。在数流转过程中,用户只要设置值即可,不需要显示创建。具体的值可以根据数据字段的写入服务来设置,比如在ods层,应该是采集侧服务来写入,如果在中间的flink清洗环节,应该是flink系统来设置,尽量让架构服务统一设置,保证稳定性。
_version字段最终会映射到存储系统中的UpdateTime字段,这个也是架构负责写入的。不需要业务侧关注。
HBase的version更多是用于多版本的管理,比如数据的回滚等。这里查询系统的_version更多是为了保证数据的时鲜性,即用户从查询系统读到的数据始终是最新的。这么做的前提主要是因为查询系统比如ES对于数据列多版本支持不太好,对于数据流更新时如果没有版本管理,容易导致乱序覆盖。和HBase的version场景还不同。
ES内部也有一个_version,但是这个_version一般是ES内部使用,用于高并发下乐观锁的实现。和当前的场景都不一样。