什么是RocketMQ Streams?
Aliware
RocketMQ Streams 简介
RocketMQ Streams 的特点
轻量
高性能
维表 JOIN(千万数据量维表支持)
高扩展的能力
提供了丰富的大数据的能力
RocketMQ Streams的使用
Aliware
环境要求
DSL SDK
依赖准备
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
代码开发
DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false)
.map(message->message + "--")
.toPrint(1)
.start();
丰富的算子
部署执行
mvn -Prelease-all -DskipTests clean install -U
java -jar jarName mainClass &
SQL SDK
依赖准备
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>rsqldb-clients</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
代码开发
CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` (
`scan_time` VARCHAR,
`file_name` VARCHAR,
`cmdline` VARCHAR,
) WITH (
type='file',
filePath='/tmp/file.txt',
isJsonData='true',
msgIsJsonArray='false'
);
-- 数据标准化
create view data_filter as
select
*
from (
select
scan_time as logtime
, lower(cmdline) as lower_cmdline
, file_name as proc_name
from
table_name
)x
where
(
lower(proc_name) like '%.xxxxxx'
or lower_cmdline like 'xxxxx%'
or lower_cmdline like 'xxxxxxx%'
or lower_cmdline like 'xxxx'
or lower_cmdline like 'xxxxxx'
)
;
CREATE TABLE `output` (
`logtime` VARCHAR
, `lower_cmdline` VARCHAR
, `proc_name` VARCHAR
) WITH (
type = 'print'
);
insert into output
select
*
from
aegis_log_proc_format_raw
;
SQL 扩展
SQL 执行
cd rsqldb/
mvn -Prelease-all -DskipTests clean install -U
cp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录
tar -xvf rocketmq-streams-{版本号}-distribution.tar.gz
cd rocketmq-streams-{版本号
执行 SQL
#指定 sql 的路径,启动实时任务
bin/start-sql.sh sql_file_path
执行多个 SQL
任务停止
# 停止过程不加任何参数,则会将目前所有运行的任务同时停止
bin/stop.sh
# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止
bin/stop.sh sqlname
日志查看
架构设计及原理分析
Aliware
RocketMQ Streams设计思路
设计目标
策略(适配场景:大数据量>高过滤/ETL>低窗口计算)
RocketMQ Streams Source 的实现
数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作; 当有新分片时,发送新增分片消息,让算子完成分片初始化。
RocketMQ Streams Sink 的实现
RocketMQ Streams Exactly-ONCE 实现
RocketMQ Streams Window 实现方式
RocketMQ Streams 在安全场景的最佳实践
Aliware
背景
解决办法
RocketMQ Streams 在云安全的应用-流计算
业务结果
RocketMQ Streams 的未来规划
Aliware
打造 RocketMQ 一体化计算能力
Connector 增强
ETL 能力建设
稳定性和易用性打造
开源地址
Aliware
RocketMQ-Streams:
RocketMQ-Streams-SQL: