深入理解 RisingWave 流处理引擎(二):计算模型
|作者:师天硕 RisingWave Labs 内核开发工程师
关于本系列
在《深入理解 RisingWave 流处理引擎》系列,我们希望通过一系列文章自顶向下,由浅入深地分享 RisingWave 中流处理引擎的方方面面,聊聊我们在从零开始设计这个流处理引擎时的设计理念,进一步去深入各个模块的技术原理以及实现细节。
流处理还是一个相对年轻的领域,很多设计和工程上的最佳实践没有形成公论。我们非常希望能和对流处理感兴趣的爱好者,研究者和使用者进行更深入的交流,欢迎大家在评论区和其他渠道进行讨论,共同探讨流处理技术的未来。
那么让我们开始吧!
01
SQL 和关系代数
前一篇中,我们提到 RisingWave 作为流数据库,使用SQL来声明式地定义流处理任务。那让我们先来看看经典 SQL 查询引擎的计算模型是怎样的,再去思考如何设计 SQL 下的流处理计算模型。
SQL 之所以能够成为历久弥新的数据库查询语言,其背后的关系代数(Relational Algebra)起到了很大作用。任何一条 SQL 语句都可以等价地变换为关系代数算子所组成的算子树。例如,下面的 SQL 查询就可以被表示为如图所示的算子树。
CREATE TABLE StoriesVC(story_id int, vote_count int);
CREATE TABLE Stories (id int, title text);
/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count
FROM Stories JOIN StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;
关系代数作为一个代数系统,满足封闭性(Closure Property)。即任何一个关系代数算子的输入和输出都一定是有特定结构(Schema)的关系(Relation)。这使得关系算子可以任意的灵活嵌套而不超出关系代数的范畴。查询引擎可以利用关系运算的特性,使用统一的框架进行处理。这种灵活性体现在 SQL 上,就是可以无处不在的子查询。用户可以将自己定义的 SQL 任意串联,例如前面的查询就可以演进到下面的查询。
/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count
FROM Storiess JOIN (
SELECT count(*) as vote_count, story_id FROM Votes;
) StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;
通过对 SQL 和关系代数的分析可以得出,要支持使用 SQL 定义流处理的计算模型,需要同时满足以下两个要求。
1.算子需要从传统关系代数算子扩展而来,这样才能将 SQL 需要转换为对应的经典 SQL 算子,进而转换为对应的流算子。2.同样要满足封闭性,构成一个新的代数系统,输入和输出需要能够满足相同的性质。
综合来看,计算模型中对于计算对象的定义是这里的关键问题。
02
基于 TVR 的计算模型
显然,计算对象必须由 relation 扩展而来才可能满足第一个要求。一个尝试解决这个问题的模型是 Time-varying Relations(TVR) 上的 streaming SQL,在《Streaming Systems》的第八章和《One SQL to Rule Them All – an Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables》中都对这种模型进行了详细的说明,有兴趣的读者可以深入的研究。简单地说,TVR 是不断随时间变化的多版本 relation。对于任何的经典 SQL 算子,都可以由输入 TVR 的每一个快照版本计算出输出 TVR 的对应快照版本,进而得到输出的 TVR,这就达成了上面的两个要求。
TVR 是一个偏向理论的模型。如果我们完全贴合他的原始定义构建查询引擎,每一次上游 TVR 发生变更,生成新的快照版本,都需要下游去全量的计算新版本的输出结果。最终我们会获得一个全量计算刷新的物化视图的算法。举例来说,我们构建物化视图 StoriesVC 并在基表 Votes 上进行修改,计算流程如图所示。每次修改会产生一个新的快照,而为了更新物化视图 StoriesVC,需要在快照上重新执行(全量的)计算过程。
CREATE TABLE Votes(user_id int, story_id int);
CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes GROUP BY story_id
HAVING vcount >= 2;
INSERT INTO Votes VALUES (1,1), (2,1), (3,2);
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
INSERT INTO Votes VALUES (4,1), (5,2);
03
基于变更流的计算模型
在每个快照上进行全量计算的成本很高,其中包含大量冗余的计算。RisingWave 和其他流处理系统一样,采用的是增量的计算模型。具体的说,算子的输入和输出都是 relation 的变更。RisingWave 的流计算引擎用一系列的 INSERT 和 DELETE 操作流来表示变更。上面的例子在这种模型下的计算模型如图所示,“+”代表 INSERT,“-”代表 DELETE,基表 Votes 的变更经过一个个算子,计算出目标物化视图 StoriesVC 的变更。
注意到这里 relation 变更可以采用许多不同的表达方式,但这里仍然需要满足封闭性,即算子输入和输出的变更表示必须是相同的。因此 RisingWave 采用了最简单的 INSERT 和 DELETE 两种操作,在此基础上,还提供了更多良好的性质。
04
Stream key
只有 INSERT 和 DELETE 而没有其他保证的变更流对存储引擎是不友好的(试想从一个无序列表里找到某个值),要支持快速的插入和删除,通常需要一个唯一的 key 去进行数据的索引。因此为了保证每个物化视图之上都一定存在一个可以用作主键的key,RisingWave 的优化器会对算子树做自底向上的推导,保证每个算子的输出都一定存在unique key。它也被称作stream key,代表变更流上的操作是针对这个key进行的。举例来说,对于下面的 join 物化视图语句,其输出本身是不存在unique key 的,优化器会将其改写,在 Scan 算子和 Join 算子当中加上两列作为stream key,分别来自两张表的原始主键 uuid
。最后这两列会出现在结果集中,物化视图就可以将他们作为隐藏列用作复合主键。
CREATE TABLE VisitEvent(
story_id int,
user_id int,
time datetime,
uuid varchar primary key);
CREATE TABLE Comments(
story_id int,
user_id int,
created_at datetime,
content varchar,
uuid varchar primary key);
CREATE MATERIALIZED VIEW mv AS SELECT
Comments.story_id as story_id,
Comments.user_id as user_id,
Comments.content as content,
Comments.created_at as comment_time,
VisitEvent.time as visit_time,
FROM Comments JOIN VisitEvent USING(story_id, user_id);
由于 stream key 是由关系代数本身的属性推导出来的,在输入变更流满足其自身 stream key 约束的情况下,输出的变更流也自然的满足 stream key 的约束。这里的约束即同一个 stream key 的 INSERT 和 DELETE 一定交替出现,不可以插入已有的 stream key,也不可以删除不存在的 stream key。
05
其他种类变更流
对流计算有所研究的读者一定知道,不同的场景下的变更流也有其他表示,他们和 RisingWave 的变更流都有些许不同。本节将简要地介绍下 Append-only Stream 和 Upsert Stream,并会描述当外部系统输入这些变更流的时候,RisingWave 是如何将他们转换成内部的变更流表示的。
Append-only stream 是在特殊限制的表上的变更流,它只能表示原表上追加 INSERT 记录的变更,表中已经插入的记录在之后不会被删除、撤回或更新。对于这样的流,RisingWave 使用 snowflake 一类的主键生成算法生成新的一列作为 stream key,之后这个流就成为一个只有 INSERT 操作的 RisingWave 内部变更流。
Upsert stream 同样是定义在带 key 的表上的变更流,包含 UPSERT 和 DELETE 两种操作,其中 UPSERT 操作的行为以来当前表中 key 的状态,如果表中不存在该 key 则插入该记录,否则覆盖更新掉表中对应 key 的记录。UPSERT 操作和 RisingWave 的变更流相比丢失了记录变更之前的状态。不是所有流算子都能高效的支持 UPSERT 操作,例如求和等聚合运算,都要求能够知道记录变更前的旧值才能计算出结果的增量改变,而 upsert stream 丢失了这部分信息。因此 RisingWave 会将外部进入的 upsert stream 转换为内部的变更流。具体地说,RisingWave 会将对应的表物化下来,当遇见 UPSERT 操作的时候查询表中 key 的值,并根据情况填充对应的 DELETE 操作。
06
小结
本文展示了 RisingWave 流处理引擎的计算模型,描述了如何把声明式的 SQL 查询转换成一系列流算子。从 SQL 背后的关系代数说起,扩展出基于 TVR 的流计算模型,进一步给出了 RisingWave 的变更关系流模型。细致描述了 RisingWave 的变更流的操作种类和性质,并和其他系统常见的变更流进行了对比。
深入理解 RisingWave 流处理引擎(一):总览