深入理解 RisingWave 流处理引擎(三):触发机制
作者:师天硕,RisingWave Labs 内核开发工程师
在前两篇文章中,我们了解到 RisingWave 流计算引擎是从原关系表中的变更流进行增量计算得到目标关系表的变更流。在这样的模型下,由于输入的变更流是无界的,流计算引擎可以选择截取到任意部分进行处理,输出变更流也可以包含物化的中间结果。例如,对下面的例子。
CREATE TABLE Votes(user_id int, story_id int);CREATE MATERIALIZED VIEW StoriesVC ASSELECT story_id, COUNT(*) AS vcountFROM Votes GROUP BY story_id/* Txn 1 */INSERT INTO Votes VALUES (1,1), (2,1), (3,2);/* Txn 2 */DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;/* Txn 3 */INSERT INTO Votes VALUES (4,1), (5,2);
对于相同的输入流,三种情况输出的输出流不同,但都是正确的,输出流最终物化的结果都是 (1, 2), (2, 2) 两条记录。
1Barrier
上面的例子中,最明显的不同就是将输入流切分成了不同的粒度,一段一段地进行处理。RisingWave 和其他一些流系统一样,在流上引入 Barrier 的概念。DAG 上的 source 会在数据流中插入 Barrier,这些 Barrier 将输入流切分成很多段。
每一个算子收到 Barrier 后会依次进行:
对齐(Align):对于多输入流的算子(Join、Union),需要消费其他流的数据一直等到他们各自的 Barrier,最终收集齐所有输入流的同一 Barrier。 刷写(Flush):算子对目前为止收到的所有输入进行处理,输出结果变更流给下游。 传播(Broadcast):将该 Barrier 发送至下游的算子
上文中的例子用插入 Barrier 的方式如下图所示。
顺便一提,Barrier 在 RisingWave 流处理引擎的容错与恢复中也起到至关重要的作用。从事务的角度说,在这里我们通过 Barrier 约束算子行为,保证了图上各节点在收到 Barrier 时,已经收到的全部变更流都是基于基表的同一个版本,实现了快照隔离(Snapshot Isolation),保证了图上各个节点状态的一致性(Consistency)。而在之后对于 RisingWave 流处理容错与恢复的详解文章中,我们会进一步探讨 RisingWave 是怎样保证原子性(Atomicity)和持久性(Durability)的。
2Barrier 注入策略
Barrier 控制了流处理图上每个算子的计算,让他们处于同一个步调上。在 Source 节点上向数据流注入 Barrier 的策略就非常重要。
直接的思路是定时注入,即每隔一段固定时间间隔(Barrier Interval)就向数据流中注入 barrier。对计算引擎来说,攒批(Batching)可以省略不必要的计算结果,提供更好的性能。但实际应用中,用户对于输出的结果有一定的新鲜度(Freshness)要求,因此计算引擎不能无限的攒批。在这里 Barrier Interval 就控制了算子攒批的上限,Barrier 到来时必须停止攒批,将目前的中间结果输出给下游。由此用户就可以通过简单的配置一个参数来达到性能和新鲜度的权衡。
另一方面,对于带有事务语义的变更流,Source 可以通过控制插入 Barrier 不截断上游事务,来维持上游的事务语义。
3Watermark 与 Trigger
对于何时触发流处理输出的问题,另一个经典思路是 Google 在 2015 年提出的 Dataflow Model。在事件时间(Event Time)时间列上定义时间窗口(Time Window)对数据进行划分,使用水位线(Watermark)来描述事件的完整性,使得计算引擎可以在每个窗口内的事件完整且不会更改后,再使用类似批处理的方式进行处理。这种触发计算的方式也被称为完整性触发。
Watermark(t) 插入在数据流中,表示数据流中 event time 早于 t 的事件都已经到齐,后续不会再出现。在 RisingWave 中,就是不会有变更涉及到 event time 小于 t 的记录。在该模型下,算子的计算是由 watermark 触发的,算子根据 watermark 确定计算结果不会更改的时候输出结果。
例如下图是对于长度为5秒钟的滚动窗口(Tumble Window)计数的一个例子,当聚合算子收到 tm > 12:00:11 的 watermark 的时候,可以知道不会再有 window_start 为 12:00:00 和 12:00:05 两个窗口中的数据,此时算子可以输出这两个窗口的结果。
Watermark 从哪里来呢?实践中,很少有上游系统可以在生产流时提供 watermark 语义保证(Perfect Watermark),通常是在流计算系统中通过某些 timeout 或其他策略指定注入 Watermark。比如在 RisingWave 中可以通过如下 SQL 定义 timeout 为 5s 的 Watermark。更多细节可以参考文档[1]。
在之后的文章中,我们也会对 watermark 的其他用处,如何处理迟到的数据等问题进行专题的讲解。
CREATE SOURCE s( tm TIMESTAMP, WATERMARK FOR tm AS tm - INTERVAL '5' SECOND);
4EMIT ON WINDOW CLOSE
当涉及窗口计算时,我们有两种输出策略:第一种是当 Barrier 到达时,即使窗口没有闭合,也输出部分的计算结果,称为 emit on update;第二种是忽略 Barrier,仅当窗口闭合时一次性输出整体计算结果,称为 emit on window close(EOWC)。前者的实时性和一致性更好,而后者可以保证输出的数据不再更改,省略了中间状态。RisingWave 默认采用第一种策略,而当用户想要第二种策略时,我们提供实验性的 EMIT ON WINDOW CLOSE 语法[2]。它主要用于以下这几种场景:
希望得到一个 append only 的输出流时。这通常是由于一些下游系统,例如 Kafka 或 S3 的要求。 希望通过一定延迟换取更好的性能。EOWC 语义省去了一些算子输出的中间状态,并使下游算子接受 append only 流,在部分 query 中能够优化性能,对窗口聚合、Over Window 窗口函数的查询尤为明显。 在监控告警等场景,有时用户不希望看到中间结果,只期望看到窗口闭合后的聚合结果。
5小结
本文展示了 RisingWave 流处理引擎内触发计算的两种模式。RisingWave 默认的 Barrier 触发计算保证了计算图上节点间的状态一致性,对外表现为 MView 之间的数据一致,同时允许用户通过调整 barrier interval 来在成本和新鲜度之间权衡。从 Dataflow Model 引入的 EOWC 语义借助额外的 watermark 定义,等待事件完整后触发计算,在 append only sink、窗口聚合等场景有着独特的作用。
6参考资料
[1]参考文档: https://www.risingwave.dev/docs/upcoming/watermarks/
[2]EMIT ON WINDOW CLOSE 语法: https://www.risingwave.dev/docs/upcoming/emit-on-window-close/