查看原文
其他

Pulsar Source 入门篇

Growth ApachePulsar 2021-10-18


阅读本文需要约 5 分钟。


Apache Pulsar 是一个分布式发布订阅的消息系统。Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令、环境搭建以及使用示例。


Source 常用命令


Create

创建 source


常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户


Update

更新 source


常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户


Delete

删除 source


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户


Start

启动 source


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将启动所有实例


Stop

停止 source


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将停止所有实例


Get

获取 source 信息


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户


Status

检查 source 状态


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将获取所有实例状态


List

列出所有 source 信息


常用参数

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户


Restart

重启 source


常用参数

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • tenant: 指定 source 所属的租户

  • instance-id: 指定 source 的 instance-id,如果未指定,将重启所有实例


Localrun

在本地运行 source,方便调试


常用参数

  • archive: 指定 source 的 NAR 包

  • classname: 指定 source 的类名称

  • destination-topic-name: 指定目标 topic 名称

  • name: 指定 source 的名称

  • namespace: 指定 source 的命名空间

  • parallelism: 指定 source 的并发数

  • source-config-file: 指定 source 使用的配置文件

  • tenant: 指定 source 所属的租户


环境搭建


本示例以 Kafka source 为例,实践这些命令。


1. 下载所需文件。


wget http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar


2. 创建网络。

实践发现,使用下述方式才能成功;使用 --link 的方式指定网络,会出现问题。


docker network create kafka-pulsar


3. 拉取 ZooKeeper 镜像并启动 ZooKeeper 服务。


docker pull wurstmeister/zookeeper
docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper


4. 拉取 Kafka 镜像并启动 Kafka 服务。


docker pull wurstmeister/kafka:2.11-1.0.2
docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2


5. 拉取 Pulsar 镜像并启动 Pulsar standalone 服务。


docker pull apachepulsar/pulsar:2.4.0
docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone


6. 创建 source 配置文件 kafkaSourceConfig.yaml。


configs:
   bootstrapServers: "pulsar-kafka:9092"
   groupId: "test-pulsar-io"
   topic: "my-topic"
   sessionTimeoutMs: "10000"
   autoCommitEnabled: "false"


7. 创建生产者文件 kafka-producer.py。


from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
future = producer.send('my-topic', b'hello world')
future.get()


8. 创建消费者文件 pulsar-client.py。


import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='my-aa')

while True:
   msg = consumer.receive()
   print msg
   print dir(msg)
   print("Received message: '%s'" % msg.data())
   consumer.acknowledge(msg)

client.close()


9. 复制以下文件到 pulsar-kafka-standalone。


docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/


10. 开启新窗口,使用 localrun 运行 source。


docker exec -it pulsar-kafka-standalone /bin/bash
./bin/pulsar-admin source localrun --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1


11. 开启新窗口,运行消费者。


docker exec -it pulsar-kafka-standalone /bin/bash
python pulsar-client.py


12. 开启新窗口,运行生产者。


docker exec -it pulsar-kafka-standalone /bin/bash
pip install kafka-python
python3 kafka-producer.py


13. 验证。


此时消费者窗口显示以下消息,说明环境搭建成功。


Received message: 'hello world'


使用示例


Localrun 该命令已在前文 #10 实现。前文 #9 已向 /pulsar/lib 文件夹中复制了 kafka 的 clients 库,因此,需要首先重启 pulsar-kafka-standalone。


docker restart pulsar-kafka-standalone


Create

创建 source


在租户 public 和命名空间 default 下,创建名为 kafka 的 source。


./bin/pulsar-admin source create --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
"Created successfully"


如果命令行窗口显示以上信息,说明创建成功。


List

列出所有 source 信息


显示租户为 public、命名空间为 default 的 source。


./bin/pulsar-admin source list --tenant public --namespace default
[
  "kafka"
]


Get

获取 source 信息


获取名称为 kafka 的 source 的信息。


./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
  "tenant": "public",
  "namespace": "default",
  "name": "kafka",
  "className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
  "topicName": "my-topic",
  "configs": {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io1",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": "false"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE"
}


以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。


Status

检查 source 状态


获取名称为 kafka 的 source 的运行状态。


./bin/pulsar-admin source status --tenant public --namespace default --name kafka
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceivedFromSource" : 0,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSourceExceptions" : 0,
      "latestSourceExceptions" : [ ],
      "numWritten" : 0,
      "lastReceivedTime" : 0,
      "workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"
    }
  } ]
}


以上展示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。


Stop

停止 source


停止租户 public 命名空间 default 下面名称为 kafka 的 source。


./bin/pulsar-admin source stop --tenant public --namespace default --name kafka --instance-id 0
Stopped successfully


Start

启动 source


启动租户 public 命名空间 default 下面名称为 kafka 的 source。


./bin/pulsar-admin source start --tenant public --namespace default --name kafka --instance-id 0
Started successfully


Restart

重启 source


重启租户 public 命名空间 default 下面名称为 kafka 的 source。


./bin/pulsar-admin source restart --tenant public --namespace default --name kafka --instance-id 0
Restarted successfully


Update

更新 source


更新租户 public 命名空间 default 下面名称为 kafka 的 source。


./bin/pulsar-admin source update --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1 --cpu 2
"Updated successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
  "tenant": "public",
  "namespace": "default",
  "name": "kafka",
  "className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
  "topicName": "my-topic",
  "configs": {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io1",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": "false"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 2.0,
    "ram": 1073741824,
    "disk": 10737418240
  }
}


以上示例成功更新了CPU。


Delete

删除 source


删除租户 public 命名空间 default 下面名称为 kafka 的 source。


./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
"Delete source successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
HTTP 404 Not Found

Reason: Source kafka doesn't exist


以上示例成功删除了该 source。


总结


本文以 Kafka source 为例,介绍了 source 的常用命令、环境搭建和使用示例,之后会有更多文章深入介绍 source,敬请期待。



 您在使用 Pulsar 吗?
感谢您对 Apache Pulsar 的关注和支持。
为了让 Pulsar 更贴近用户需求,

我们想聆听您的意见。
诚邀您填写 1 分钟 Pulsar 调查问卷。
期待您的反馈,非常感谢。

作者 | tuteng

审校 | Irene + Anonymitaet

编辑 | Irene



更多关于 Pulsar 的技术干货和产品动态,请关注 Apache Pulsar 微信公众号。


: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存