分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
前传:分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
本文主要基于 RocketMQ 4.0.x 正式版
1、概述
2、Consumer
3、PushConsumer 一览
4、PushConsumer 订阅
5、PushConsumer 消息队列分配
6、PushConsumer 消费进度读取
7、PushConsumer 拉取消息
8、PushConsumer 消费消息
9、PushConsumer 发回消费失败消息
10、Consumer 消费进度
11、结尾
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
PushConsumer:
在大多数场景下使用。
名字虽然是
Push
开头,实际在实现时,使用Pull
方式实现。通过Pull
不断不断不断轮询Broker
获取消息。当不存在新消息时,Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和Broker
主动Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询(Long-Polling
)。PullConsumer
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
3、PushConsumer 一览
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
RebalanceService
:均衡消息队列服务,负责分配当前Consumer
可消费的消息队列(MessageQueue
)。当有新的Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从Broker
拉取消息,并提交消费任务到ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对Namesrv
,Broker
的 API调用,提供给Producer
、Consumer
使用。
4、PushConsumer 订阅
DefaultMQPushConsumerImpl#subscribe(...)
1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
说明 :订阅
Topic
。第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
第 7 至 10 行 :通过心跳同步
Consumer
信息到Broker
。
FilterAPI.buildSubscriptionData(...)
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
说明 :根据
Topic
和 订阅表达式 创建订阅数据subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(...)
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
说明 :注册消息监听器。
5、PushConsumer 消息队列分配
RebalanceService
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
说明 :均衡消息队列服务,负责分配当前
Consumer
可消费的消息队列(MessageQueue
)。第 26 行 :调用
MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:详细解析见:MQClientInstance#doRebalance(...)。
如
第25行
等待超时,每 20s 调用一次。PushConsumer
启动时,调用rebalanceService#wakeup(...)
触发。Broker
通知Consumer
加入 或 移除时,Consumer
响应通知,调用rebalanceService#wakeup(...)
触发。
MQClientInstance#doRebalance(...)
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
说明 :遍历当前
Client
包含的consumerTable
(Consumer
集合 ),执行消息队列分配。疑问:目前代码调试下来,
consumerTable
只包含Consumer
自己。😈有大大对这个疑问有解答的,烦请解答下。第 6 行 :调用
MQConsumerInner#doRebalance(...)
进行队列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...)
详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。
DefaultMQPushConsumerImpl#doRebalance(...)
1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }
说明:执行消息队列分配。
第 3 行 :调用
RebalanceImpl#doRebalance(...)
进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。
RebalanceImpl#doRebalance(...)
1: /**
2: * 执行分配消息队列
3: *
4: * @param isOrder 是否顺序消息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每个 topic 的消息队列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未订阅的topic对应的消息队列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未订阅的消息队列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }
#doRebalance(...)
说明 :执行分配消息队列。第 7 至 20 行 :循环订阅主题集合(
subscriptionInner
),分配每一个Topic
的消息队列。第 22 行 :移除未订阅的
Topic
的消息队列。#truncateMessageQueueNotMyTopic(...)
说明 :移除未订阅的消息队列。当调用DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合(subscriptionInner
),对应消息队列移除在该方法。
RebalanceImpl#rebalanceByTopic(...)
1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 获取 topic 对应的 队列 和 consumer信息
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根据 队列分配策略 分配消息队列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新消息队列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /**
81: * 当负载均衡时,更新 消息处理队列
82: * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
83: * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
84: *
85: * @param topic Topic
86: * @param mqSet 负载均衡结果后的消息队列数组
87: * @param isOrder 是否顺序
88: * @return 是否变更
89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待读:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的队列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 发起消息拉取请求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }
#rebalanceByTopic(...)
说明 :分配Topic
的消息队列。第 21 至 40 行 :获取
Topic
对应的消息队列和消费者们,并对其进行排序。因为各Consumer
是在本地分配消息队列,排序后才能保证各Consumer
顺序一致。第 42 至 61 行 :根据 队列分配策略(
AllocateMessageQueueStrategy
) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。第 63 至 72 行 :更新
Topic
对应的消息队列。第 3 至 19 行 :广播模式(
BROADCASTING
) 下,分配Topic
对应的所有消息队列。第 20 至 74 行 :集群模式(
CLUSTERING
) 下,分配Topic
对应的部分消息队列。#updateProcessQueueTableInRebalance(...)
说明 :当分配队列时,更新Topic
对应的消息队列,并返回是否有变更。第 132 至 135 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。第 137 行 :移除消息队列的消费进度。
第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求。
第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
第 108 至 120 行 :队列拉取超时,即
当前时间-最后一次拉取消息时间>120s
( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。第 93 至 126 行 :移除不存在于分配的消息队列(
mqSet
) 的 消息处理队列(processQueueTable
)。第 128 至 158 行 :增加 分配的消息队列(
mqSet
) 新增的消息队列。第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。
RebalanceImpl#removeUnnecessaryMessageQueue(...)
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步队列的消费进度,并移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 顺序消费
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }
说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
第 2 至 4 行 :同步队列的消费进度,并移除之。
第 5 至 27 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }
说明 :移除不需要的消息队列相关的信息,并返回移除成功。和
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。
RebalancePushImpl#dispatchPullRequest(...)
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }
说明 :发起消息拉取请求。该调用是
PushConsumer
不断不断不断拉取消息的起点。
DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }
说明 :提交拉取请求。提交后,
PullMessageService
异步执行,非阻塞。详细解析见:PullMessageService。
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :平均分配队列策略。
第 7 至 25 行 :参数校验。
第 26 至 36 行 :平均分配消息队列。
[0,mod)
:mqAll.size()/cidAll.size()+1
。前面mod
个Consumer
平分余数,多获得 1 个消息队列。[mod,cidAll.size())
:mqAll.size()/cidAll.size()
。第 27 行 :
index
:当前Consumer
在消费集群里是第几个。这里就是为什么需要对传入的cidAll
参数必须进行排序的原因。如果不排序,Consumer
在本地计算出来的index 55 56342 55 31370 0 0 5911 0 0:00:09 0:00:05 0:00:04 6126
无法一致,影响计算结果。第 28 行 :
mod
:余数,即多少消息队列无法平均分配。第 29 至 31 行 :
averageSize
:代码可以简化成(mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())
。第 32 行 :
startIndex
:Consumer
分配消息队列开始位置。第 33 行 :
range
:分配队列数量。之所以要Math#min(...)
的原因:当mqAll.size()<=cidAll.size()
时,最后几个Consumer
分配不到消息队列。第 34 至 36 行 :生成分配消息队列结果。
举个例子:
固定消息队列长度为4。
Consumer * 2 可以整除 | Consumer * 3 不可整除 | Consumer * 5 无法都分配 | |
---|---|---|---|
消息队列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息队列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息队列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息队列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
AllocateMessageQueueByMachineRoom
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :平均分配可消费的
Broker
对应的消息队列。第 7 至 15 行 :参数校验。
第 16 至 23 行 :计算可消费的
Broker
对应的消息队列。第 25 至 34 行 :平均分配消息队列。该平均分配方式和
AllocateMessageQueueAveragely
略有不同,其是将多余的结尾部分分配给前rem
个Consumer
。疑问:使用该分配策略时,
Consumer
和Broker
分配需要怎么配置。😈等研究主从相关源码时,仔细考虑下。
AllocateMessageQueueAveragelyByCircle
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :环状分配消息队列。
AllocateMessageQueueByConfig
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :分配配置的消息队列。
疑问 :该分配策略的使用场景。
5、PushConsumer 消费进度读取
RebalancePushImpl#computePullFromWhere(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :计算消息队列开始消费位置。
PushConsumer
读取消费进度有三种选项:CONSUME_FROM_LAST_OFFSET
:第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
:第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
:第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
[PullConsumer]
RebalancePullImpl#computePullFromWhere(...)
暂时跳过。😈
6、PushConsumer 拉取消息
PullMessageService
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :拉取消息服务,不断不断不断从
Broker
拉取消息,并提交消费任务到ConsumeMessageService
。#executePullRequestLater(...)
:第 26 至 40 行 : 提交延迟拉取消息请求。#executePullRequestImmediately(...)
:第 42 至 53 行 :提交立即拉取消息请求。#executeTaskLater(...)
:第 55 至 63 行 :提交延迟任务。#pullMessage(...)
:第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。#run(...)
:第 84 至 101 行 :循环拉取消息请求队列(pullRequestQueue
),进行消息拉取。
DefaultMQPushConsumerImpl#pullMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
#pullMessage(...)
说明 :拉取消息。执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
当发起请求产生异常时,提交延迟拉取消息请求。对应
Broker
处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
第 9 行 :设置消息处理队列最后拉取消息时间。
第 11 至 18 行 :
Consumer
未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。第 20 至 25 行 :
Consumer
处于暂停中,不进行消息拉取,提交延迟拉取消息请求。第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
第 39 至 49 行 :
Consumer
为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。第 50 至 70 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。第 72 至 78 行 :
Topic
对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。第 222 至 224 行 :判断请求是否使用
Consumer
本地的订阅信息(SubscriptionData
),而不使用Broker
里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
第 237 至 255 行 :
PullCallback
:拉取消息回调:第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息(
FOUND
) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到ConsumeMessageService
。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率(pullInterval
),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。第 140 至 149 行 :没有新消息(
NO_NEW_MSG
) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见:#correctTagsOffset(...)
。 * 第 148 行 :提交立即拉取消息请求。第 150 至 159 行 :有新消息但是不匹配(
NO_MATCHED_MSG
)。逻辑同NO_NEW_MSG
。第 160 至 189 行 :拉取请求的消息队列位置不合法 (
OFFSET_ILLEGAL
)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为dropped
。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到Broker
。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
#correctTagsOffset(...)
:更正消费进度。第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。
PullAPIWrapper#pullKernelImpl(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明😈。
第 34 至 43 行 :获取
Broker
信息(Broker
地址、是否为从节点)。#recalculatePullFromWhichNode(...)
#MQClientInstance#findBrokerAddressInSubscribe(...)
第 45 至 78 行 :请求拉取消息。
第 81 行 :当
Broker
信息不存在,则抛出异常。
PullAPIWrapper#recalculatePullFromWhichNode(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :计算消息队列拉取消息对应的
Broker
编号。
MQClientInstance#findBrokerAddressInSubscribe(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :获取
Broker
信息(Broker
地址、是否为从节点)。
PullAPIWrapper#processPullResult(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :处理拉取结果。
更新消息队列拉取消息
Broker
编号的映射。解析消息,并根据订阅信息消息
tagCode
匹配合适消息。第 16 行 :更新消息队列拉取消息
Broker
编号的映射。下次拉取消息时,如果未设置默认拉取的Broker
编号,会使用更新后的Broker
编号。第 18 至 55 行 :解析消息,并根据订阅信息消息
tagCode
匹配合适消息。第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
第 24 至 35 行 :根据订阅信息
tagCode
匹配消息。第 37 至 43 行 :
Hook
。第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
第 54 行 :设置消息队列。
第 58 行 :清空消息二进制数组。
ProcessQueue#putMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
总结
如果用最简单粗暴的方式描述 PullConsumer
拉取消息的过程,那就是如下的代码:
while (true) {
if (不满足拉取消息) {
Thread.sleep(间隔);
continue;
}
主动拉取消息();
}
6、PushConsumer 消费消息
ConsumeMessageConcurrentlyService 提交消费请求
ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :提交立即消费请求。
第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
第 25 至 33 行 :计算当前拆分请求包含的消息。
第 35 至 38 行 :提交拆分消费请求。
第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。
ConsumeMessageConcurrentlyService#submitConsumeRequestLater
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :提交延迟消费请求。
第 34 行 :直接调用
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
。如果消息数超过批量消费上限,会不会是BUG。
ConsumeRequest
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :消费请求。提交请求执行消费。
第 24 至 28 行 :废弃处理队列不进行消费。
第 34 至 44 行 :Hook。
第 51 行 :当消息为重试消息,设置
Topic
为原始Topic
。例如:原始Topic
为TopicTest
,重试时Topic
为%RETRY%please_rename_unique_group_name_4
,经过该方法,Topic
设置回TopicTest
。第 53 至 58 行 :设置开始消费时间。
第 61 行 :进行消费。
第 71 至 85 行 :解析消费返回结果类型
第 87 至 90 行 :
Hook
。第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
第 101 至 106 行 :
Hook
。第 108 至 110 行 :统计。
第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。
ConsumeMessageConcurrentlyService#processConsumeResult(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :处理消费结果。
第 8 至 10 行 :消费请求消息未空时,直接返回。
第 12 至 32 行 :计算
ackIndex
值。consumeRequest.msgs[0-ackIndex]
为消费成功,需要进行ack
确认。第 14 至 23 行 :
CONSUME_SUCCESS
:ackIndex=context.getAckIndex()
。第 24 至 29 行 :
RECONSUME_LATER
:ackIndex=-1
。第34 至 63 行 :处理消费失败的消息。
第 43 至 52 行 :发回消费失败的消息到
Broker
。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。第 54 至 59 行 :发回
Broker
失败的消息,直接提交延迟重新消费。如果发回
Broker
成功,结果因为例如网络异常,导致Consumer
以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。第 36 至 41 行 :
BROADCASTING
:广播模式,无论是否消费失败,不发回消息到Broker
,只打印日志。第 42 至 60 行 :
CLUSTERING
:集群模式,消费失败的消息发回到Broker
。第 65 至 69 行 :移除【消费成功】和【消费失败但发回
Broker
成功】的消息,并更新最新消费进度。为什么会有【消费失败但发回
Broker
成功】的消息?见第 56 行。ProcessQueue#removeMessage(...)
ProcessQueue#removeMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
ConsumeMessageConcurrentlyService#cleanExpireMsg(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :定时清理过期消息,默认周期:15min。
ProcessQueue#cleanExpiredMsg(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :移除过期消息。
第 2 至 5 行 :顺序消费时,直接返回。
第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
第 29 行 :发回超时消息到
Broker
。第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。
7、PushConsumer 发回消费失败消息
DefaultMQPushConsumerImpl#sendMessageBack(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :发回消息。
第 4 至 8 行 :
Consumer
发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。第 10 至 25 行 :发生异常时,
Consumer
内置默认Producer
发送消息。😈疑问:什么样的情况下会发生异常呢?
MQClientAPIImpl#consumerSendMessageBack(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
8、Consumer 消费进度
OffsetStore
RemoteBrokerOffsetStore
:Consumer
集群模式 下,使用远程Broker
消费进度。LocalFileOffsetStore
:Consumer
广播模式下,使用本地文件
消费进度。
OffsetStore#load(...)
LocalFileOffsetStore#load(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :从本地文件加载消费进度到内存。
OffsetSerializeWrapper
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :本地
Offset
存储序列化。
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":3,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:1471,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:1470
}
}
RemoteBrokerOffsetStore#load(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :不进行加载,实际读取消费进度时,从
Broker
获取。
OffsetStore#readOffset(...)
读取消费进度类型:
READ_FROM_MEMORY
:从内存读取。READ_FROM_STORE
:从存储(Broker
或文件
)读取。MEMORY_FIRST_THEN_STORE
:优先从内存读取,读取不到,从存储读取。
LocalFileOffsetStore#readOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
第 16 行 :从
文件
读取消费进度。
RemoteBrokerOffsetStore#readOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
第 16 行 :从
Broker
读取消费进度。
OffsetStore#updateOffset(...)
该方法 RemoteBrokerOffsetStore
与 LocalFileOffsetStore
实现相同。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
OffsetStore#persistAll(...)
LocalFileOffsetStore#persistAll(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :持久化消费进度。将消费进度写入文件。
RemoteBrokerOffsetStore#persistAll(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :持久化指定消息队列数组的消费进度到
Broker
,并移除非指定消息队列。
MQClientInstance#persistAllConsumerOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :定时进行持久化,默认周期:5000ms。
重要说明 :
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
9、结尾
😈可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。
感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。😜真的有丢丢长。
本文转载自「芋道源码」
http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
推荐阅读
Spring干货汇总(含Spring Boot与Spring Cloud)
点击 “阅读原文” 看看本号其他精彩内容