最佳实践|Apache Doris 在小米数据场景的应用实践与优化
导读:小米集团于 2019 年首次引入了 Apache Doris ,目前 Apache Doris 已经在小米内部数十个业务中得到广泛应用,并且在小米内部已经形成一套以 Apache Doris 为核心的数据生态。本篇文章转录自 Doris 社区线上 Meetup 主题演讲,旨在分享 Apache Doris 在小米数据场景的落地实践与优化实践。
作者|小米 OLAP 引擎研发工程师 魏祚
业务背景
因增长分析业务需要,小米集团于 2019 年首次引入了 Apache Doris 。经过三年时间的发展,目前 Apache Doris 已经在广告投放、新零售、增长分析、数据看板、天星数科、小米有品、用户画像等小米内部数十个业务中得到广泛应用,并且在小米内部已经形成一套以 Apache Doris 为核心的数据生态。
架构演进
小米引入 Apache Doris 的初衷是为了解决内部进行用户行为分析时所遇到的问题。随着小米互联网业务的发展,各个产品线利用用户行为数据对业务进行增长分析的需求越来越迫切。让每个业务产品线都自己搭建一套增长分析系统,不仅成本高昂,也会导致效率低下。因此能有一款产品能够帮助他们屏蔽底层复杂的技术细节,让相关业务人员能够专注于自己的技术领域,可以极大提高工作效率。基于此,小米大数据和云平台联合开发了增长分析系统 Growing Analytics(下文中简称 GA ),旨在提供一个灵活的多维实时查询和分析平台,统一数据接入和查询方案,帮助业务线做精细化运营。(此处内容引用自:基于Apache Doris的小米增长分析平台实践)
>>> 历史架构
增长分析平台立项于 2018 年年中,当时基于开发时间和成本,技术栈等因素的考虑,小米复用了现有各种大数据基础组件(HDFS, Kudu, SparkSQL 等),搭建了一套基于 Lamda 架构的增长分析查询系统。GA 系统初代版本的架构如下图所示,包含了以下几个方面:
数据源:数据源是前端的埋点数据以及可能获取到的用户行为数据。 数据接入层:对埋点数据进行统一的清洗后打到小米内部自研的消息队列 Talos 中,并通过 Spark Streaming 将数据导入存储层 Kudu 中。 存储层:在存储层中进行冷热数据分离。热数据存放在 Kudu 中,冷数据则会存放在 HDFS 上。同时在存储层中进行分区,当分区单位为天时,每晚会将一部分数据转冷并存储到 HDFS 上。 计算层/查询层:在查询层中,使用 SparkSQL 对 Kudu 与 HDFS 上数据进行联合视图查询,最终把查询结果在前端页面上进行显示。
针对上述两个问题,我们的目标是寻求一款计算存储一体的 MPP 数据库来替代我们目前的存储计算层的组件,在通过技术选型后,最终我们决定使用 Apache Doris 替换老一代历史架构。
>>> 基于 Apache Doris 的新版架构
选择 Doris 原因:
Doris 具有优秀的查询性能,能够满足业务需求。
Doris 支持标准 SQL ,用户使用与学习成本较低。
Doris 不依赖于其他的外部系统,运维简单。
Doris 社区拥有很高活跃度,有利于后续系统的维护升级。
>>> 新旧架构性能对比
我们选取了日均数据量大约 10 亿的业务,分别在不同场景下进行了性能测试,其中包含 6 个事件分析场景,3 个留存分析场景以及 3 个漏斗分析场景。经过对比后,得出以下结论:
在事件分析的场景下,平均查询所耗时间降低了 85%。 在留存分析和漏斗分析场景下,平均查询所耗时间降低了 50%。
应用实践
离线数据部分则会先写到 Hive 中,再通过小米的数据工场将数据导入到 Doris 中。用户可以直接在数据工场提交 Broker Load 任务并将数据直接导入 Doris 中,也可以通过 Spark SQL 将数据导入 Doris 中。Spark SQL 方式则是依赖了 Doris 社区提供的 Spark Doris Connector 组件,底层也是对 Doris 的 Stream Load 数据导入方式进行的封装。
>>> 数据查询
用户通过数据工场将数据导入至 Doris 后即可进行查询,在小米内部是通过小米自研的数鲸平台来做查询的。用户可以通过数鲸平台对 Doris 进行查询可视化,并实现用户行为分析(为满足业务的事件分析、留存分析、漏斗分析、路径分析等行为分析需求,我们为 Doris 添加了相应的 UDF 和 UDAF )和用户画像分析。
虽然目前依然需要将 Hive 的数据导过来,但 Doris 社区也正在支持湖仓一体能力,在后续实现湖仓一体能力后,我们会考虑直接通过 Doris 查询 Hive 与 Iceberg 外表。值得一提的是,Doris 1.1 版本已经实现支持查询 Iceberg 外表能力。同时在即将发布的 1.2 版本中,还将支持 Hudi 外表并增加了 Multi Catalog ,可以实现外部表元数据的同步,无论是查询外部表的性能还是接入外表的易用性都有了很大的提升。
>>> Compaction 调优
通过引导业务侧进行合理优化,对表设置合理的分区和分桶,避免生成过多的数据分片。 引导用户尽量降低数据的导入频率,增大单次数据导入的量,降低 Compaction 压力。 引导用户避免过多使用会在底层生成 Delete 版本的 Delete 操作。在 Doris 中 Compaction 分为 Base Compaction 与 Cumulative Compaction。Cumulative Compaction 会快速的把大量新导入的小版本进行快速的合并,在执行过程中若遇到 Delete 操作就会终止并将当前 Delete 操作版本之前的所有版本进行合并。由于 Cumulative Compaction 无法处理 Delete 版本,在合并完之后的版本会和当前版本一起放到 Base Compaction 中进行。当 Delete 版本特别多时, Cumulative Compaction 的步长也会相应变短,只能合并少量的文件,导致 Cumulative Compaction 不能很好的发挥小文件合并效果。
针对不同的业务集群配置不同的 Compaction 参数。部分业务是实时写入数据的,需要的查询次数很多,我们就会将 Compaction 开的大一点以达到快速合并目的。而另外一部分业务只写今天的分区,但是只对之前的分区进行查询,在这种情况下,我们会适当的将 Compaction 放的小一点,避免 Compaction 占用过大内存或 CPU 资源。到晚上导入量变少时,之前导入的小版本能够被及时合并,对第二天查询效率不会有很大影响。 适当降低 Base Compaction 任务优先级并增加 Cumulative Compaction 优先级。根据上文提到的内容,Cumulative Compaction 能够快速合并大量生成的小文件,而 Base Compaction 由于合并的文件较大,执行的时间也会相应变长,读写放大也会比较严重。所以我们希望 Cumulative Compaction 优先、快速的进行。 增加版本积压报警。当我们收到版本积压报警时,动态调大 Compaction 参数,尽快消耗积压版本。 支持手动触发指定表与分区下数据分片的 Compaction 任务。由于 Compaction 不及时,部分表在查询时版本累积较多并需要能够快速进行合并。所以,我们支持对单个表或单个表下的某个分区提高 Compaction 优先级。
具体可以参考:Apache Doris 1.1 特性揭秘:Flink 实时写入如何兼顾高吞吐和低延时
>>> 监控报警
Doris 的监控主要是通过 Prometheus 以及 Grafana 进行。对于 Doris 的报警则是通过 Falcon 进行。
另外,小米内部针对每一个 Doris 集群都有 Cloud - Doris 的守护进程。Could - Doris 最大功能是可以对 Doris 进行可用性探测。比如我们每一分钟对 Doris 发送一次 select current timestamp(); 查询,若本次查询 20 秒没有返回,我们就会判断本次探测不可用。小米内部对每一个集群的可用性进行保证,通过上述探测方法,可以在小米内部输出 Doris可用性指标。
小米对Apache Doris的优化实践
在应用 Apache Doris 解决业务问题的同时,我们也发现了 Apache Doris 存在的一些优化项,因此在与社区进行沟通后我们开始深度参与社区开发,解决自身问题的同时也及时将开发的重要 Feature 回馈给社区,具体包括 Stream Load 两阶段提交(2PC)、单副本数据导入、Compaction 内存限制等。
>>> Stream Load 两阶段提交(2PC)
遇到的问题
在 Flink 和 Spark 导入数据进 Doris 的过程中,当某些异常状况发生时可能会导致如下问题:
Flink 数据重复导入:Flink 通过周期性 Checkpoint 机制处理容错并实现 EOS,通过主键或者两阶段提交实现包含外部存储的端到端 EOS。Doris-Flink-Connector 1.1 之前 UNIQUE KEY 表通过唯一键实现了EOS,非 UNIQUE KEY 表不支持 EOS。
Spark SQL 数据部分导入:通过 SparkSQL 从 Hive 表中查出的数据并写入 Doris 表中的过程需要使用到 Spark Doris Connector 组件,会将 Hive 中查询的数据通过多个 Stream Load 任务写入 Doris 中,出现异常时会导致部分数据导入成功,部分导入失败。
Stream Load 两阶段提交设计
以上两个问题可以通过导入支持两阶段提交解决,第一阶段完成后确保数据不丢且数据不可见,这就能保证第二阶段发起提交时一定能成功,也能够保证第二阶段发起取消时一定能成功。
Doris 中的写入事务分为三步:
在 FE 上开始事务,状态为 Prepare ;
数据写入 BE;
多数副本写入成功的情况下,提交事务,状态变成 Committed,并且 FE 向 BE 下发 Publish Version 任务,让数据立即可见。
引入两阶段提交之后,第 3 步变为状态修改为 Pre Commit,Publish Version 在第二阶段完成。用户在第一阶段完成后(事务状态为 Pre Commit ),可以选择在第二阶段放弃或者提交事务。
支持 Flink Exactly-Once 语义
解决 SparkSQL 数据部分导入
>>> 单副本数据导入优化
单副本数据导入设计
性能对比测试
并发场景性能对比
测试中向 100 个表并发导入数据,每个表有 50 个导入任务,任务总数为 5000 个。单个 Stream Load 任务导入的数据行是 200 万行,约为 90M 的数据。测试中开了 128 个并发,将单副本导入和三副本导入进行了对比:
导入时间:3 副本导入耗时 67 分钟,而后单副本耗时 27 分钟完成。导入效率相当提升两倍以上。
内存使用:单副本的导入会更低。
CPU消耗对比:由于都已经是开了并发在导入,CPU开销都比较高,但是单副本导入吞吐提升明显。
>>> Compaction 内存限制
通过这种方式,我们对某些业务场景做了内存的限制,很好的避免集群负载高时占用过多内存导致 OOM 的问题。
总结
加入社区
如果你对 Apache Doris 感兴趣,请点击“阅读原文”了解并加入 Doris!我们也发起了征文活动,邀你讲讲与Doris “相遇 相知 相识”的故事,不仅有精美礼品相送,还可获得 SelectDB 全渠道传播曝光加持!最后,欢迎更多的开源技术爱好者加入 Apache Doris 社区,携手成长,共建社区生态。
相关链接:
SelectDB 官方网站:
https://selectdb.com
Apache Doris 官方网站:
http://doris.apache.org
Apache Doris Github:
https://github.com/apache/doris
Apache Doris 开发者邮件组:
dev@doris.apache.org