查看原文
其他

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

袁小栋、程君杰 阿里巴巴中间件 2022-03-18


随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。

RocketMQ Streams 适合大数据量->高过滤->轻窗口计算的场景,核心打造轻资源,高性能优势,在资源敏感场景中有很大优势,最低 1core,1g 可部署,建议的应用场景(安全,风控,边缘计算,消息队列流计算)。

RocketMQ Streams 兼容 Blink(Flink 的阿里内部版本) 的 SQL,UDF/UDTF/UDAF,多数 Blink 任务可以直接迁移成 RocketMQ Streams 任务。将来还会发布和 Flink 的融合版本,RocketMQ Streams 可以直接发布成 Flink 任务,既可以享有 RocketMQ Streams 带来的高性能、轻资源,还可以和现有的 Flink 任务统一运维和管理。

01

什么是RocketMQ Streams?

Aliware

本章节从基础简介、设计思路和特点三方面对 RocketMQ Streams 进行整体介绍。

01

RocketMQ Streams 简介


1)它是一个 Lib 包,启动即运行,和业务直接集成;
2)它具备 SQL 引擎能力,兼容 Blink SQL 语法,兼容 Blink UDF/UDTF/UDAF;
3)它包含 ETL 引擎,可以无编码实现数据的 ETL、过滤和转存;
4)它基于数据开发 SDK,大量实用组件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的场景。


02

RocketMQ Streams 的特点


RocketMQ streams 基于上述的实现思路,可以看到它有以下几个特点:

  • 轻量

1 核 1g 就可以部署,依赖较轻,在测试场景下用 Jar 包直接写个 main 方法就可以运行,在正式环境下最多依赖消息队列和存储(其中存储是可选的,主要是为了分片切换时的容错)。

  • 高性能

实现高过滤优化器,包括前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹等,比优化前性能提升 3-5 倍,资源节省 50%以上。

  • 维表 JOIN(千万数据量维表支持)

设计高压缩内存存储数据,无 java 头部和对齐的开销,存储接近原始数据大小,纯内存操作,性能最大化,同时对于 Mysql 提供了多线程并发加载,提高加载维表的速度。

  • 高扩展的能力

1)Source 可按需扩展,已实现:RocketMQ,File,Kafka;
2)Sink 可按需扩展,已实现:RocketMQ,File,Kafka,Mysql,ES;
3)可按 Blink 规范扩展 UDF/UDTF/UDAF;
4)提供了更轻的 UDF/UDTF 扩展能力,不需要任何依赖就可以完成函数的扩展。

  • 提供了丰富的大数据的能力

包括精确计算一次灵活的窗口,双流 join,统计,开窗,各种转换过滤,满足大数据开发的各种场景,支持弹性容错的能力。

02

RocketMQ Streams的使用

Aliware

RocketMQ Streams 对外提供两种 SDK,一种是 DSL SDK,一种是 SQL SDK,用户可以按需选择;DSL SDK 支持实时场景 DSL 语义;SQL SDK 兼容 Blink(Flink 的阿里内部版本) SQL 的语法,多数 Blink SQL 可以通过 RocketMQ Streams 运行。

接下来,我们详细的介绍一下这两种 SDK。

01

环境要求


1)JDK 1.8 版本以上;
2)Maven 3.2 版本以上。

02

DSL SDK


利用 DSL SDK 开发实时任务时,需要做如下的一些准备工作:

  • 依赖准备

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-clients</artifactId> <version>1.0.0-SNAPSHOT</version></dependency>

准备工作完成后,就可以直接开发自己的实时程序。

  • 代码开发

DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false) .map(message->message + "--") .toPrint(1) .start();

其中:
1)Namespace 是业务隔离的,相同的业务可以写成相同的Namespace。相同的Namespace 在任务调度里可以跑在进程里,也可以共享一些配置;
2)pipelineName 可以理解成就是 job name ,唯一区分 job;
3)DataStreamSource 主要是创建 Source,然后这个程序运行起来,最终的结果就是在原始的消息里面会加"--",然后把它打印出来。

  • 丰富的算子

RocketMQ streams 提供了丰富的算子, 包括:
1)source 算子:包括 fromFile、fromRocketMQ、fromKafka 以及可以自定义 source 来源的 from 算子;
2)sink 算子: 包括 toFile、toRocketMQ、toKafka、toDB、toPrint、toES 以及可以自定义 sink 的 to 算子;
3)action 算子:包括 Filter、Expression、Script、selectFields、Union、forEach、Split、Select、Join、Window 等多个算子。

  • 部署执行

