其他
深入理解之 Apache Pulsar Connector 与 Partition 关系篇
🎙️阅读本文需 6 分钟
通常一个 Topic 仅被一个 Broker 服务,这限制了 Topic 的最大吞吐量。分区 Topic 是特殊的 Topic 类型,他可以被多个 Broker 处理,这让 Topic 有更高的吞吐量。
其实在背后,分区的 Topic 通过 N 个内部 Topic 实现,N 是分区的数量。当向分区的 Topic 发送消息,每条消息被路由到其中一个 Broker。Pulsar 自动处理跨 Broker 的分区分布。
./bin/pulsar-admin topics create-partitioned-topic test-partition -p 4
./bin/pulsar-client produce test-partition --messages "333" -n 10
if (metadata.partitions > 1) {
producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
producerCreatedFuture, schema, interceptors);
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
}
在 PartitionedProducerImpl 中初始化了路由策略。有三种路由策略,分别是:
RoundRobinPartition:如果没有 key,所有的消息通过 round-robin 方式被路由到不同的分区,以达到最大吞吐量。请注意 round-robin 并不是作用于每条单独的消息,而是作用于一批消息。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后分配消息到指定的分区。这是默认的模式。
SinglePartition:如果没有 Key 被提供,Producer 将会随机选择一个分区,把所有的消息发往该分区。如果为 message 指定了 key,分区的 Producer 会把 key 做散列,然后将消息分配到指定分区。
CustomPartition:使用定制化消息路由实现,可以决定特定的消息进入指定的分区。用户可以创建定制化的路由模式,通过使用 Java client,实现 MessageRouter 接口。
this.routerPolicy = getMessageRouter();
start();
for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) {
String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();
ProducerImpl<T> producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(),
partitionIndex, schema, interceptors);
});
}
int partition = routerPolicy.choosePartition(message, topicMetadata);
return producers.get(partition).internalSendAsync(message);
./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Exclusive
./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Failover
./bin/pulsar-client consume persistent://public/default/test-partition --num-messages 0 --subscription-name test-partition -t Shared
if (metadata.partitions > 1) {
consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic, this);
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, assume index 0 to pick a predictable consumer
partitionIndex = 0;
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex,
topic, this);
}
Consumer PriorityLevel Permits
C1 0 2
C2 0 1
C3 0 1
C4 1 2
C5 1 1
👆🏻
点击图片可查看活动详情
或者直接点击「阅读原文」进行报名啦!
👇🏻