Apache Doris 在某工商信息商业查询平台的湖仓一体建设实践
信息服务行业可以提供多样化、便捷、高效、安全的信息化服务,为个人及商业决策提供了重要支撑与参考。本文以某工商信息商业查询平台为例,介绍其从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构演进历程。同时通过一系列实践,展示了如何保证数据的准确性和实时性,以及如何高效地处理和分析大规模数据,为信息服务行业提供了有价值的参考思路,有助于推动整个行业的发展和创新。
作者|高级数据研发工程师、Apache Doris & Hudi Contributor 李昂
架构 1.0:传统 Lambda 架构
逻辑冗余:同一个业务方案需要开发离线和实时两套逻辑,代码复用率很低,这就增加了需求迭代成本和开发周期。此外,任务交接、项目管理以及架构运维的难度和复杂度也比较高,给开发团队带来较大的挑战。 数据不一致 :在当前架构中,当应用层数据来源存在多条链路时,极易出现数据不一致问题。这些问题不仅增加了数据排查的时间,还对数据的准确性和可靠性带来了负面影响。 数据孤岛:在该架构中,数据分散存储在不同的组件中。比如:普通商查表存储在 MySQL 中,主要支持 C 端的高并发点查操作;对于像 DimCount 涉及宽表频繁变更的数据,选择 HBase 的 KV 存储方式;对于单表数据量超过 60 亿的年度维表的点查,则借助 GaussDB 数据库实现。该方式虽然可以各自满足数据需求,但涉及组件较多且数据难以复用,极易造成数据孤岛,限制了数据的深度挖掘和利用。
OLAP 引擎调研
在选型调研阶段,该平台深入考察了 Apache Doris、ClickHouse、Greenplum 这三款数据库。结合早期架构痛点和新的业务需求,新引擎需要具备以下能力:
标准 SQL 支持:可使用 SQL 编写函数,学习和使用成本较低; 多表联合查询能力:支持人群包即时交并差运算、支持灵活配置的人群包圈选; 实时 Upsert 能力:支持 Push 推送日志数据的 Upsert 操作,每天需要更新的数据量高达 6 亿条; 运维难度:架构简单,轻量化部署及运维。
多种 Join 逻辑:通过 Colocation Join、Bucket Shuffle Join、Runtime Filter 等 Join 优化手段, 可在雪花模型的基础上进行高效的多表联合 OLAP 分析。 高吞吐 Upsert 写入:Unique Key 模型采用了 Merge-on-Write 写时合并模式,支持实时高吞吐的 Upsert 写入,并可以保证 Exactly-Once 的写入语义。 支持 Bitmap:Doris 提供了丰富的 Bitmap 函数体系,可便捷的筛选出符合条件的 ID 交并集,可有效提高人群圈选的效率。 极简易用:Doris 满足轻量化部署要求,仅有 FE、BE 两种进程,使得横向扩展变得简单,同时降低了版本升级的风险,更有利于维护。此外,Doris 完全兼容标准 SQL 语法,并在数据类型、函数等生态上提供了更全面的支持。
引入 Unique Key 写时合并机制 : 为了满足大表在 C 端常态并发下的点查需求,通过设置多副本并采用 Unique Key 写时合并机制,确保了数据的实时性和一致性。基于该机制 Doris 成功替代了 GaussDB,提供了更高效、更稳定的服务。 引入 Light Schema Change 机制 : 该机制使可以在秒级时间内完成 DimCount 表字段新增操作,提高了数据处理的效率。基于该机制 Doris 成功替代了 HBase,实现了更快速、更灵活的数据处理。 引入 PartialUpdate 机制 : 通过 Aggregate 模型的 REPLACE_IF_NOT_NULL,加速两表关联的开发,这一改进使得多表级联开发更加高效。
2023 年 3 月,Apache Doris 正式上线后,运行了两个集群十余台 BE,这两个集群分别负责数分团队商业化分析与数据平台部架构优化,共同支撑大规模的数据处理分析的重要任务,每天支撑数据量高达 10 亿条,计算指标达 500+,支持人群包圈选、优惠券推送、充值订单分析及数据交付等需求。 2023 年 5 月,借助 Apache Doris 完成数分团队商业化分析集群 ETL 任务的流式覆写,近半离线定时调度任务迁移至 Doris 中,提高了离线计算任务的稳定性和时效性,同时绝大多数实时任务也迁移至 Apache Doris 中,整体集群规模达到二十余台。
架构 3.0:基于 Doris Multi-Catalog 的湖仓一体架构
利用 Hudi 天然支持 CDC 的优势,在 ODS 层将 Hudi 作为 Queryable Kafka,实现贴源层数据接入。 使用 MySQL 作为 Queryable State 进行分层处理,最终结果首先会写入 MySQL,再根据数据用途同步到 Hudi 或 Doris 中。 对于存量数据的录入,通过自定义 Flink Source 实现全量数据的 Exactly Once 抽取至 Hudi,同时支持谓语下推与状态恢复。
针对进行人群圈选、Push 分析以及 C 端分析等在线业务,会将数据存储在 Doris 中,这样能够充分利用 Doris 的高性能特性响应线上的高并发查询,同时能够提升整体运营效率和客户满意度,确保关键数据的快速处理和高效访问。 其他偏离线业务的数据存储在 Hudi 中,通过 Doris 的 Hudi Catalog 进行联邦查询。通过存储和计算的分离可以降低 Tablet 的维护开销、提高集群的稳定性,同时这一架构也可以降低写入压力、提升计算时节点的 IO 与 CPU 利用率,实现更高效的数据处理和分析。
某工商信息商业查询平台在 C 端查询业务中面临的核心挑战如下:
超大规模明细表的高并发查询:平台中存在超 60 亿的超大规模明细表,需要提供对该明细表的高并发查询能力。
多维度深度分析:数据分析团队希望对 C 端数据进行多维度分析,深入挖掘更多隐藏维度及数据穿透关系,这需要强大的数据处理和灵活的数据分析能力,以便从大量数据中提取有价值的信息。
定制化实时看板:希望将某些固定模板的 SQL 定制为实时看板,并满足并发查询与分钟级数据新鲜度的要求。同时希望将实时数据看板嵌入到 C 端页面中,以增强 C 端功能性与便利性。
为应对 C 端提出的挑战,该平台利用了 Apache Doris 的多个特性,实现了单点查询速度提升 127 %、批量/全量数据条件查询速度提升 193% 、开发效率提升 100% 的显著提升,此外面向 C 端的并发能力显著增强,目前可以轻松承载高达 3000 QPS 的线上并发量。
01 引入 Merge-on-Write,百亿级单表查询提速近三倍
为解决年报相关表(数据量在 60 亿)在 C 端的高并发查询问题,同时实现降本增效的目标,该平台启用了 Doris 的 Unique Key Merge-on-Write 写时合并功能。
每个文件都生成一个主键索引, 用于快速定位重复数据出现的位置 每个文件都会维护一个 min/max key 区间,并生成一个区间树。查询重复数据时能够快速确定给定 key 可能存在于哪个文件中,降低查找成本 每个文件都维护一个 BloomFilter,当 Bloom Filter 命中时才会查询主键索引 通过多版本的 DeleteBitmap,来标记该文件被删除的行号
company_base_info
表为例,单表数据量约 3 亿行、单行数据约 0.8KB,单表全量数据写入耗时约 5 分钟。在开启 Merge-on-Write 写时合并后,执行查询的耗时从之前的 0.45 秒降低至 0.22 s,对批量或全量数据进行条件查询时耗时从 6.5 秒降低至 2.3 秒,平均性能提升接近 3 倍。02 部分列数据更新,数据开发效率提升 100%
在该商业查询平台的业务场景中, 有一张包含企业各种维度的大宽表,而平台要求任意维度的数据变更都反映到落地表。在之前开发中,需要为每个维度开发一套类似 Lookup Join 的逻辑,以确保每个维度的变更都可以及时更新。
但是这种做法也带来一些问题,比如每新加入一个维度时,其他维度的逻辑也需要进行调整,这增加了开发和维护的复杂性和工作量。其次,为了保持上线的灵活性,该平台并没有将所有维度合并为一张表,而是将 3-5 个维度拆分为一张独立的表,这种拆分方式也导致后续使用变得极为不方便。
@RequiredArgsConstructor
private static class Sink implements ForeachPartitionFunction<Row> {
private final static Set<String> DIMENSION_KEYS = new HashSet<String>() {{
add("...");
}};
private final Config config;
@Override
public void call(Iterator<Row> rowIterator) {
ConfigUtils.setConfig(config);
DorisTemplate dorisTemplate = new DorisTemplate("dorisSink");
dorisTemplate.enableCache();
// config `delete_on` and `seq_col` if is unique
DorisSchema dorisSchema = DorisSchema.builder()
.database("ads")
.tableName("ads_user_tag_commercial").build();
while (rowIterator.hasNext()) {
String json = rowIterator.next().json();
Map<String, Object> columnMap = JsonUtils.parseJsonObj(json);
// filter needed dimension columns
Map<String, Object> sinkMap = columnMap.entrySet().stream()
.filter(entry -> DIMENSION_KEYS.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
dorisTemplate.update(new DorisOneRow(dorisSchema, sinkMap));
}
dorisTemplate.flush();
}
}
为解决此问题,该平台采用了自定义的 DorisTemplate(内部封装 Stream Load),以实现对存量数据的处理。其核心思想是参考了 Kafka Producer 的实现方式,使用 Map 来缓存数据,并设立专门的 Sender 线程,根据时间间隔、数据条数或数据大小定期发送数据。
通过从源端过滤出所需的列,将其写入 Doris 的企业信息维表中。同时,针对两表 Join 场景,选择用 Agg 模型的 REPLACE_IF_NOT_NULL 进行优化,使得部分列的更新工作变得更加高效。
这种改进为开发工作带来了 100% 的效率提升,以单表三维度举例,以前需要 1 天的时间进行开发,而现在仅仅需要 0.5 天。这一改变提升了开发效率,使其能够更迅速地处理数据并满足业务需求。
03 丰富 Join 的优化手段 ,整体查询速度最高提升近四倍
在该平台的业务场景中,超过 90% 的表都包含实体 ID 这一字段,因此对该字段构建了 Colocation Group,使查询时执行计划可以命中 Colocation Join,从而避免了数据 Shuffle 带来的计算开销。与普通的 Shuffle Join 相比,执行速度提升了 253%,极大地提高了查询效率。
对于一级维度下的某些二级维度,由于只存储了一级维度的主键 ID 而没有实体 ID 字段,如果使用传统的 Shuffle Join 进行查询,那么 A 表与 B 表都需要参与 Shuffle 操作。为了解决这个问题,该平台对查询语法进行了优化,使查询能够命中 Bucket Shuffle Join,从而降低了 50% 以上的 Shuffle 量,整体查询速度提升至少 77%。
04 Light Schema Change,线上 QPS 高达 3000+
为了提升 C 端并发能力,该平台为每个实体的每个维度都维护了一个 count 值。后端同学在查询数据前,会先查询该 count 值,只有在 count 值大于 0 的情况下,才会继续获取明细数据。为了适应维度不断扩张的迭代需求,选择采用了一套 SSD 存储的 HBase 集群,利用其 KV 存储特性维护了这套 count 值。
当引入新的需要计算的维度时,处理流程如下:
将其 KafkaTopic、查询 SQL 等信息录入 Apollo 配置平台
Flink 程序通过 Apollo 的 Listener 检测到有新的指标,请求 Redis 分布式锁
查询该维度的 success_key
,
判断是否完成过初始化通过
alter table
语句完成初始化,并设置 success_key其他 subtask 顺次执行 2-4 步骤
程序继续执行,新的 count 值已经可以写入
实时宽表 Join 的痛点在于多表外键关联,比如:select * from A join B on A.b_id=B.id join C on B.c_id = C.id 在实时模型构建时,A、B、C 三表都有可能各自发生实时变更,若要使得结果表对每个表的变化都进行实时响应,在 Flink 框架下有 2 种实现方式:
A、B、C 三张表,每张表都开发一套关联另外两张表的 Lookup Join 逻辑。
设置 Flink 中 State 存储的 TTL 为更长时间,例如 3 天。这样可以保证在 3 天内的数据变化能够被实时感知和处理, 同时,通过每日离线计算,可以保证 3 天前的更新能够在 T+1 的时间内被处理和反映在数据中。
而以上并不是最优方式,还存在一些问题:
随着宽表所需的子表数量不断增长,额外的开发成本和维护负担也随之线性上升。
TTL 时间的设定是一把双刃剑,设定过长会增加 Flink 引擎状态存储的额外开销,而设定过短则可能导致更多的数据只能享受 T+1 的数据新鲜度。
在之前的业务场景中,我们考虑将方案一与方案二进行结合,并进行了适当的折中。具体来说,只开发 A join B join C 的逻辑,并将产出的数据首先存储到 MySQL 中。每当 B、C 表出现数据变更时,通过 JDBC 查询结果表来获取所有发生变化的 A 表 ID,并据此重新进行计算。
然而,在引入 Doris 之后,通过写时合并、谓词下推、命中索引以及高性能的 Join 策略等技术,为该平台提供了一种查询时的现场关联方式,这不仅降低了开发的复杂度,还在三表关联的场景下,由原先需要的 3 人天的工作量降低只需要 1 人天,开发效率得到极大提升。
优化经验
在生产实践的过程中,该平台也遇到了一些问题、包括文件版本产生过多、事务挤压、FE 假死等问题报错。然而,通过参数调整和方案调试,最终解决了这些问题,以下是优化经验总结。
01 E-235(文件版本过多)
在凌晨调度 Broker Load 时, 由于调度系统任务挤占可能会导致同时并发多个任务,使得 BE 流量剧增,造成 IO 抖动、 产生文件版本过多的问题(E-235)。因此对此进行了以下优化,通过这些改动 E-235 问题未再发生:
使用 Stream Load 替代 Broker Load, 将 BE 流量分摊到全天。
自定义写入器包装 Stream Load, 实现异步缓存、限流削峰等效果, 充分保证数据写入的稳定性。
优化系统配置,调整 Compaction 和写入 Tablet 版本的相关参数:
02 E-233(事务挤压)
在深入使用 Doris 的过程中,发现在多个 Stream Load 同时写入数据库时,如果上游进行多表数据清洗并且限速难以把控时,可能会出现 QPS 较大的问题,进而触发 E-233 报错。为了解决这个问题,该平台进行了以下调整,调整之后在面对实时写入 300+ 表时, 再未复现 E-233 问题:
将 DB 中的表进行更细致的分库,以实现每个 DB 的事务分摊
参数调整:max_running_txn_num_per_db : 100->1000->2048
03 FE 假死
通过 Grafana 监控发现, FE 经常出现宕机现象,主要原因是因为早期该平台采用 FE 和 BE 混合部署的方式,当 BE 进行网络 IO 传输的时候,可能会挤占同机器 FE 的 IO 与内存。其次,因运维团队的 Prometheus 对接的服务比较多,其稳定性与健壮性不足, 从而造成假象告警。为解决这些问题做了以下调整:
当前 BE 机器的内存是 128G, 使用 32G 的机器将 FE 节点迁出。
Stream Load 过程中,Doris 会选定一个 BE 节点作为 Coordinator 节点,用于接收数据并分发至其他 BE 节点,并将最终导入结果返回给用户。用户可通过 HTTP 协议提交导入命令至 FE 或直接指定 BE 节点。该平台采用直接指定 BE 的方式,实现负载均衡,减少 FE 在 Stream Load 中的参与,以降低 FE 压力。
定时调度 show processlist 命令进行探活, 同时及时 Kill 超时 SQL 或者超时连接。
结束语
截止目前,基于 Doris 的数据平台已经满足该商业查询平台在实时与离线的统一写入与查询,支持了 BI 分析、离线计算、C 端高并发等多个业务场景,为产品营销、客户运营、数据分析等场景提供数据洞察能力与价值挖掘能力。未来,该商业查询平台还计划进行以下升级与优化:
版本升级:升级 Apache Doris 2.0 版本,更进一步实现高并发点查和部分列更新等最新特性,进一步优化现有架构,为查询提效。
规模扩大:进一步扩大集群规模,并将更多的分析计算迁移到 Doris 中。
日志分析:随着节点数越来越多,日志数据也在不断产生,未来该平台计划将集群日志接入到 Doris 中统一收集管理和检索,便于问题的提示探查,因此倒排索引和日志分析也是后面重要的拓展场景。
自动化运维:在某些特定查询场景下,可能会导致集群 BE 节点宕机,虽然出现概率较低,但手动启动仍然比较麻烦,后续将引入自动重启能力,使节点能够快速恢复并重新投入运行。
提升数据质量:目前该平台大部分时间专注于业务的实现上,数据入口的统一收束和补齐,数据质量监控还存在短板,所以希望可以在这方面提升数据质量。
更多行业实践
智慧金融与政企:杭银消金|河北幸福消费金融|金融壹账通|平安人寿|同程数科|星云零售信贷|招商信诺人寿|360数科
互联网与文娱:斗鱼|叮咚买菜|货拉拉|荔枝微课|票务平台|奇安信|腾讯音乐|天眼查|网易互娱|网易严选|小米|小鹅通|约苗|字节跳动|知乎|360商业化
企业服务与新经济:橙联|度言|观测云|慧策|领健|领创|Moka BI|美联物业|拈花云科|思必驰|物易云通|云积互动|有赞|纵腾集团