事件驱动的流处理
事件驱动的流处理,是指StreamSQL引擎逐条读取数据源流入的每条记录,进行必要的业务逻辑加工后,再输出到StreamSQL支持的输出终端。相比于等待消息集合后再打包的微批(mini-batch)处理模式,事件驱动的流处理延迟更低,在对延迟敏感的业务场景中表现更佳。
基于流的SQL引擎:StreamSQL(基础介绍)中已介绍的,Inceptor StreamSQL是用于替代Scala和API 来简化流计算编程的类SQL声明式语言。StreamSQL的计算运行于流计算引擎Transwarp Slipstream之上,该引擎混合了事件驱动和微批处理,因此既可以支持有低延迟需求的任务也可以处理高吞吐任务,能够应对不同类型业务。事件驱动模式是Slipstream的重要特性,本文将针对此模式,介绍如何正确的利用StreamSQL设置事件驱动模式,通过Slipstream进行低延迟的计算处理。
事件驱动的流处理
事件驱动的流处理,是指StreamSQL引擎逐条读取数据源流入的每条记录,进行必要的业务逻辑加工后,再输出到StreamSQL支持的输出终端。相比于等待消息集合后再打包的微批(mini-batch)处理模式,事件驱动的流处理延迟更低,在对延迟敏感的业务场景中表现更佳。
启用事件驱动模式
为了让StreamSQL运行在事件驱动模式下,我们需要在Transwarp Manager上为StreamSQL服务配置相应的参数,设置NGMR_ENGINE_MODE参数为morphling。注意,如果让StreamSQL运行在微批模式,要把NGMR_ENGINE_MODE设置为mapred。
应用实例
用户Emily有三个任务需要以低延迟实现。为了让StreamSQL以事件驱动的模式来处理流数据,她需要在触发StreamJob前配置参数streamsql.use.eventmode=true。StreamSQL中触发StreamJob的详细步骤可以参考基于流的SQL引擎:StreamSQL(基础介绍)。
1. 从流表导数据到普通表
Emily希望从Topic tps1中查询数据,并放在普通表中,在事件驱动模式下实现此业务,通过如下的语句完成:
SET streamsql.use.eventmode=true; CREATE STREAM s1(score INT, name STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092"); CREATE TABLE t1(score INT, name STRING); INSERT INTO t1 SELECT * FROM s1; |
任务运行过程中,Emily在任务管理页面上发现,事件驱动模式下提交的StreamJob是一个常驻的Active Job,如下图所示:
2. Window Stream聚合后插入普通表
接着Emily要根据Topic tps1创建一个流,对它按事件时间切分窗口,再以窗口为时间区间做聚合,然后将结果插入普通表。这个业务涉及到窗口的概念,下面先来对窗口做以简单介绍。StreamSQL中流处理的窗口(STREAMWINDOW)分为下面两种。
滑动窗口:滑动窗口需要由两个量来定义,窗口长度(LENGTH)和滑动间隔(SLIDE)。滑动窗口是指按照一定的SLIDE 向未来滑动的长度为 LENGTH 的窗口。例如:如果窗口长度为2s,滑动间隔为1s,那么第一个窗口为[0s,2s),第二个窗口为[1s, 3s),第三个窗口为[2s, 4s),以此类推。
跳动窗口:当窗口间隔和滑动间隔相同,滑动窗口就退化为跳动窗口。换句话说,跳动窗口就是滑动窗口 LENGTH =SLIDE 的特例。例如:INTERVAL 为2s跳动窗口第一个区间为[0s, 2s),第二个区间为[2s, 4s),第三个区间为[4s, 6s),以此类推。
另外,StreamSQL中有两种切分窗口的方式:
系统时间(System Time)切分:以流处理引擎处理的时间为基准切分窗口。
事件时间(Event Time)切分:将数据中的某指定个字段作为时间字段切分窗口。
接下来,Emily将在事件驱动模式下,创建一个按事件时间切分的Window Stream,其中LENGTH为4s,SLIDE为2s,对它做聚合然后将结果插入普通表:
SET streamsql.use.eventmode=true; -- 使用事件时间 SET streamsql.use.eventtime=true; CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true"); -- 创建Window Stream CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); CREATE TABLE t1(score INT, name STRING); INSERT INTO t1 SELECT SUM(score), name FROM s1win GROUP BY NAME; |
3. 两个Window Stream关联后插入普通表
最后Emily需要在事件驱动模式下创建两个带窗口的流(LENGTH为4s,SLIDE为2s),并对它们在name字段上做关联,然后将结果写入某张普通表。实现的语句是这样的:
SET streamsql.use.eventmode=true; -- 使用事件时间 SET streamsql.use.eventtime=true; CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true"); -- 创建Window Stream s1win CREATE STREAM s1win AS SELECT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); CREATE STREAM s2(class INT, name STRING, ts STRING) TBLPROPERTIES(“topic”=”tps2”,"kafka.zookeeper"="tw-node127:2181", “kafka.broker.list”=”tw-node127:9092”,”timefield”=”ts”,”timeformat”=”yyyy-MM-dd HH:mm:ss”,"use.lowlevel.consumer"="true"); -- 创建Window Stream s2win CREATE STREAM s2win AS SELECCT * FROM s2 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); CREATE TABLE t1(score INT, class INT, name STRING); INSERT INTO t1 SELECT score, class, s1win.name FROM s1win JOIN s2win ON s1win.name=s2win.name; |
结语
由于Slipstream对于事件驱动模式和微批模式的混合支持,用户可以方便的实现低延时和高吞吐的实时计算。在事件驱动的模式下,数据触发的计算任务延迟可以低至5毫秒,这样就可以用来开发对延迟时间敏感度较高的应用,例如在线反欺诈应用。而在微批处理的模式下,Slipstream能够提供极高的吞吐,适合运用在某些对吞吐量要求较高的特殊行业,例如交通的视频检测。而有了StreamSQL对模式切换的语义支持,用户就可以灵活的借助Slipstream的实时计算能力应对不同业务的需求。
往期原创文章
大数据开放实验室由星环信息科技(上海)有限公司运营,专门致力于大数据技术的研究和传播。若转载请在文章开头明显注明“文章来源于微信订阅号——大数据开放实验室”,并保留作者和账号介绍。