基于 Flink SQL 开发 Kafka+ES+MySQL 实践(附demo案例)
Editor's Note
上周四在 Flink 中文社区钉钉群中直播分享了《Demo:基于 Flink SQL 构建流式应用》,直播内容偏向实战演示。这篇文章是对直播内容的一个总结,并且改善了部分内容。
The following article is from Flink 中文社区 Author 伍翀(云邪)
摘要:这篇文章是对我们直播内容的一个总结,并且改善了部分内容,比如除 Flink 外其他组件全部采用 Docker Compose 安装,简化准备流程。读者也可以结合视频和本文一起学习。
完整分享可以观看视频回顾:https://www.bilibili.com/video/av90560012
Flink 1.10.0 于近期刚发布,释放了许多令人激动的新特性。尤其是 Flink SQL 模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用 Flink SQL 如何快速构建流式应用。
准备
使用 Docker Compose 启动容器
mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
DataGen:数据生成器。容器启动后会自动开始生成用户行为数据,并发送到 Kafka 集群中。默认每秒生成 1000 条数据,持续生成约 3 小时。也可以更改 docker-compose.yml 中 datagen 的 speedup 参数来调整生成速率(重启 docker compose 才能生效)。
MySQL:集成了 MySQL 5.7 ,以及预先创建好了类目表(category),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。
Kafka:主要用作数据源。DataGen 组件会自动将数据灌入这个容器中。
Zookeeper:Kafka 容器依赖。
Elasticsearch:主要存储 Flink SQL 产出的数据。
Kibana:可视化 Elasticsearch 中的数据。
docker-compose up -d
docker-compose down
下载安装 Flink 本地集群
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
使用 DDL 创建 Kafka 表
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
...
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(), -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址
'format.type' = 'json' -- 数据源格式为 json
);
时间属性:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
DDL:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
统计每小时的成交量
使用 DDL 创建 Elasticsearch 表
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector
'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本
'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址
'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相当于数据库的表名
'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相当于数据库的库名
'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新
'format.type' = 'json', -- 输出数据格式 json
'update-mode' = 'append'
);
提交 Query
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
使用 Kibana 可视化结果
统计一天每10分钟累计独立用户数
CREATE TABLE cumulative_uv (
time_str STRING,
uv BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'cumulative_uv',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
CREATE VIEW uv_per_10min AS
SELECT
MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
顶级类目排行榜
CREATE TABLE category_dim (
sub_category_id BIGINT, -- 子类目
parent_category_id BIGINT -- 顶级类目
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink',
'connector.table' = 'category',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
CREATE TABLE top_category (
category_name STRING, -- 类目名称
buy_cnt BIGINT -- 销量
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'top_category',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
CASE C.parent_category_id
WHEN 1 THEN '服饰鞋包'
WHEN 2 THEN '家装家饰'
WHEN 3 THEN '家电'
WHEN 4 THEN '美妆'
WHEN 5 THEN '母婴'
WHEN 6 THEN '3C数码'
WHEN 7 THEN '运动户外'
WHEN 8 THEN '食品'
ELSE '其他'
END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
结尾
作者介绍: