Pulsar Source 入门篇
阅读本文需要约 5 分钟。
Apache Pulsar 是一个分布式发布订阅的消息系统。Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令、环境搭建以及使用示例。
Source 常用命令
常用参数
archive: 指定 source 的 NAR 包
classname: 指定 source 的类名称
destination-topic-name: 指定目标 topic 名称
name: 指定 source 的名称
namespace: 指定 source 的命名空间
parallelism: 指定 source 的并发数
source-config-file: 指定 source 使用的配置文件
tenant: 指定 source 所属的租户
常用参数
archive: 指定 source 的 NAR 包
classname: 指定 source 的类名称
destination-topic-name: 指定目标 topic 名称
name: 指定 source 的名称
namespace: 指定 source 的命名空间
parallelism: 指定 source 的并发数
source-config-file: 指定 source 使用的配置文件
tenant: 指定 source 所属的租户
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
instance-id: 指定 source 的 instance-id,如果未指定,将启动所有实例
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
instance-id: 指定 source 的 instance-id,如果未指定,将停止所有实例
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
instance-id: 指定 source 的 instance-id,如果未指定,将获取所有实例状态
常用参数
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
常用参数
name: 指定 source 的名称
namespace: 指定 source 的命名空间
tenant: 指定 source 所属的租户
instance-id: 指定 source 的 instance-id,如果未指定,将重启所有实例
常用参数
archive: 指定 source 的 NAR 包
classname: 指定 source 的类名称
destination-topic-name: 指定目标 topic 名称
name: 指定 source 的名称
namespace: 指定 source 的命名空间
parallelism: 指定 source 的并发数
source-config-file: 指定 source 使用的配置文件
tenant: 指定 source 所属的租户
环境搭建
本示例以 Kafka source 为例,实践这些命令。
1. 下载所需文件。
wget http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
2. 创建网络。
实践发现,使用下述方式才能成功;使用 --link 的方式指定网络,会出现问题。
docker network create kafka-pulsar
3. 拉取 ZooKeeper 镜像并启动 ZooKeeper 服务。
docker pull wurstmeister/zookeeper
docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
4. 拉取 Kafka 镜像并启动 Kafka 服务。
docker pull wurstmeister/kafka:2.11-1.0.2
docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
5. 拉取 Pulsar 镜像并启动 Pulsar standalone 服务。
docker pull apachepulsar/pulsar:2.4.0
docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
6. 创建 source 配置文件 kafkaSourceConfig.yaml。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
7. 创建生产者文件 kafka-producer.py。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
future = producer.send('my-topic', b'hello world')
future.get()
8. 创建消费者文件 pulsar-client.py。
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='my-aa')
while True:
msg = consumer.receive()
print msg
print dir(msg)
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
9. 复制以下文件到 pulsar-kafka-standalone。
docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
10. 开启新窗口,使用 localrun 运行 source。
docker exec -it pulsar-kafka-standalone /bin/bash
./bin/pulsar-admin source localrun --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
11. 开启新窗口,运行消费者。
docker exec -it pulsar-kafka-standalone /bin/bash
python pulsar-client.py
12. 开启新窗口,运行生产者。
docker exec -it pulsar-kafka-standalone /bin/bash
pip install kafka-python
python3 kafka-producer.py
13. 验证。
此时消费者窗口显示以下消息,说明环境搭建成功。
Received message: 'hello world'
使用示例
Localrun 该命令已在前文 #10 实现。前文 #9 已向 /pulsar/lib 文件夹中复制了 kafka 的 clients 库,因此,需要首先重启 pulsar-kafka-standalone。
docker restart pulsar-kafka-standalone
在租户 public 和命名空间 default 下,创建名为 kafka 的 source。
./bin/pulsar-admin source create --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
"Created successfully"
如果命令行窗口显示以上信息,说明创建成功。
显示租户为 public、命名空间为 default 的 source。
./bin/pulsar-admin source list --tenant public --namespace default
[
"kafka"
]
获取名称为 kafka 的 source 的信息。
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
"tenant": "public",
"namespace": "default",
"name": "kafka",
"className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
"topicName": "my-topic",
"configs": {
"bootstrapServers": "pulsar-kafka:9092",
"groupId": "test-pulsar-io1",
"topic": "my-topic",
"sessionTimeoutMs": "10000",
"autoCommitEnabled": "false"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE"
}
以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。
获取名称为 kafka 的 source 的运行状态。
./bin/pulsar-admin source status --tenant public --namespace default --name kafka
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"
}
} ]
}
以上展示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。
停止租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source stop --tenant public --namespace default --name kafka --instance-id 0
Stopped successfully
启动租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source start --tenant public --namespace default --name kafka --instance-id 0
Started successfully
重启租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source restart --tenant public --namespace default --name kafka --instance-id 0
Restarted successfully
更新租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source update --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1 --cpu 2
"Updated successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
"tenant": "public",
"namespace": "default",
"name": "kafka",
"className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
"topicName": "my-topic",
"configs": {
"bootstrapServers": "pulsar-kafka:9092",
"groupId": "test-pulsar-io1",
"topic": "my-topic",
"sessionTimeoutMs": "10000",
"autoCommitEnabled": "false"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 2.0,
"ram": 1073741824,
"disk": 10737418240
}
}
以上示例成功更新了CPU。
删除租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
"Delete source successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
HTTP 404 Not Found
Reason: Source kafka doesn't exist
以上示例成功删除了该 source。
总结
本文以 Kafka source 为例,介绍了 source 的常用命令、环境搭建和使用示例,之后会有更多文章深入介绍 source,敬请期待。
您在使用 Pulsar 吗?
感谢您对 Apache Pulsar 的关注和支持。
为了让 Pulsar 更贴近用户需求,
我们想聆听您的意见。
诚邀您填写 1 分钟 Pulsar 调查问卷。
期待您的反馈,非常感谢。
作者 | tuteng
审校 | Irene + Anonymitaet
编辑 | Irene
更多关于 Pulsar 的技术干货和产品动态,请关注 Apache Pulsar 微信公众号。