快手 Druid 精确去重的设计和实现
The following article is from DataFunTalk Author 邓钫元
分享嘉宾:邓钫元 快手大数据
编辑整理:王吉东
内容来源:Druid 中国用户组 MeetUp 6th
出品社区:DataFun
注:欢迎转载,转载请注明出处
本次分享内容提纲:
快手 Druid 平台概览
Druid 精确去重功能设计
Druid 其他改进
快手 Druid Roadmap
★ 快手 Druid 平台概览
1. 从业务需求角度考虑,快手为什么选择 Druid ?
快手的业务特点包括超大数据规模、毫秒级查询时延、高数据实时性要求、高并发查询、高稳定性以及较高的 Schema 灵活性要求;因此快手选择 Druid 平台作为底层架构。由于 Druid 原生不支持数据精确去重功能,而快手业务中会涉及到例如计费等场景,有精确去重的需求。因此,本文重点讲述如何在 Druid 平台中实现精确去重。另一方面,Druid 对外的接口是 json 形式 ( Druid 0.9 版本之后逐步支持 SQL ) ,对 SQL 并不友好,本文最后部分会简述 Druid 平台与 MySQL 交互方面做的一些改进。
2. 下面重点介绍快手 Druid 平台的架构:
和大多数数据平台一样,快手的数据源主要有两种:基于 kafka 的实时数据源和建立在 hadoop 内的离线数据源。将这两种数据源分别通过 kafka index 和 hadoop index 导入到平台核心—— Druid 模型中。
在 Druid 中,将数据进行业务隔离、冷热分层 ( 例如:冷数据放在硬盘,热数据放在 SSD )
外部业务接口方面,支持通过 API 自行开发业务模块,支持通过 Kwai BI 做可视化;同时针对小部分人需求,现已支持 tableau ( tableau 通过 hive 连接 Druid ) 。
数据平台辅助系统包括 metric 监控 ( 实时监控 CPU 占用率情况,进程情况,lag 信息等 ) 、探针系统 ( 用来查询,定义查询热度、维度等信息,有助于优化 ) 、管理系统 ( 在原生态 Druid 的 json 接口基础上定制管理系统,实现任务的可视化导入等管理操作 )
★ Druid 精确去重概述
1. 原生 Druid 去重功能支持情况
a. 维度列
cardinality agg,非精确,基于 hll 。查询时 hash 函数较耗费 CPU
嵌套 group by,精确,耗费资源
社区 DistinctCount 插件,精确,但是局限很大:
① 仅支持单维度,构建时需要基于该维度做 hash partition
② 不能跨 interval 进行计算
b. 指标列
HyperUniques/Sketch,非精确,基于 hll,摄入时做计算,相比 cardinality agg 性能更高
结论:Druid 缺乏一种支持预聚合、资源占用低、通用性强的精确去重支持。
2. 精确去重方案
a. 精确去重方案:hashset
使用 hashset 的方式存储,具备简单通用等优点,但是资源占用非常大。以下图所示为例,将左上角的原始数据使用 hashset 方法,Year 作为维度列,City 作为指标列;按照维度列分成2个 Segment,转换为右上角的数据格式;接下来将指标列聚合到 broker 中,计算出 size,即得到最终结果。整个过程如下图所示:
但是使用 hashset 的去重方式,其资源占用非常大;具体来说,假设有 5000W 条平均长度为 10B 的 string ( 共500MB ) ,中间生成的 Hashset 会产生一系列链表结构,导致内存可达到 5G,即内存扩展可达10倍。因此,直接使用 hashset 的方法可以是完全不可行的。
在 hashset 方法基础上可以考虑一些优化的思路:例如,通过增加节点,将数据分散到不同机器上,查询时在每台机器上分别聚合进而求和;另一种方式类似于 MapReduce,计算前利用 shuffle 模型将数据打散。然而这两种优化思路和 Druid 平台属性有冲突,因此都不适用于 Druid 这个平台。
b. 精确去重方案:字典编码+Bitmap
Bitmap 方法,即用一个位来表示一个数据 ( 这是理论上的最小值 ) 。这种方案的思路是:对要去重的数据列做编码,将 string 或其他类型的数据转化成 int 型的编码,摄入到 Druid 后通过 Bitmap 的形式存储;查询时将多个 Bitmap 做交集,获取结果。Bitmap 的范围是42亿,最大占用空间 500M 左右;还可以使用压缩算法,使空间占用大大减少。
因此,字典编码+Bitmap 的方式,优点是存储和查询资源占用少,可以将构建和查询分开 ( 即字典不参与查询 ) ;缺点是:字典需要做全局编码,其构建过程较复杂。
考虑到 Kylin 曾使用过这种方案,因此快手的后续优化方案选择在这种方案的基础上进行优化。
3. 字典编码方案
a. 字典编码方案的使用
① Redis 方案
Redis 可以进行实时编码,可以同时支持离线任务和实时任务,因此,实时处理数据和离线处理数据可以使用相同的方案。
但是 Redis 的 ID 生成速度有瓶颈,最大5W/s;查询上压力也较大,对交互会造成一定的麻烦;同时对 Redis 的稳定性要求也比较高。
② 离线 MR 方案
离线 MR 方案,利用 MR 的分布式存储,编码和查询的吞吐量更高,还用用更高的容错性。但是 MR 仅支持离线导入任务。
综合这两种字典编码方案的优劣,得到以下结论:
离线任务使用离线 MR 编码,通过小时级任务解决实现准实时
实时任务仅支持原始 int 去重 ( 无需编码 )
b. 字典编码方案模型
快手的字典编码方案参照的是 Kylin 的 AppendTrie 树模型,模型详见下图。
Trie 树模型主要是使用字符串做编码;快手支持各种类型数据,可将不同类型数据统一转换成字符串类型,再使用 Trie 树模型做编码。Trie 树模型可以实现 Append,有两种方式:
按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。
记录当前模型最大 ID,便于给新增节点分配 ID 。
但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用率,选择单颗子树设定阈值,超过即分裂,进而控制单颗子树内存消耗。分裂后,每棵树负责一个范围 ( 类似于 HBase 中的 region 分区 ) ;查询时候只需要查询一颗子树即可。
为节省 CPU 资源,基于 guava LoadingCache,按需加载子树 ( 即懒加载 ) ,并使用 LRU 方法换出子树。
LRU ( Least Recently Used ) 是内存管理的一种置换算法,即内存不够时,将最近不常用的子树从内存清除,加载到磁盘上。LRU 算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。
c. 字典并发构建:
字典会存在并发构建的问题,分别使用 MVCC 以及 Zookeeper 分布式锁的方案。
① 使用 MVCC ( 持久化在 hdfs 上 ) ,在读取的时候会使用最新版本,拷贝到 working 中进行构建,构建完成生成新的版本号;如历史数据过多,会根据版本个数及 TTL 进行清理。
② 构建字典的过程中会操作临时目录,如果存在多个进程同时去写临时目录,会存在冲突问题;因此引入 Zookeeper 分布式锁,基于 DataSource 和列来做唯一的定位,从而保证同一个字典同时只能有一个进程。
d. 精确去重的实现
① 新增 unique 指标存储
使用 ComplexMetricSerde 定义一个指标如何 Serde,即序列化与反序列化;
使用 Aggregator,让用户定义一个指标如何执行聚合,即上卷;
使用 Aggregator 的 Buffer 版本 BufferAggregator 作为 Aggregator 的替代品;
使用 AggregatorFractory 定义指标的名称,获取具体的 Aggregator 。
② 精确去重的整体流程介绍
使用 DetermineConfigurationJob 计算最终 Segment 分片情况。
使用 BuildDictJob,Map 将同一去重列发送到一个 reducer 中 ( Map端可先combine ) ;每个 reducer 构建一列的全局字典;每列字典构建申请 ZK 锁。
IndexGeneratorJob,在 Map 中加载字典,将去重列编码成 int;Reducer 将 int 聚合成 Bitmap 存储;每一个 reducer 生成一个 Segment。
此套精确去重的使用方法:
导入时,定义 unique 类型指标
查询时,使用 unique 类型查询
e. 性能优化与测试
① 优化字典查询
· 存在一个超高基数列 ( UHC ) 的去重指标:在 IndexGenerator 之前增加 ClusterBy UHC 任务,保证每个 map 处理的 uhc 列数据有序,避免多次换入换出。
· 存在多个超高基数列的去重指标:拆成多个 DataSource,调大 IndexGenerator 任务的 map 内存,保证字典能加载到内存
② Bitmap 查询优化
减少 targetPartitionSize 或调大 numShards,增加 Segment 个数,提升并行度。
使用 Bitmap 做 or 计算时候,建议使用 batchOr 方法,而不是逐行 or,避免 I/O 与 or 计算交替,缓存持续被刷新导致性能降低。关于 batchOr 的选择,rolling_bitmap 提供一些接口,默认选择 naive_or ( 尽量延迟计算模型 ) ,通常情况下其性能较好;priorityqueue_or 使用堆排序,每次合并最小两个 bitmap,消耗更多内存,官方建议 benchmark 后使用;如果结果是较长的连续序列,可以选择手动按顺序依次 inplace or。
另一个思路是 Croaring-high performance low-level implementation
Rolling bitmap 用 C 语言实现了 Simd,相对于 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了80%。
快手将 Smid以JNI 的方式引入,对100W的 Bitmap 做 Or 操作,使用 Java 版本单线程操作,计算用时13s;用 JNI 调用 Croaring,数据进行反序列化(需要 memcpy 用时6.5s),计算消耗7.5s,总用时14s(慢于 java 版本)。Croaring 暂时不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,导致其实际运行效率与官方称的80%提升不一致。
③ 传输层编码
Broker 与 Historical 之间通过 http 协议交换数据,默认打开 gzip 压缩 ( 也可以选择不压缩,即 plain json ) 。测试发现 gzip 压缩的过程中会耗用大量 CPU,因此在万兆的网络下建议不压缩。
④ 对上述查询优化步骤依次性能测试:
8个维度,10亿基数的数据,选择列 author_id 作为其去重指标,摄入后达150W行;10台 historical 机器:
未做优化去重,查询时间50s;
增加 Segment 数量至10,查询时间为7s,提升约7倍性能;
将 Or 的策略设置为 BatchOr,查询时间为4s;
关闭 gzip 压缩,查询时间为2s。
以上是对10亿基数数据作去重的查询时间;如果数据基数在1亿以下,查询时间为毫秒级。
★ Druid其他方面做的改进
1. 资源隔离部署方案
该方案为 Druid 官方推荐部署方案,充分利用 Druid 的分层特质性,根据业务,根据数据冷热,分到不同 proxy 上,保证各个业务相互不受影响。
2. 物化视图
物化视图是包括一个查询结果的数据库对象,可以将其理解为远程数据的的本地副本,或者用来生成基于数据表求和的汇总表,这些副本是只读的。
如果想修改本地副本,必须用高级复制的功能;如果想从一个表或视图中抽取数据时,可以用从物化视图中抽取。物化视图可以通过数据库的内部机制可以定期更新,将一些大的耗时的表连接用物化视图实现,会提高查询的效率。
a. 维度物化
Druid 社区0.13开始具备物化视图功能,社区实现了维度的物化。应用场景如下:原始数据有很高维度,而实际查询使用到的维度是原始维度的子集;因此不妨对原始维度做小的聚合 ( 类似 Kylin 中的 cube 和 cuboid ) ;对经常查询的部分维度做物化。
b. 时序物化
除了维度上的物化,还包括时序上的物化,例如将原始数据按照小时聚合、按照分钟聚合。
c. 物化效果
这里,主要关心物化膨胀率和物化命中率,两项指标会影响最终的查询效率。物化膨胀率,表示物化后的数据存储耗用资源情况;物化命中率,可以描述物化后与实际应用场景的匹配度。表格中,ds4 和 ds5 都做到了物化膨胀率很低,物化命中率很高,因此,查询效率可获得7~9倍的提升。
3. Historical快速重启
快手数据平台硬件资源,大约是12块容量2T的SATA硬盘,数据量平均为10W segments,数据占用10T空间,重启一次大约40min。
Druid 中,默认重启的时候会加载全部数据;考虑到超过10T的元信息在启动时并不需要,可以将加载数据推迟到查询时候。因此,使用 Guava Suppliers.memoize 命令,延迟数据加载信息,只有在查询的时候才作列的加载。此过程通过一个参数 ( LazyLoad ) 控制。
( 此代码已提交 Druid 社区:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )
优化后,重启过程只需要 2min,提升20倍。
4. Kafka Index 方面的改进
a. TaskCount 自动伸缩
Kafka index 任务数即 TaskCount 为固定,需要按照峰值任务数设定,这样导致在非高峰时刻会存在资源浪费。
这里实施的一个优化策略是:
KafkaSupervisor 增加 DynamicTaskCountNotice
基于 task 的 cpu use & kafka lag 阈值,进行25%增减 task count
无需重启 supervisor
b. 精细化调度
Middle Manager 的 indexing task 资源分配从 slot 改成按照内存大小分配 ( 类似于 MapReduce 从1.0到2.0的改进 ) ,具体优化方法如下:
区分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要占用内存 )
允许在提交 task 时指定 task 内存大小。
使用这种方式优化,实时数据任务处理可节省超过65%的内存占用,离线数据任务处理可节省超过87%的内存占用。
5. 元数据交互
元数据 ( Metadata ),又称中介数据、中继数据,主要是描述数据属性 ( property ) 的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能,是关于数据的组织、数据域及其关系的信息,可以看作是一种电子式目录。简言之,元数据就是关于数据的数据。
a. Overlord 与 MySQL 交互优化
Overlord 节点负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方,Overlord 有两种运行模式:本地模式或者远程模式 ( 默认本地模式 ) 。
Overlord 控制台可以查看等待的任务、运行的任务、可用的 worker,最近创建和结束的 worker 等。
Segment 是 Druid 中最基本的数据存储单元,采用列式的方式存储某一个时间间隔 ( interval ) 内某一个数据源 ( dataSource ) 的部分数据所对应的所有维度值、度量值、时间维度以及索引。
Segment 的逻辑名称结构为:
<dataSource>_<intervalStart>_<intervalEnd>_<version>_<partitionNum>
<dataSource> 表示数据源 ( 或表 ) ;<intervalStart> 和 <intervalEnd> 分别表示时间段的起止时间;<version> 表示版本号,用于区分多次加载同一数据对应的 Segment;<partitionNum> 表示分区编号 ( 在每个 interval 内,可能会有多个 partition )
Segments 在 HDFS 上的物理存储路径下包括两个文件:descriptor.json 和 index.zip 。前者记录的是 Segment 的描述文件(样例见下表),其内容也保存在 Druid 集群的元数据的 druid_segments 表中;index.zip 则是数据文件。
描述文件descriptor.json样例:
{
"dataSource": "AD_active_user",
"interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00",
"version": "2018-04-01T00:04:07.022+08:00",
"loadSpec": {
"type": "hdfs",
"path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"
},
"dimensions": "appkey,spreadid,pkgid",
"metrics": "myMetrics,count,offsetHyperLogLog",
"shardSpec": {
"type": "numbered",
"partitionNum": 1,
"partitions": 0
},
"binaryVersion": 9,
"size": 168627,
"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"
}
将 druid_segments 增加索引 ( dataSource,used,end ) ,查询时间从 10s 优化到 1s 。
b. Coordinator 与 MySQL 交互优化
Druid 的 Coordinator 节点主要负责 Segment 的管理和分发。具体的就是,Coordinator 会基于配置与历史节点通信加载或丢弃 Segment 。Druid Coordinator 负责加载新的 Segment,丢弃过期的 Segment,管理 Segment 的副本以及 Segment 的负载均衡。
Coordinator 会根据配置参数里的设置的事件定期的执行,每次执行具体的操作之前都会估算集群的状态。与 broker 节点和 historycal 节点类似的,Coordinator 节点也会和 Zookeeper 集群保持连接用于交互集群信息。同时,Coordinator 也会和保存着 Segment 和 rule 信息的数据源保持通信。可用的 Segment 会被存储在 Segment 的表中,并且列出了所有应该被集群加载的 Segment 。Rule 保存在 rule 的表中,指出了应该如何处理 Segment 。
① Segment 发现
将 Coordinator 中默认的全量扫描 druid_segments 表改成增加读取,druid_segments 添加索引 ( used,created_date ) ,查询时间从 1.7min 优化到 30ms 。
② 整个协调周期
Segment 之前使用 TreeSet 按时间排序确保优先 load 最近的 segment 现在优化成 ConcurrentHashSet,并分成最近一天和一天前的两个 set,先操作最近一天的;
Apply rules 的时候对 LoadRule 先判断集群副本和配置的是否一致,是的话就跳过,提升并发能力;
Cleanup 和 overshadow 两个操作合并;
查询时间从 3min 优化到 30s 。
★ 快手 Druid Roadmap
1. 在线化
今年会实现多集群、高可用,着重保证在线业务的 SLA,同时线上业务也要着重权限的管理。
2. 易用性
实现对 SQL 的支持,同时快手的 BI 产品也会做相应的升级和优化。
3. 性能及优化 ( Druid 内核方面,与社区合作 )
自适应的物化视图,智能化
数值型索引
向量化引擎
最后附上我们提交社区的代码:
https://github.com/apache/incubator-druid/pull/7594 ( 精确去重功能 )
https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重启 )
作者介绍:
邓钫元,快手大数据架构团队研发工程师,毕业于浙江大学,曾就职于百度、贝壳,目前负责快手 Druid 平台研发工作,多年底层集群以及 OLAP 引擎研发、分布式系统的优化经验,热衷开源,为 hadoop / kylin / druid 等社区贡献代码。
——END——
数仓,不就是写SQL吗? | 四个SQL密技
深度解读:Flink 1.11 SQL 流批一体的增强与完善
附PPT|阿里巴巴实时数仓最新架构图
职场建议:给新人和老鸟的几点建议(都是干货)
Q: 关于实时数仓你还想了解什么?
更多精彩,请在文末点击“实时数仓”查看
!关注不迷路~ 各种福利、资源定期分享!