基于 DSL SDK 完成开发,通过下面命令打成 jar 包,执行 jar,或直接执行任务的 main 方法。
mvn -Prelease-all -DskipTests clean install -Ujava -jar jarName mainClass &

03

 SQL SDK


  • 依赖准备

<dependency> <groupId>com.alibaba</groupId> <artifactId>rsqldb-clients</artifactId> <version>1.0.0-SNAPSHOT</version></dependency>

  • 代码开发

首先开发业务逻辑代码, 可以保存为文件也可以直接使用文本:
CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` ( `scan_time` VARCHAR, `file_name` VARCHAR, `cmdline` VARCHAR,) WITH ( type='file', filePath='/tmp/file.txt', isJsonData='true', msgIsJsonArray='false');-- 数据标准化create view data_filter asselect *from ( select scan_time as logtime , lower(cmdline) as lower_cmdline , file_name as proc_name from table_name)xwhere ( lower(proc_name) like '%.xxxxxx' or lower_cmdline like 'xxxxx%' or lower_cmdline like 'xxxxxxx%' or lower_cmdline like 'xxxx' or lower_cmdline like 'xxxxxx' );
CREATE TABLE `output` ( `logtime` VARCHAR , `lower_cmdline` VARCHAR , `proc_name` VARCHAR) WITH ( type = 'print');
insert into outputselect *from aegis_log_proc_format_raw;

其中:
1)CREATE FUNCTION:引入外部的函数来支持业务逻辑, 包括 flink 以及系统函数;
2)CREATE Table:创建 source/sink;
3)CREATE VIEW:执行字段转化,拆分,过滤;
4)INSERT INTO:数据写入 sink;
5)函数:内置函数,udf 函数。

  • SQL 扩展

RocketMQ streams 支持三种 SQL 扩展能力,具体实现细节请看:
1)通过 Blink UDF/UDTF/UDAF 扩展 SQL 能力;
2)通过 RocketMQ streams 扩展 SQL 能力,只要实现函数名是 eval 的 java bean 即可;
3)通过现有 java 代码扩展 SQL 能力,create function 函数名就是 java 类的方法名。

  • SQL 执行

你可以从下载最新的 RocketMQ Streams 代码并构建。
cd rsqldb/mvn -Prelease-all -DskipTests clean install -Ucp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录

解压 tar.gz 包, 进入目录结构
tar -xvf rocketmq-streams-{版本号}-distribution.tar.gzcd rocketmq-streams-{版本号

其目录结构如下 
1)bin 指令目录,包括启动和停止指令
2)conf 配置目录,包括日志配置以及应用的相关配置文件
3)jobs 存放 sql,可以两级目录存储
4)ext 存放扩展的 UDF/UDTF/UDAF/Source/Sink
5)lib 依赖包目录
6)log 日志目录

  • 执行 SQL

#指定 sql 的路径,启动实时任务bin/start-sql.sh sql_file_path

  • 执行多个 SQL

如果想批量执行一批 SQL,可以把 SQL 放到 jobs 目录,最多可以有两层,把 sql 放到对应目录中,通过 start 指定子目录或 sql 执行任务。

  • 任务停止

# 停止过程不加任何参数,则会将目前所有运行的任务同时停止bin/stop.sh

# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止bin/stop.sh sqlname

  • 日志查看

目前所有的运行日志都会存储在 log/catalina.out 文件中。

03

架构设计及原理分析

Aliware

01

RocketMQ Streams设计思路


在了解完  RocketMQ Streams 的基本简介,接下来,我们看下 RocketMQ Streams 的设计思路,设计思路主要从设计目标和策略两个方面来介绍:

  • 设计目标

1)依赖少,部署简单,1 核 1g 单实例可部署,可随意扩展规模;
2)打造场景优势,重点打造大数据量->高过滤->轻窗口计算的场景,功能覆盖度要全,实现需要的大数据特性:Exactly-ONCE、灵活的窗口(滚动、滑动、会话窗口);
3)要在保持低资源的前提下,对高过滤有性能突破,打造性能优势;
4)兼容 Blink SQL,UDF/UDTF/UDAF,让非技术人员更容易上手。

  • 策略(适配场景:大数据量>高过滤/ETL>低窗口计算)

1)采用 shared-nothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展,并发能力取决于分片数;
2)利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错;
3)利用存储实现状态备份,实现 Exactly-ONCE 的语义。用结构化远程存储实现快速启动,不等本地存储恢复。
4)重力打造过滤优化器,通过前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹提高过滤性能


02

RocketMQ Streams Source 的实现


1)Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。
2)Source 支持分片的自动负载和容错
  • 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作;
  • 当有新分片时,发送新增分片消息,让算子完成分片初始化。
3)数据源通过 start 方法,启动 consuemr 获取消息;
4)原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。



03


RocketMQ Streams Sink 的实现


1)Sink 是实时性和吞吐的一个结合;
2)实现一个 sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐;
3)常规的使用方式是写 message->cache->flush->存储的方式,系统会严格保证每次批次写入存储的量不超过 batchsize 的量,如果超过了,会拆分成多批写入;



4)Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐(一个分片一个 cache);
5)可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask;
6)通过调用 flush 方法刷新 cache 到存储;
7)Sink 的 cache 会有内存保护,当 cache 的消息条数>batchSize,会强制刷新,释放内存。

04

RocketMQ Streams Exactly-ONCE 实现


1)Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作,消息至少消费一次;
2)每条消息会有消息头部,里面封装了 queueld 和 offset;
2)组件在存储数据时,会把 queueld 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重;
3)内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。



05

RocketMQ Streams Window 实现方式


1)支持滚动、滑动和会话窗口,支持事件时间和自然时间(消息进入算子的时间);
2)支持 Emit 语法,可以在触发前或触发后,每隔 n 段时间,更新一次数据;比如1小时窗口,窗口触发前希望每分钟看到最新结果,窗口触发后希望不丢失迟到一天内的数据,且每 10 分钟更新数据。
3)支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时,有丢失窗数据的风险;
4)快速启动,无需等待本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算;
5)利用消息队列负载均衡,实现扩容缩容容,每个 queue 是一份组,一个分组同一刻只被一台机器消费;
6)正常计算依赖本地存储,具备 flink 相似的计算性能。


04

RocketMQ Streams 在安全场景的最佳实践

Aliware

01

背景


从公共云转战专有云,遇到了新的问题。因为专有云像大数据这种 SaaS 服务是非必须输出的,且最小输出规模也比较大,用户成本会增加很多,难落地,导致安全能力无法快速同步到专有云。


02

解决办法


  • RocketMQ Streams 在云安全的应用-流计算

1)基于安全场景打造轻量级计算引擎,基于安全高过滤的场景特点,可以针对高过滤场景优化,然后再做较重的统计、窗口、join 操作,因为过滤率比较高,可以用更轻的方案实现统计和 join 操作;
2)SQL 和引擎都可热升级。


  • 业务结果

1)规则覆盖:自建引擎,覆盖 100%规则(正则、join、统计);
2)轻资源,内存是公共云引擎的 1/24,cpu 是 1/6,依赖过滤优化器,资源不随规则线性增加,新增规则无资源压力,通过高压缩表,支持千万情报;
3)SQL 发布,通过 c/s 部署模式,SQL 引擎热发布,尤其护网场景,可快速上线规则;
4)性能优化,对核心组件进行专题性能优化,保持高性能,每实例(2g,4 核,41 规则)5000qps 以上。

05

RocketMQ Streams 的未来规划

Aliware

01

打造 RocketMQ 一体化计算能力


1)和 RocketMQ 整合,去除 DB 依赖,融合 RocketMQ KV;
2)和 RocketMQ 混部,支持本地计算,利用本地特点,打造高性能;
3)打造边缘计算最佳实践

02

Connector 增强


1)支持 pull 消费方式,checkpoint 异步刷新;
2)兼容 blink/flink connector。

03

ETL 能力建设


1)增加文件,syslog 的数据接入能力
2)兼容 Grok 解析,增加常用日志的解析能力;
3)打造日志 ETL 的最佳实践

04

稳定性和易用性打造


1)Window 多场景测试,提升稳定性,性能优化;
2)补充测试用例,文档,应用场景。

06

开源地址

Aliware

  • RocketMQ-Streams:

https://github.com/apache/rocketmq-streams

  • RocketMQ-Streams-SQL:

https://github.com/alibaba/rsqldb

以上是本次对 RocketMQ Stream 的整体介绍,希望对大家有所帮助和启发。

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存