其他
Kafka KSQL实战
点击上方蓝色字体,选择“设为星标”
背景
需求
介绍
架构
部署架构
处理架构
抽象概念
部署
下载
tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0
启动zk
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
启动kafka
bin/kafka-server-start -daemon etc/kafka/server.properties
创建topic和data
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')]
schema=<avro schema file>
[schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)]
format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited')
topic=<kafka topic name>
key=<name of key column>
[iterations=<number of rows> (defaults to 1,000,000)]
[maxInterval=<Max time in ms between rows> (defaults to 500)]
[propertiesFile=<file specifying Kafka client properties>]
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981
8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34
./ksql-datagen quickstart=users format=json topic=users maxInterval=100
User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430
User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481
{"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"}
{"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"}
{"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"}
{"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}
启动ksql
bin/ksql-server-start -daemon etc/ksql/ksql-server.properties
连接ksql
bin/ksql http://10.205.151.145:8088
创建stream和table
stream
(kafka_topic='pageviews', value_format='DELIMITED');
table
(kafka_topic='users', value_format='JSON', key = 'userid');
查询数据
ksql> SELECT * FROM pageviews_original LIMIT 3;
持久化查询
创建查询
查询新stream
查询执行任务
消费新数据
./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic
PAGEVIEWS2
终止查询任务
文章不错?点个【在看】吧! 👇