查看原文
其他

深入理解之 Apache Pulsar Connector 与 Partition 关系篇

tuteng ApachePulsar 2021-10-18

🎙️阅读本文需 6 分钟


在前几篇文章中,我们已经介绍了 Connector 与 Function 的关系、在 Function Worker 中如何选举等。其中都涉及到了对 Producer 和 Consumer 的应用。

本篇文章我们就来尝试学习一下 pub/sub 模型与 Partition 的关系。



>>> Partition <<<

下面是官方文档对 Partition 的描述:

通常一个 Topic 仅被一个 Broker 服务,这限制了 Topic 的最大吞吐量。分区 Topic 是特殊的 Topic 类型,他可以被多个 Broker 处理,这让 Topic 有更高的吞吐量。

其实在背后,分区的 Topic 通过 N 个内部 Topic 实现,N 是分区的数量。当向分区的 Topic 发送消息,每条消息被路由到其中一个 Broker。Pulsar 自动处理跨 Broker 的分区分布。

可以看到在 Pulsar 中 Partition 也是 Topic,因此可以把 Partition 和 Topic 当成一回事(下文中出现的 Partiton 或者 Topic 可理解为同一个词)上面引用中提到了「Pulsar 自动处理跨 Broker 的分区分布」。

本篇文章就来了解一下是如何自动处理的。主要内容是 Producer 如何发送数据给 Paritition,Consumer 又是如何从多个分区中进行消费的。



>>> 创建分区 Topic <<<

从创建分区 Topic 开始看:

./bin/pulsar-admin topics create-partitioned-topic test-partition -p 4

以上命令会创建一个名称为 test-partition 分区数量为 4 的 Topic。


>> Producer 端

使用 Producer 发送消息:

./bin/pulsar-client produce test-partition --messages "333" -n 10

可以看到与正常的往 Topic 里发送消息是一样的。但是在代码中它们走的逻辑是不同的。

if (metadata.partitions > 1) { producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture, schema, interceptors); } else { producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors); }

可以看到当 Partition 数量大于 1 的时候,使用了 PartitionedProducerImpl 类来创建 Producer。


在 PartitionedProducerImpl 中初始化了路由策略。有三种路由策略,分别是:


  • RoundRobinPartition:如果没有 key,所有的消息通过 round-robin 方式被路由到不同的分区,以达到最大吞吐量。请注意 round-robin 并不是作用于每条单独的消息,而是作用于一批消息。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后分配消息到指定的分区。这是默认的模式。

  • SinglePartition:如果没有 Key 被提供,Producer 将会随机选择一个分区,把所有的消息发往该分区。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后将消息分配到指定分区。

  • CustomPartition:使用定制化消息路由实现,可以决定特定的消息进入指定的分区。用户可以创建定制化的路由模式,通过使用 Java client,实现 MessageRouter 接口。


this.routerPolicy = getMessageRouter();start();

在 start 中又有如下的逻辑:

for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) { String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString(); ProducerImpl<T> producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(), partitionIndex, schema, interceptors); });}

可以看到对于多分区,会为每一个分区创建一个 Producer,分区名称的格式为 topic-name-partition-index,上面创建的分区会有 test-partition-partition-0,test-partition-partition-1,test-partition-partition-2,test-partition-partition-3 生成。


当 Producer 创建成功之后,就可以调用 send 方法进行数据的发送了,具体要发往哪个分区,继续往下看:


在 send 中会基于上面初始化的路由策略来进行选择具体发送到哪个分区:

int partition = routerPolicy.choosePartition(message, topicMetadata);return producers.get(partition).internalSendAsync(message);


关于每种路由策略,后续会有相关文章进行介绍,本篇着重介绍从发送数据到分区,以及从分区接收数据的整个流程。

这样就调用普通的 Producer 将数据发送到了 Broker。


>> Consumer 端


当 Broker 收到数据后,就会将该数据再发送给相应的 Consumer。


使用下面的命令创建消费者,对于每种模式都会生成 4 个 Consumer,这 4 个 Consumer 都是基于相同的订阅。

./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Exclusive./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Failover./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Shared

在消费者端的代码中可以看到,当分区数大于 1 时,选择了不同的逻辑来初始化 Consumer,实际在 MultiTopicsConsumerImpl 内部也是初始化了多个 Consumer。

if (metadata.partitions > 1) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);} else { consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1, consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());}

>> Broker 端

再回到 Broker 端,看看 Broker 是如何进行数据路由的。

Consumer 有三种订阅模式,分别是 Exclusive,Faliover,Shared。 

在 Exclusive 下,分区参数会一直被初始化为 0,因此会将数据发送给 topicname-partition-0 命名的 Consumer。

dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic, this);

在 Failover 模式下,会将数据发送给当前活跃的 Consumer :

int partitionIndex = TopicName.getPartitionIndex(topicName);if (partitionIndex < 0) { // For non partition topics, assume index 0 to pick a predictable consumer partitionIndex = 0;}if (dispatcher == null || dispatcher.getType() != SubType.Failover) { dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex, topic, this);}

在 Shared 模式下是最有趣的,现在的场景有多个分区,初始化了多个 Consumer。

Consumer PriorityLevel Permits C1 0 2 C2 0 1 C3 0 1 C4 1 2 C5 1 1

上面有 5 个 Consumer,第一列为 Consumer 名称,第二列是优先级,数字越小优先级越高,第三列是可以接收的数据量。

首先会发送数据给比较高优先级的 Consumer,这里会先发送给 C1、C2、C3,它们三个有相同的优先级,对于有不同优先级的 Consumer,Broker 会先投递数据给高优先级的 Consumer,直到达到 Consumer 的消息数限制,才会再投递消息给下一个优先级的 Consumer。

对于有相同优先级的 Consumer,Broker 默认会轮询进行数据投递,直到达到 Consumer 的消息数限制。

因此上面的数据投递顺序会是:C1、C2、C3、C1、C4、C5、C4。

对于上面初始化的多个 Consumer 因为具有相同的优先级,因此基本上轮询进行数据投递的。



>>> 总结 <<<

本文主要分享了 Producer、Consumer 和 Partition 的关系,Producer 如何往多 Partition 发送数据,Consumer 又是如何从多 Partition 消费数据。

如果你想了解更多关于 Pulsar 的相关内容,记得添加我们的小 bot 哦~



🎉活动推荐


在 Pulsar 如火如荼的发展过程中,Pulsar 积极与周边大数据生态互联,目前已经完成 Pulsar + Flink,Pulsar + Spark 等的集成应用。Apache Pulsar 的关注度与日倍增,正在发展成为最受欢迎的流数据平台。


2020 年 1 月 4 日,Apache Pulsar 将走入校园,与国内顶级开源数据库 Apache IoTDB 进行密切合作。我们希望与更多高校学子进行深入交流,碰撞出不一样的火花,同时也诚挚的邀请所有热爱开源、对 Pulsar/IoTDB 感兴趣的小伙伴参与。


👆🏻

点击图片可查看活动详情

或者直接点击「阅读原文」进行报名啦!

👇🏻


: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存