分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 「芋道源码」欢迎转载,保留摘要,谢谢!
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
本文主要基于 RocketMQ 4.0.x 正式版
1、概述
2、Consumer
3、PushConsumer 一览
4、PushConsumer 订阅
5、PushConsumer 消息队列分配
6、PushConsumer 消费进度读取
7、PushConsumer 拉取消息
8、PushConsumer 消费消息
9、PushConsumer 发回消费失败消息
10、Consumer 消费进度
11、结尾
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
PushConsumer:
在大多数场景下使用。
名字虽然是
Push
开头,实际在实现时,使用Pull
方式实现。通过Pull
不断不断不断轮询Broker
获取消息。当不存在新消息时,Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和Broker
主动Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询(Long-Polling
)。PullConsumer
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
3、PushConsumer 一览
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
RebalanceService
:均衡消息队列服务,负责分配当前Consumer
可消费的消息队列(MessageQueue
)。当有新的Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从Broker
拉取消息,并提交消费任务到ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对Namesrv
,Broker
的 API调用,提供给Producer
、Consumer
使用。
4、PushConsumer 订阅
DefaultMQPushConsumerImpl#subscribe(...)
1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
说明 :订阅
Topic
。第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
第 7 至 10 行 :通过心跳同步
Consumer
信息到Broker
。
FilterAPI.buildSubscriptionData(...)
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
说明 :根据
Topic
和 订阅表达式 创建订阅数据subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(...)
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
说明 :注册消息监听器。
5、PushConsumer 消息队列分配
RebalanceService
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
说明 :均衡消息队列服务,负责分配当前
Consumer
可消费的消息队列(MessageQueue
)。第 26 行 :调用
MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:详细解析见:MQClientInstance#doRebalance(...)。
如
第25行
等待超时,每 20s 调用一次。PushConsumer
启动时,调用rebalanceService#wakeup(...)
触发。Broker
通知Consumer
加入 或 移除时,Consumer
响应通知,调用rebalanceService#wakeup(...)
触发。
MQClientInstance#doRebalance(...)
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
说明 :遍历当前
Client
包含的consumerTable
(Consumer
集合 ),执行消息队列分配。疑问:目前代码调试下来,
consumerTable
只包含Consumer
自己。😈有大大对这个疑问有解答的,烦请解答下。第 6 行 :调用
MQConsumerInner#doRebalance(...)
进行队列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...)
详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。
DefaultMQPushConsumerImpl#doRebalance(...)
1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }
说明:执行消息队列分配。
第 3 行 :调用
RebalanceImpl#doRebalance(...)
进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。
RebalanceImpl#doRebalance(...)
1: /**
2: * 执行分配消息队列
3: *
4: * @param isOrder 是否顺序消息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每个 topic 的消息队列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未订阅的topic对应的消息队列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未订阅的消息队列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }
#doRebalance(...)
说明 :执行分配消息队列。第 7 至 20 行 :循环订阅主题集合(
subscriptionInner
),分配每一个Topic
的消息队列。第 22 行 :移除未订阅的
Topic
的消息队列。#truncateMessageQueueNotMyTopic(...)
说明 :移除未订阅的消息队列。当调用DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合(subscriptionInner
),对应消息队列移除在该方法。
RebalanceImpl#rebalanceByTopic(...)
1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 获取 topic 对应的 队列 和 consumer信息
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根据 队列分配策略 分配消息队列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新消息队列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /**
81: * 当负载均衡时,更新 消息处理队列
82: * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
83: * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
84: *
85: * @param topic Topic
86: * @param mqSet 负载均衡结果后的消息队列数组
87: * @param isOrder 是否顺序
88: * @return 是否变更
89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待读:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的队列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 发起消息拉取请求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }
#rebalanceByTopic(...)
说明 :分配Topic
的消息队列。第 21 至 40 行 :获取
Topic
对应的消息队列和消费者们,并对其进行排序。因为各Consumer
是在本地分配消息队列,排序后才能保证各Consumer
顺序一致。第 42 至 61 行 :根据 队列分配策略(
AllocateMessageQueueStrategy
) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。第 63 至 72 行 :更新
Topic
对应的消息队列。第 3 至 19 行 :广播模式(
BROADCASTING
) 下,分配Topic
对应的所有消息队列。第 20 至 74 行 :集群模式(
CLUSTERING
) 下,分配Topic
对应的部分消息队列。#updateProcessQueueTableInRebalance(...)
说明 :当分配队列时,更新Topic
对应的消息队列,并返回是否有变更。第 132 至 135 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。第 137 行 :移除消息队列的消费进度。
第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求。
第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
第 108 至 120 行 :队列拉取超时,即
当前时间-最后一次拉取消息时间>120s
( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。第 93 至 126 行 :移除不存在于分配的消息队列(
mqSet
) 的 消息处理队列(processQueueTable
)。第 128 至 158 行 :增加 分配的消息队列(
mqSet
) 新增的消息队列。第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。
RebalanceImpl#removeUnnecessaryMessageQueue(...)
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步队列的消费进度,并移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 顺序消费
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }
说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
第 2 至 4 行 :同步队列的消费进度,并移除之。
第 5 至 27 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }
说明 :移除不需要的消息队列相关的信息,并返回移除成功。和
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。
RebalancePushImpl#dispatchPullRequest(...)
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }
说明 :发起消息拉取请求。该调用是
PushConsumer
不断不断不断拉取消息的起点。
DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }
说明 :提交拉取请求。提交后,
PullMessageService
异步执行,非阻塞。详细解析见:PullMessageService。
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :平均分配队列策略。
第 7 至 25 行 :参数校验。
第 26 至 36 行 :平均分配消息队列。
[0,mod)
:mqAll.size()/cidAll.size()+1
。前面mod
个Consumer
平分余数,多获得 1 个消息队列。[mod,cidAll.size())
:mqAll.size()/cidAll.size()
。第 27 行 :
index
:当前Consumer
在消费集群里是第几个。这里就是为什么需要对传入的cidAll
参数必须进行排序的原因。如果不排序,Consumer
在本地计算出来的index
无法一致,影响计算结果。第 28 行 :
mod
:余数,即多少消息队列无法平均分配。第 29 至 31 行 :
averageSize
:代码可以简化成(mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())
。第 32 行 :
startIndex
:Consumer
分配消息队列开始位置。第 33 行 :
range
:分配队列数量。之所以要Math#min(...)
的原因:当mqAll.size()<=cidAll.size()
时,最后几个Consumer
分配不到消息队列。第 34 至 36 行 :生成分配消息队列结果。
举个例子:
固定消息队列长度为4。
Consumer * 2 可以整除 | Consumer * 3 不可整除 | Consumer * 5 无法都分配 | |
---|---|---|---|
消息队列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息队列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息队列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息队列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
AllocateMessageQueueByMachineRoom
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :平均分配可消费的
Broker
对应的消息队列。第 7 至 15 行 :参数校验。
第 16 至 23 行 :计算可消费的
Broker
对应的消息队列。第 25 至 34 行 :平均分配消息队列。该平均分配方式和
AllocateMessageQueueAveragely
略有不同,其是将多余的结尾部分分配给前rem
个Consumer
。疑问:使用该分配策略时,
Consumer
和Broker
分配需要怎么配置。😈等研究主从相关源码时,仔细考虑下。
AllocateMessageQueueAveragelyByCircle
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :环状分配消息队列。
AllocateMessageQueueByConfig
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :分配配置的消息队列。
疑问 :该分配策略的使用场景。
5、PushConsumer 消费进度读取
RebalancePushImpl#computePullFromWhere(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :计算消息队列开始消费位置。
PushConsumer
读取消费进度有三种选项:CONSUME_FROM_LAST_OFFSET
:第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
:第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
:第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
[PullConsumer]
RebalancePullImpl#computePullFromWhere(...)
暂时跳过。😈
6、PushConsumer 拉取消息
PullMessageService
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :拉取消息服务,不断不断不断从
Broker
拉取消息,并提交消费任务到ConsumeMessageService
。#executePullRequestLater(...)
:第 26 至 40 行 : 提交延迟拉取消息请求。#executePullRequestImmediately(...)
:第 42 至 53 行 :提交立即拉取消息请求。#executeTaskLater(...)
:第 55 至 63 行 :提交延迟任务。#pullMessage(...)
:第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。#run(...)
:第 84 至 101 行 :循环拉取消息请求队列(pullRequestQueue
),进行消息拉取。
DefaultMQPushConsumerImpl#pullMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
#pullMessage(...)
说明 :拉取消息。执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
当发起请求产生异常时,提交延迟拉取消息请求。对应
Broker
处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
第 9 行 :设置消息处理队列最后拉取消息时间。
第 11 至 18 行 :
Consumer
未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。第 20 至 25 行 :
Consumer
处于暂停中,不进行消息拉取,提交延迟拉取消息请求。第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
第 39 至 49 行 :
Consumer
为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。第 50 至 70 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。第 72 至 78 行 :
Topic
对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。第 222 至 224 行 :判断请求是否使用
Consumer
本地的订阅信息(SubscriptionData
),而不使用Broker
里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
第 237 至 255 行 :
PullCallback
:拉取消息回调:第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息(
FOUND
) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到ConsumeMessageService
。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率(pullInterval
),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。第 140 至 149 行 :没有新消息(
NO_NEW_MSG
) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见:#correctTagsOffset(...)
。 * 第 148 行 :提交立即拉取消息请求。第 150 至 159 行 :有新消息但是不匹配(
NO_MATCHED_MSG
)。逻辑同NO_NEW_MSG
。第 160 至 189 行 :拉取请求的消息队列位置不合法 (
OFFSET_ILLEGAL
)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为dropped
。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到Broker
。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
#correctTagsOffset(...)
:更正消费进度。第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。
PullAPIWrapper#pullKernelImpl(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明😈。
第 34 至 43 行 :获取
Broker
信息(Broker
地址、是否为从节点)。#recalculatePullFromWhichNode(...)
#MQClientInstance#findBrokerAddressInSubscribe(...)
第 45 至 78 行 :请求拉取消息。
第 81 行 :当
Broker
信息不存在,则抛出异常。
PullAPIWrapper#recalculatePullFromWhichNode(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :计算消息队列拉取消息对应的
Broker
编号。
MQClientInstance#findBrokerAddressInSubscribe(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :获取
Broker
信息(Broker
地址、是否为从节点)。
PullAPIWrapper#processPullResult(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :处理拉取结果。
更新消息队列拉取消息
Broker
编号的映射。解析消息,并根据订阅信息消息
tagCode
匹配合适消息。第 16 行 :更新消息队列拉取消息
Broker
编号的映射。下次拉取消息时,如果未设置默认拉取的Broker
编号,会使用更新后的Broker
编号。第 18 至 55 行 :解析消息,并根据订阅信息消息
tagCode
匹配合适消息。第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
第 24 至 35 行 :根据订阅信息
tagCode
匹配消息。第 37 至 43 行 :
Hook
。第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
第 54 行 :设置消息队列。
第 58 行 :清空消息二进制数组。
ProcessQueue#putMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
总结
如果用最简单粗暴的方式描述 PullConsumer
拉取消息的过程,那就是如下的代码:
while (true) {
if (不满足拉取消息) {
Thread.sleep(间隔);
continue;
}
主动拉取消息();
}
6、PushConsumer 消费消息
ConsumeMessageConcurrentlyService 提交消费请求
ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :提交立即消费请求。
第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
第 25 至 33 行 :计算当前拆分请求包含的消息。
第 35 至 38 行 :提交拆分消费请求。
第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。
ConsumeMessageConcurrentlyService#submitConsumeRequestLater
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :提交延迟消费请求。
第 34 行 :直接调用
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
。如果消息数超过批量消费上限,会不会是BUG。
ConsumeRequest
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :消费请求。提交请求执行消费。
第 24 至 28 行 :废弃处理队列不进行消费。
第 34 至 44 行 :Hook。
第 51 行 :当消息为重试消息,设置
Topic
为原始Topic
。例如:原始Topic
为TopicTest
,重试时Topic
为%RETRY%please_rename_unique_group_name_4
,经过该方法,Topic
设置回TopicTest
。第 53 至 58 行 :设置开始消费时间。
第 61 行 :进行消费。
第 71 至 85 行 :解析消费返回结果类型
第 87 至 90 行 :
Hook
。第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
第 101 至 106 行 :
Hook
。第 108 至 110 行 :统计。
第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。
ConsumeMessageConcurrentlyService#processConsumeResult(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :处理消费结果。
第 8 至 10 行 :消费请求消息未空时,直接返回。
第 12 至 32 行 :计算
ackIndex
值。consumeRequest.msgs[0-ackIndex]
为消费成功,需要进行ack
确认。第 14 至 23 行 :
CONSUME_SUCCESS
:ackIndex=context.getAckIndex()
。第 24 至 29 行 :
RECONSUME_LATER
:ackIndex=-1
。第34 至 63 行 :处理消费失败的消息。
第 43 至 52 行 :发回消费失败的消息到
Broker
。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。第 54 至 59 行 :发回
Broker
失败的消息,直接提交延迟重新消费。如果发回
Broker
成功,结果因为例如网络异常,导致Consumer
以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。第 36 至 41 行 :
BROADCASTING
:广播模式,无论是否消费失败,不发回消息到Broker
,只打印日志。第 42 至 60 行 :
CLUSTERING
:集群模式,消费失败的消息发回到Broker
。第 65 至 69 行 :移除【消费成功】和【消费失败但发回
Broker
成功】的消息,并更新最新消费进度。为什么会有【消费失败但发回
Broker
成功】的消息?见第 56 行。ProcessQueue#removeMessage(...)
ProcessQueue#removeMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
ConsumeMessageConcurrentlyService#cleanExpireMsg(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :定时清理过期消息,默认周期:15min。
ProcessQueue#cleanExpiredMsg(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :移除过期消息。
第 2 至 5 行 :顺序消费时,直接返回。
第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
第 29 行 :发回超时消息到
Broker
。第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。
7、PushConsumer 发回消费失败消息
DefaultMQPushConsumerImpl#sendMessageBack(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :发回消息。
第 4 至 8 行 :
Consumer
发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。第 10 至 25 行 :发生异常时,
Consumer
内置默认Producer
发送消息。😈疑问:什么样的情况下会发生异常呢?
MQClientAPIImpl#consumerSendMessageBack(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
8、Consumer 消费进度
OffsetStore
RemoteBrokerOffsetStore
:Consumer
集群模式 下,使用远程Broker
消费进度。LocalFileOffsetStore
:Consumer
广播模式下,使用本地文件
消费进度。
OffsetStore#load(...)
LocalFileOffsetStore#load(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :从本地文件加载消费进度到内存。
OffsetSerializeWrapper
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :本地
Offset
存储序列化。
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":3,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:1471,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:1470
}
}
RemoteBrokerOffsetStore#load(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :不进行加载,实际读取消费进度时,从
Broker
获取。
OffsetStore#readOffset(...)
读取消费进度类型:
READ_FROM_MEMORY
:从内存读取。READ_FROM_STORE
:从存储(Broker
或文件
)读取。MEMORY_FIRST_THEN_STORE
:优先从内存读取,读取不到,从存储读取。
LocalFileOffsetStore#readOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
第 16 行 :从
文件
读取消费进度。
RemoteBrokerOffsetStore#readOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
第 16 行 :从
Broker
读取消费进度。
OffsetStore#updateOffset(...)
该方法 RemoteBrokerOffsetStore
与 LocalFileOffsetStore
实现相同。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
OffsetStore#persistAll(...)
LocalFileOffsetStore#persistAll(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :持久化消费进度。将消费进度写入文件。
RemoteBrokerOffsetStore#persistAll(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :持久化指定消息队列数组的消费进度到
Broker
,并移除非指定消息队列。
MQClientInstance#persistAllConsumerOffset(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
说明 :定时进行持久化,默认周期:5000ms。
重要说明 :
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
9、结尾
😈可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。
感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。😜真的有丢丢长。