复杂事件处理CEP
用Slipstream构建复杂事件处理应用
Transwarp Slipstream 是TDH中的流计算组件,旨在降低流应用开发的门槛,使得开发者仅通过SQL即可以快速实现业务逻辑。在 用StreamSQL实现事件驱动的实时计算(注:Transwarp Data Hub 4.X系列中的StreamSQL自5.0起已正式更名为Slipstream)中已介绍了如何使用Slipstream进行低延迟的应用计算处理。
但在大数据爆炸时代,企业早已不满足于简单的事件处理模式,而期望能够及时探测到众多事件的变化并联系上下文,了解其潜在影响和风险,快速做出反应。为了满足在流上构建复杂实时处理应用的需求,Slipstream紧跟趋势,推出了复杂事件处理引擎,使得开发者可以基于SQL写出复杂的数据处理规则和逻辑。
CEP :Complex Event Processing,复杂事件处理,是一种从实时事件流中检测符合特定匹配模式的一系列事件并进行处理、分析和挖掘的有效技术手段。
复杂事件的处理分为事件和操作两个部分:
事件是流中的数据,为了高效地筛选出目标事件,通常会在事件上加上过滤条件。以在判断是否发生银行卡盗刷的Case为例,我们可以先过滤出行为为“取款”的事件。
操作即事件的逻辑顺序及其生命周期的控制,例如两次取款操作发生在一个特定的时间间隔内且满足第二次取款操作发生在第一次取款操作之后。
我们将这种通过操作组合在一起的一系列事件称为复杂事件模式。相对于只能处理单一事件的简单事件处理模式,CEP还可以处理由多个事件组成的复合事件,通过分析多个事件间的关系和模式对比从繁杂的事件流中找出“有意义”的信息(例如潜在的威胁或行为预测)。
启用CEP
使用Slipstream的CEP功能需在Transwarp Manager上设置服务的ngmr.engine.mode参数为morphling,使引擎运行在事件驱动模式下。
保存更改后,为使配置生效,还需要执行“配置服务”和“重启Slipstream”两步操作。
CEP基本语法
CEP的基本语法如下所示:
SELECT EVENT1.[column], ① EVNET2.[column], ......, EVNETn.[column] FROM PATTERN ② ( EVENT1=[stream][condition] [FOLLOWEDBY | ,] ③ EVENT2= [stream][condition] [FOLLOWEDBY | ,] .... EVENTn= [stream][condition] ) ④ WITHIN (time interval); ⑤ |
① EVENT: 模式中定义的事件名,可以任意定义。
② PATTERN: 指定模式的关键字。
③ 表示事件之间的关系,FOLLOWEDBY表示只要事件B发生在事件A之后,那么事件B也应该参与计算;如果此处是“,”表示事件B必须是事件A之后发生的第一个事件。
④ condition:即该事件的发生条件,即SQL中的条件表达式。
⑤ WITHIN: 指定该次复杂事件处理的时间区间。
应用实例
接下来,我们通过两个实际应用场景中的Case说明如何在Slipstream中进行复杂事件处理。
场景1:银行检测盗刷行为
银行需要在10分钟之内检测出当前某笔取款交易是否存在盗刷银行卡的行为。
1. 创建输入流
CREATE APPLICATION cep_example; USE APPLICATION cep_example; SET streamsql.use.eventmode=true; CREATE STREAM transaction( location_id STRING, card_id STRING, behavior STRING ) tblproperties( "topic"="transaction_t1", "kafka.zookeeper"="localhost:2188", "kafka.broker.list"="localhost:9098" ); CREATE TABLE exception_ret( location_id_1 STRING, location_id_2 STRING, behavior STRING, card_id STRING ); |
2.创建规则启动流任务
INSERT INTO exception_ret SELECT e1.location_id, e1.card_id, e1.behavior, e2.location_id, e2.card_id, e2.behavior FROM PATTERN ( -- 同一张卡10分钟内在两个不同地点发生了取款行为,意味着有盗刷可能 e1=transaction[e1.behavior='withdraw'] FOLLOWEDBY e2=transaction[ e2.card_id = e1.card_id AND e2.behavior='withdraw' AND e2.location_id != e1.location_id] ) WITHIN ('10' minute); |
场景2:对交通车流量和套牌车进行预警监测
交警部门需要对车辆交通的以下两种情况进行监测:
Case1: 每分钟统计一次卡口车流量,若10分钟内某卡口的过车流量超过阀值,需要及时预警并反馈现场进行交通疏导。
Case2: 10分钟两个跨地市的行政区域出现同一个车牌,有理由怀疑是套牌车,需要及时预警反馈。
1. 创建输入流
CREATE APPLICATION cep_example; USE APPLICATION cep_example; SET streamsql.use.eventmode=true; CREATE STREAM traffic( veh_id STRING, veh_type STRING, speed FLOAT, location_id STRING) tblproperties( "topic"="traffic_t1", "kafka.zookeeper"="localhost:2188", "kafka.broker.list"="localhost:9098" ); CREATE TABLE traffic_flow_ret (location_id STRING, traffic_flow INT); CREATE TABLE traffic_susp(loc_id1 STRING, loc_id2 STRING, veh_id STRING); |
2. 创建规则启动流任务
CREATE STREAM traffic_flow AS SELECT location_id, count(*) as veh_flow FROM traffic STREAMWINDOW w1 as (length '1' minute slide '1' minute) GROUP BY location_id;
INSERT INTO traffic_flow_ret SELECT e2.location_id, e2.veh_flow FROM PATTERN( -- 10分钟内某同一卡口车流量增幅超过60 e1=traffic_flow[e1.veh_flow > 0], e2=traffic_flow[ e2.veh_flow - e1.veh_flow > 60 AND e2.location_id = e1.location_id] ) WITHIN ('10' minute);
INSERT INTO traffic_susp SELECT e1.location_id, e2.location_id, e2.veh_id FROM PATTERN( -- 车辆类型为A1类,10分钟内不同地区出现同一个车牌号 e1=traffic[e1.veh_type="A1"] FOLLOWEDBY e2=traffic[e2.veh_id = e1.veh_id AND e2.location_id != e1.location_id] ) WITHIN ('10' minute); |
结语
Transwarp Slipstream帮助用户快速实现流上的数据挖掘,全方位覆盖业务需求,不仅支持事件驱动和微批两种模式,还提供了复杂事件处理(CEP)的高级功能。根据不同业务对于应用延迟和吞吐量的需求,用户可以灵活切换模式。而且,得益于CEP,当事件流中符合预设模式的复杂事件组合出现时,企业能够及时做出决策,先发制人。
往期原创文章
混合负载下的资源调度神器--Inceptor Scheduler
你应该知道的工作流调度平台——Transwarp Workflow
OLAP Cube可视化设计工具—Transwarp Rubik
TDH荣获TPC官方测试(TPCx-HS@10TB)最佳性能
星环的划时代版本-Transwarp Data Hub 5.0
大数据开放实验室由星环信息科技(上海)有限公司运营,专门致力于大数据技术的研究和传播。若转载请在文章开头明显注明“文章来源于微信订阅号——大数据开放实验室”,并保留作者和账号介绍。