其他
Pulsar Functions Worker 的选举机制
🎙️阅读本文需要约 6 分钟
摘要
Function Worker 架构
Worker 的选举机制
蓝色背景的是 Worker 椭圆浅绿色的是订阅,也是一个 Consumer 方形绿色背景的是 Broker 方形红色背景的是 Topic
图中启动了三个 Worker,名称分别是 Worker Service1,Worker Service2,Worker Service3。这些 Worker 可以在同一台机器上,也可以在不同的机器。 启动时,每个 Worker 内部都会启动一个 Consumer,该 Consumer 用来进行选举,每个 Worker 的 id,主机名以及端口号会和该 Consumer 绑定。 该 Consumer 基于 Failover 模式启动(关于 Consumer 的 Failover 订阅模式可以参考这里 http://pulsar.apache.org/docs/en/concepts-messaging/#failover。)连接到同一个 Topic,具有相同订阅名称的 Consumer 中同一时刻只有一个处于活跃状态。 使用该 Consumer 的 Worker 就是 Leader,它负责进行调度并处理一些其他操作。
Worker Service3 为另一个集群,连接到 Topic2,订阅名称为 sub3,因为只有一个 Worker,所以它本身就是 Leader。
测试选举过程
启动单机 Pulsar 服务
docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone-function-leader apachepulsar/pulsar:2.3.1 bin/pulsar standalone
1docker logs -f
211:17:17.363 [pulsar-web-55-4] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
311:17:17.369 [pulsar-web-55-4] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
411:17:17.370 [pulsar-web-55-4] INFO org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [18/May/2019:11:17:17 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.3.1" 10
511:17:17.377 [pulsar-web-55-12] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
模拟选举过程
如上选举图所示,本次测试会启动三个 Failover 模式的 Consumer,其中两个 Consumer 连接到 Topic1,使用的订阅名称为 sub,它们代表一个 Worker 集群;另一个 Consumer 连接到 Topic2,使用订阅名称 sub3 代表另一个 Worker 集群。
1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
—— window2
以 Failover 模式启动一个消费者,使用订阅名称 sub,订阅到 public/default/topic1。1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
—— window3
以 Failover 模式启动一个消费者,使用订阅名称 sub3,订阅到 public/default/topic2。1docker exec -it pulsar-standalone-function-leader /bin/bash
2./bin/pulsar-client consume persistent://public/default/topic2 --num-messages 0 --subscription-name sub3 -t Failover
—— window4
通过命令 topics stats 来获取 topic1 的统计信息。 1./bin/pulsar-admin topics stats topic1
2{
3 "msgRateIn" : 0.0,
4 "msgThroughputIn" : 0.0,
5 "msgRateOut" : 0.0,
6 "msgThroughputOut" : 0.0,
7 "averageMsgSize" : 0.0,
8 "storageSize" : 0,
9 "publishers" : [ ],
10 "subscriptions" : {
11 "sub" : {
12 "msgRateOut" : 0.0,
13 "msgThroughputOut" : 0.0,
14 "msgRateRedeliver" : 0.0,
15 "msgBacklog" : 0,
16 "blockedSubscriptionOnUnackedMsgs" : false,
17 "unackedMessages" : 0,
18 "type" : "Failover",
19 "activeConsumerName" : "383dc",
20 "msgRateExpired" : 0.0,
21 "consumers" : [ {
22 "msgRateOut" : 0.0,
23 "msgThroughputOut" : 0.0,
24 "msgRateRedeliver" : 0.0,
25 "consumerName" : "383dc",
26 "availablePermits" : 1000,
27 "unackedMessages" : 0,
28 "blockedConsumerOnUnackedMsgs" : false,
29 "metadata" : { },
30 "address" : "/127.0.0.1:50014",
31 "connectedSince" : "2019-05-18T11:30:34.161Z",
32 "clientVersion" : "2.3.1"
33 }, {
34 "msgRateOut" : 0.0,
35 "msgThroughputOut" : 0.0,
36 "msgRateRedeliver" : 0.0,
37 "consumerName" : "51911",
38 "availablePermits" : 1000,
39 "unackedMessages" : 0,
40 "blockedConsumerOnUnackedMsgs" : false,
41 "metadata" : { },
42 "address" : "/127.0.0.1:50018",
43 "connectedSince" : "2019-05-18T11:30:42.742Z",
44 "clientVersion" : "2.3.1"
45 } ]
46 }
47 },
48 "replication" : { },
49 "deduplicationStatus" : "Disabled"
50}
1./bin/pulsar-admin topics stats topic1
2{
3 "msgRateIn" : 0.0,
4 "msgThroughputIn" : 0.0,
5 "msgRateOut" : 0.0,
6 "msgThroughputOut" : 0.0,
7 "averageMsgSize" : 0.0,
8 "storageSize" : 0,
9 "publishers" : [ ],
10 "subscriptions" : {
11 "sub" : {
12 "msgRateOut" : 0.0,
13 "msgThroughputOut" : 0.0,
14 "msgRateRedeliver" : 0.0,
15 "msgBacklog" : 0,
16 "blockedSubscriptionOnUnackedMsgs" : false,
17 "unackedMessages" : 0,
18 "type" : "Failover",
19 "activeConsumerName" : "51911",
20 "msgRateExpired" : 0.0,
21 "consumers" : [ {
22 "msgRateOut" : 0.0,
23 "msgThroughputOut" : 0.0,
24 "msgRateRedeliver" : 0.0,
25 "consumerName" : "51911",
26 "availablePermits" : 1000,
27 "unackedMessages" : 0,
28 "blockedConsumerOnUnackedMsgs" : false,
29 "metadata" : { },
30 "connectedSince" : "2019-05-18T11:30:42.742Z",
31 "clientVersion" : "2.3.1",
32 "address" : "/127.0.0.1:50018"
33 } ]
34 }
35 },
36 "replication" : { },
37 "deduplicationStatus" : "Disabled"
38}
总结
作者 | tuteng
审校 | Jennifer
编辑 | Irene
📣Join Pulsar 📣