基于ClickHouse的百亿级广告平台实时数仓构建实战
【作者介绍】王磊,阿里云 MVP,华院计算技术总监。著有:《图解 Spark 大数据快速分析实战》;《offer 来了:Java 面试核心知识点精讲(原理篇)》;《offer 来了:Java 面试核心知识点精讲(架构篇)》。
编辑 | 韩楠
约 5100 字 | 10 分钟阅读
问题背景
我们的广告平台的数据,包括实时数据和离线数据。实时数据首先会发送到消息系统,然后在消息系统中进行流转批,最终将转换后的数据存储在S3上,离线数据则直接调用S3的API将数据存储在S3上。S3上存储的数据格式有:CSV、TXT、Parquet、ORC等。
图1
查询的时候通过Athena、Spark或者Hadoop进行查询分析。最终分析的数据用于BI、统计报表和告警规则等。
这种方案的好处是多源数据入库方便,并且查询的时候,在Athena中通过标准SQL按照不同的需求,可以进行任何维度的数据分析。
但是这种方案有一个很大的缺点就是查询效率慢,并且多源数据的一致性难以保障。一般对TB级别数据量的分析耗时在20秒以上,因此它更适合于离线数据分析。这也是大数据行业比较典型的离线数仓方案。
图2
梳理分析,解决问题
怎么做既能够将多源数据进行融合,又能实现快速实时的分析数据。同时服务器的预算又能控制在合理的范围内,这个问题,可以说一直是我们架构优化的方向。
由于离线数据的缺点随着数据量的增大越来越被放大,因此我们迫切需要一个既能快速融合和分析多源数据,同时又经济实惠的实时数仓的方案。经过内部讨论,这个方案需要满足这样的一些条件,梳理了七点。
图3
上面7个关键点,第3、4点是目前实时数仓建立的核心需求,也是难点。
值得注意的是:
任何解决方案都没有完美的,我们只能在大量的实践摸索中不断加以完善、丰富。我们的实时数仓基于ClickHouse的projection在构建主题上探索出了比较好的方案,但是基于数据变更的自动感知,目前仍然不是个比较完美的解决方案,后续的实践中还需要进一步完善。行业里该方案其实也处于探索阶段。
言归正传,刚刚经过前面的需求分析,我们十分明确的一点是“需要一个实时数仓平台”。但是如何根据我们目前的需求在投入最少的情况下,获得最多的收入呢?为了达到上面的目标,我们对市场上的实时数仓方案进行了分析,最终确定基于StartRocks或者ClickHouse构建实时数仓。来一起看看具体的实现方式:
将数据源上的实时数据直接写入消费服务。
对于数据源为离线文件的情况,有两种处理方式,一种是将文件转为流式数据写入Kafka,另外一种情况是直接将文件通过SQL导入ClickHouse集群。
ClickHouse接入Kafka消息,并将数据写入对应的原始表,基于原始表可以构建物化视图、Project等实现数据聚合和统计分析。
应用服务基于ClickHouse数据对外提供BI、统计报表、告警规则等服务。
图4
解决问题的核心关键点
在StarRocks、ClickHouse这两种方案中,我们经过性能和功能上的比较,最终选择了ClickHouse。这里可能有人会想了,那为什么选择ClickHouse呢?主要原因有这几点。
通过具体的测试发现ClickHouse在50TB约140亿数据上,整体的查询稳定性和性能优于StarRocks。
ClickHouse支持源数据载入,具体包括Kafka、S3、HTTP、JDBC等。
ClickHouse支持联邦查询,可以轻松将MySQL或者MongoDB的数据和ClickHouse的数据进行关联查询。
基于project支持数据的实时统计。
基于物化视图支持基于原始的统计数据存储。
支持数据权限。
图5
介绍完了我们选择ClickHouse的原因,下面进入到本文的重头戏,如何通过ClickHouse实现我们最初在构建实时数仓时提出的要求。
(一)如何保障各种来源的数据能实时地入仓
一般我们将多源数据分为离线数据、实时数据和其他周边库数据。离线数据主要指文件数据,实时数据主要是Kafka消息队列中的数据、周边库数据指的的类似MySQL、MongoDB等数据。接下来从实战角度,分别说下如何接入各种数据源。
(1)MySQL的数据接入,可以通过使用JDBC方式实现。
CREATE TABLE user_table
(
`id` Int32,
`user_name` String,
`height` Float32,
`password` Nullable(String)
)
ENGINE JDBC('jdbc:mysql://localhost:3306/?user=root&password=root', 'test', 'test')
code1
通过前面代码将MySQL表接入到ClickHouse后,就可以直接在ClickHouse中直接查询MySQL表中的数据。
SELECT * FROM user_table
code2
同样也可以向MySQL中插入数据。
INSERT INTO user_table(`id`, `user_name`) VALUES(1,'alex')
code3
(2)下面咱们看下MongoDB的数据接入:
CREATE TABLE [IF NOT EXISTS] testdb.test_collection
(
id UInt64,
name String,
) ENGINE = MongoDB(127.0.0.1:27017, testdb, test_collection, 'your_user_name', 'your_password');
code4
表建立好后便可以执行查询语句。
SELECT COUNT() FROM test_collection;
code5
(3)再看下S3的数据接入:
CREATE TABLE s3_table (name String, value UInt32)
ENGINE=S3('https://test-datasets.s3.amazonaws.com/my-test-bucket/test-data.csv.gz', 'CSV', 'gzip')
SETTINGS input_format_with_names_use_header = 0;
code6
S3 表数据建立好后,我们向其中插入数据。
INSERT INTO s3_table VALUES ('a', 1), ('b', 2), ('c', 3);
code7
同样可以像使用关系型数据库一样查询S3数据表。
SELECT * FROM s3_table LIMIT 2;
code8
(4) Kafka数据接入:ClickHouse提供了Kafka数据接入引擎,可以方便地实现将Kafka的数据接入到ClickHouse中,并将其插入到ClickHouse表中。
CREATE TABLE kafka_source_test (
level String,
type String,
name String,
time DateTime64
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'test_topic',
kafka_group_name = 'test_group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
code9
SELECT * FROM kafka_source_test LIMIT 5
code10
通过前面的介绍,你可以看到ClickHouse接入数据源还是比较简单的,其中Kafka是构建实时数仓最常用的数据源。
(二)数据入仓能及时被分析
这里的数据分析分为两种情况,一种情况是有明确需求的报表分析,这种情况下可以对实时数据边入库边分析,具体可以通过建立物化视图的方式实现,也可以通过ClickHouse的projection实现。另外一种情况是随机性的数据分析需求,像这种需求ClickHouse基于索引可以做到TB级别数据秒级计算返回。具体的代码实现后面将会介绍。
(三)基于实时数仓能方便的根据业务需求建立主题数据
我们构建实时数仓的很大一个初衷是:当客户有基于主题的报表需求时,我们通过一个配置或者一条SQL语句,就能实现业务的统计报表。那么具体在ClickHouse中如何实现呢?
其中物化视图和projection,是ClickHouse实现灵活的按照业务构建主题数据的具体解决方案,下面咱们分别对这两个关键知识点进行介绍。
其他更多ClickHouse的原理和使用,需要同学们在具体使用中查阅官方文档(https://clickhouse.com/docs/en/)。
1.物化视图
虽然ClickHouse的性能在TB级别的数据的查询上,已经能达到秒级返回了,近乎完美。但是在超大规模和特别复杂的计算场景下计算耗时,还是面对一些挑战,这时我们就需要通过一个类似触发器的机制,将原始数据进行实时汇总,这样我们统计分析时直接查询汇总后的表就可以,如此一来,无论对于服务器负载还是业务快速分析的需求都是友好的。那么如何对ClickHouse数据进行实时分析汇总呢?答案就是物化视图。
这里得先看下物化视图的原理。
物化视图的构建过程中包括一个原表,一个物化视图和一个目标表。物化视图定义了数据物化的计算逻辑,当我们向原表插入一条数据时,会触发物化计算,计算完成后将计算结果实时写入目标表。用户查询的时候查询目标表即可完成数据的分析。
图6
在实际应用中我们对于报表需求,我们通过物化视图来实现,例如对于kafka_source_test中的数据,我们需要基于type字段按照“天”进行统计。
首先我们需要建立一个物化视图的基础表。
CREATE TABLE statistics_type_day
(
type String,
day Date,
type_count UInt32
)
ENGINE = SummingMergeTree()
ORDER BY (type, day)
code11
上述建表语句中使用了SummingMergeTree引擎,该引擎用于统计数据的存储,它能在合并分区的时候,按照预定义条件对数据进行聚合,将相同分组的多行数据统一到一行,从而显著减少存储空间,加快查询的效率。
接下来我们创建一个物化视图,将kafka_source_test表中的数据实时进行统计分析并写入statistics_type_day表中。
CREATE MATERIALIZED VIEW if not exists statistics_type_day_mv
TO statistics_type_day
AS
SELECT type, day, count(1) type_count
from kafka_source_test
group by type, day
code12
这样当kafka_source_test中有数据的时候,便能实时统计并将统计结果物化下来。查询的时候直接查询物化表statistics_type_day就行。
select * from statistics_type_day where day='2022-03-20'
code13
2.projection:数据投影
ClickHouse的projection本质上实现的是数据聚合,小伙伴可能要问了,既然ClickHouse已经有物化视图了,为何还需要projection呢?
试想这样一个场景,我们有一个150个字段的表,基于该表我们针对不同业务构建了20个物化视图。那我们业务在分析的时候如果查询基础数据就到基础表查询,如果查询的是统计数据则需要在对应的物化视图的统计表中查询。
但是在实际的使用过程中,常常因为构建视图和使用视图的人不在一个团队,一种情况是使用者不知道有哪些视图可以使用,所以在写SQL的是时候就直接查基表了,另外一种情况是业务需要针对基础表的SQL和统计表的SQL维护多个业务SQL,在查询上不统一。
图7
那么如何构建一个表,在该表上既能完成基础数据的查询,也能进行统计数据的查询,如果能实现的话,之前的1+20个表SQL的维护就变成了1张表SQL的维护。使用起来将大大简化应用层开发,应用层也不用关注哪些是统计数据,哪些是基础数据。
ClickHouse的projection给了这个问题一个很好的解决方案。
projection在数据分析功能上类似于物化视图,我们可以在projection中预定义表达式,当数据写入的时候,会将原始数据和针对projection中表达式计算后的聚合数据一起写入存储。
在查询的时候会经过一个智能路由的过程,智能路由通过分析SQL语句,如果发现SQL语句查询的数据都在聚合数据中,则直接查询聚合数据并将结果返回给用户,这块便大大减少了服务器资源的开销,只有到查询的数据在各个projection都不存在的时候才会去查基表进行统计。也可以理解为ClickHouse基于projection自动完成了查询优化。(具体参见下图)
图8
了解了projection原理后,接下来我们看下如何使用projection,首先需要定义projection。
CREATE TABLE test_projection_table
(
level String,
type String,
name String,
city String,
time DateTime64,
PROJECTION projection_1
(
SELECT
name,
count(1)
GROUP BY level
) ,
PROJECTION projection_2
(
SELECT
name,
count(1)
GROUP BY type
)
)
ENGINE = MergeTree()
ORDER BY (name , level, type)
code14
上述代码在定义表的时候定义了projection_1和projection_2,其中projection_1在level维度对数据进行汇总,projection_2在type维度对数据进行汇总。在创建好projection后还可以对projection进行修改。
ALTER TABLE test_projection_table
ADD PROJECTION projection_3
(
SELECT
count(1),
name
GROUP BY type, level
)
code15
那么查询的时候,如何才能命中projection呢?得满足这样几个条件:
select 表达式必须为 projection 定义中 select 表达式的子集。
group by 列必须为 projection 定义中 group by 列的子集。
where 条件 必须为 projeciton 定义中的 group by 列 的子集。
例如这里SQL查询的时候会走表达式:
select name,count() from test_projection_table where type='A' group by type
code16
而SQL不会走projection,因为city不在projection的定义中。
select name,city,count() from test_projection_table where type='A' group by city
code17
具体可以通过explain 查看执行计划,如果出现ReadFromStorage (MergeTree(with projection)) ,表示命中 projection。
(四)实时数仓需要提供元数据变更的自动感知能力。
对于元数据的变更ClickHouse无法做到自动感知,我们构建了一个元数据管理平台,实时监ClickHouse Insert语句,当发现有新的Schema变更后以告警的形式通知研发人员,有研发人员修改表结构以兼容元数据的变更。
(五)实时数仓难免需要要外部数据交互,因此实时数仓需要具备联邦查询的能力。
对于联邦查询,我们将外部库表数据接入ClickHouse并和ClickHouse进行联邦查询。
(六)对外提供一个统一的数据分析服务,用户不用关心数仓底层数据复杂的关系关系。
在查询上我们利用projection特性将查询统一到一个表上,除非有特别特殊的需求会走物化视图。
(七)需要确保实时数仓中存储的数据量在合理的访问内,以确保不会产生过多的服务器资源账单。
在控制数据的存储量上我们通过ClickHouse的冷热分离思想,将热数据存储在ClickHouse上,将冷数据存储在S3上,以控制服务器资源。
图9
结束语
本文我介绍了如何基于ClickHouse的百亿级广告平台实时数仓构建实战。在数仓的构建前期我们首先基于数据要达成的目标进行了梳理。
图10
我们在StarRocks、ClickHouse选型上,基于性能原因我们选择了ClickHouse。在具体的实现细节上我们没有像传统数仓那样构建完成的ODS、DWD、DWS、ADS层,而是借助ClickHouse的物化视图和projection完成我们构建实时数仓的目标。
其实在该架构中,可以将接入Kafka原表和外部表的数据理解为ODS(贴源数据层),将经过数据清洗转换后的基础表理解为DWD(数据明细层),将基于基础表衍生的物化视图和projection等统计数据理解为DWS(轻度汇总层),基于视图我们还可以接着构建视图,表和projection。
图11
延伸思考
延伸思考
前面给大家介绍了我们的实时数仓的方案,那么实时数仓是如何演化来的呢?下面多说几句,跟大家从宏观上说下大数据的行业发展,以便于能够从一个更高视角理解实时数仓。只有大家了解了大数据的发展历程,才能更好地理解在当下企业建立实时数仓的必要性,以及我们未来实时数仓未来的发展方向。
快速方便地构建基于主题的统计数据,一直是大数据行业的痛点,最初解决该问题的方案是Hadoop和Spark的离线计算,但随着业务的发展,这两种方案在数据实时性上的短板越来越明显。行业急需一个实时数仓方案,但是在实时数仓中如果基于Flink进行实时计算的话,对于业务的频繁变更带来的开发成本又变得不可控。因此大家又把目光投向了MPP架构,这种架构基本上达到了1个SQL就能满足一个业务报表的需求,方便快速,数据又是实时计算的,基本满足了我们的要求。但是其在分布式事务和元数据的自动感知上,还有待完善。
从上面的趋势中也可以看到大数据发展的阶段,第一阶段大家基于大型MySQL的分库分表实现大规模数据的存储和分析,但是随着数据量的增大,这个方案玩不转了。于是出现了Hadoop和Spark来解决海量数据计算问题。等海量数据计算的需求满足后大家又对数据的实时性要求更高了,业务总想看着最新的数据,于是出现了实时数仓。
但是问题到这里并没有完,后续必然在实时数仓的分布式数据一致性,事务的支持,元数据的动态管理和数据治理上,有更多的需求出现。这个也是大数据行业下个阶段要解决的重点问题。
因此大数据不像关系型数据库那样有稳定的解决方案,而是根据行业的变化和用户的需求在不断的迭代和更新。其最终目标是既能像关系数据库那样满足ACID的需求,又具有大规模数据实时计算的能力和灵活的数据分析能力。
最后要说的是ClickHouse之所以性能优异,是因为它采用了向量化计算技术。将多次CPU计算优化为一次CPU计算,从而大大提升了CPU的效率。具体实现技术手段为采用 SIMD (Single Instruction Multiple Data) 技术实现单条指令操作多条数据,在CPU 寄存器层面实现数据的并行操作。
向量化计算可以说是MPP架构的“银弹”,目前ClickHouse和StarRocks都采用了这个技术方案来达到海量数据快速分析的,后续肯定会有更多的数据库方案跟进该技术。限于篇幅问题在这里不能为大家详细介绍向量化计算的魅力,后续如果大家感兴趣我再单独拿出来讲一下。
最后,欢迎你在评论区留言与交流,也可以把这一篇分享给你的朋友,我们下期再会。