查看原文
其他

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

2017-12-24 芋艿 程序猿DD

前传分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述

  • 2、Consumer

  • 3、PushConsumer 一览

  • 4、PushConsumer 订阅

  • 5、PushConsumer 消息队列分配

  • 6、PushConsumer 消费进度读取

  • 7、PushConsumer 拉取消息

  • 8、PushConsumer 消费消息

  • 9、PushConsumer 发回消费失败消息

  • 10、Consumer 消费进度

  • 11、结尾


1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:

    • 在大多数场景下使用。

    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时, Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )

  • PullConsumer

本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费
本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费
本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。

  • PullMessageService:拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService

  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。

  • RemoteBrokerOffsetStore: Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker

  • ProcessQueue :消息处理队列。

  • MQClientInstance :封装对 Namesrv, Broker 的 API调用,提供给 Producer、 Consumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

  1.  1: public void subscribe(String topic, String subExpression) throws MQClientException {

  2.  2:     try {

  3.  3:         // 创建订阅数据

  4.  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //

  5.  5:             topic, subExpression);

  6.  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

  7.  7:         // 通过心跳同步Consumer信息到Broker

  8.  8:         if (this.mQClientFactory != null) {

  9.  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  10. 10:         }

  11. 11:     } catch (Exception e) {

  12. 12:         throw new MQClientException("subscription exception", e);

  13. 13:     }

  14. 14: }

  • 说明 :订阅 Topic 。

  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。

  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

  1.  1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,

  2.  2:     String subString) throws Exception {

  3.  3:     SubscriptionData subscriptionData = new SubscriptionData();

  4.  4:     subscriptionData.setTopic(topic);

  5.  5:     subscriptionData.setSubString(subString);

  6.  6:     // 处理订阅表达式

  7.  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {

  8.  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);

  9.  9:     } else {

  10. 10:         String[] tags = subString.split("\\|\\|");

  11. 11:         if (tags.length > 0) {

  12. 12:             for (String tag : tags) {

  13. 13:                 if (tag.length() > 0) {

  14. 14:                     String trimString = tag.trim();

  15. 15:                     if (trimString.length() > 0) {

  16. 16:                         subscriptionData.getTagsSet().add(trimString);

  17. 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());

  18. 18:                     }

  19. 19:                 }

  20. 20:             }

  21. 21:         } else {

  22. 22:             throw new Exception("subString split error");

  23. 23:         }

  24. 24:     }

  25. 25:

  26. 26:     return subscriptionData;

  27. 27: }

  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据

  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

  1.  1: public void registerMessageListener(MessageListenerConcurrently messageListener) {

  2.  2:     this.messageListener = messageListener;

  3.  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);

  4.  4: }

  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

RebalanceService

  1.  1: public class RebalanceService extends ServiceThread {

  2.  2:

  3.  3:     /**

  4.  4:      * 等待间隔,单位:毫秒

  5.  5:      */

  6.  6:     private static long waitInterval =

  7.  7:         Long.parseLong(System.getProperty(

  8.  8:             "rocketmq.client.rebalance.waitInterval", "20000"));

  9.  9:

  10. 10:     private final Logger log = ClientLogger.getLog();

  11. 11:     /**

  12. 12:      * MQClient对象

  13. 13:      */

  14. 14:     private final MQClientInstance mqClientFactory;

  15. 15:

  16. 16:     public RebalanceService(MQClientInstance mqClientFactory) {

  17. 17:         this.mqClientFactory = mqClientFactory;

  18. 18:     }

  19. 19:

  20. 20:     @Override

  21. 21:     public void run() {

  22. 22:         log.info(this.getServiceName() + " service started");

  23. 23:

  24. 24:         while (!this.isStopped()) {

  25. 25:             this.waitForRunning(waitInterval);

  26. 26:             this.mqClientFactory.doRebalance();

  27. 27:         }

  28. 28:

  29. 29:         log.info(this.getServiceName() + " service end");

  30. 30:     }

  31. 31:

  32. 32:     @Override

  33. 33:     public String getServiceName() {

  34. 34:         return RebalanceService.class.getSimpleName();

  35. 35:     }

  36. 36: }

  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。


  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发:

    详细解析见:MQClientInstance#doRebalance(...)。


    • 如 第25行 等待超时,每 20s 调用一次。

    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。

    • Broker 通知 Consumer 加入 或 移除时, Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

MQClientInstance#doRebalance(...)

  1.  1: public void doRebalance() {

  2.  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {

  3.  3:         MQConsumerInner impl = entry.getValue();

  4.  4:         if (impl != null) {

  5.  5:             try {

  6.  6:                 impl.doRebalance();

  7.  7:             } catch (Throwable e) {

  8.  8:                 log.error("doRebalance exception", e);

  9.  9:             }

  10. 10:         }

  11. 11:     }

  12. 12: }

  • 说明 :遍历当前 Client 包含的 consumerTableConsumer集合 ),执行消息队列分配。

  • 疑问:目前代码调试下来, consumerTable 只包含 Consumer 自己。😈有大大对这个疑问有解答的,烦请解答下。

  • 第 6 行 :调用 MQConsumerInner#doRebalance(...) 进行队列分配。 DefaultMQPushConsumerImpl、 DefaultMQPullConsumerImpl 分别对该接口方法进行了实现。 DefaultMQPushConsumerImpl#doRebalance(...)详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。

DefaultMQPushConsumerImpl#doRebalance(...)

  1.  1: public void doRebalance() {

  2.  2:     if (!this.pause) {

  3.  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

  4.  4:     }

  5.  5: }

  • 说明:执行消息队列分配。

  • 第 3 行 :调用 RebalanceImpl#doRebalance(...) 进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。

RebalanceImpl#doRebalance(...)

  1.  1: /**

  2.  2:  * 执行分配消息队列

  3.  3:  *

  4.  4:  * @param isOrder 是否顺序消息

  5.  5:  */

  6.  6: public void doRebalance(final boolean isOrder) {

  7.  7:     // 分配每个 topic 的消息队列

  8.  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

  9.  9:     if (subTable != null) {

  10. 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {

  11. 11:             final String topic = entry.getKey();

  12. 12:             try {

  13. 13:                 this.rebalanceByTopic(topic, isOrder);

  14. 14:             } catch (Throwable e) {

  15. 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

  16. 16:                     log.warn("rebalanceByTopic Exception", e);

  17. 17:                 }

  18. 18:             }

  19. 19:         }

  20. 20:     }

  21. 21:     // 移除未订阅的topic对应的消息队列

  22. 22:     this.truncateMessageQueueNotMyTopic();

  23. 23: }

  24. 24:

  25. 25: /**

  26. 26:  * 移除未订阅的消息队列

  27. 27:  */

  28. 28: private void truncateMessageQueueNotMyTopic() {

  29. 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

  30. 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {

  31. 31:         if (!subTable.containsKey(mq.getTopic())) {

  32. 32:

  33. 33:             ProcessQueue pq = this.processQueueTable.remove(mq);

  34. 34:             if (pq != null) {

  35. 35:                 pq.setDropped(true);

  36. 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);

  37. 37:             }

  38. 38:         }

  39. 39:     }

  40. 40: }

  • #doRebalance(...) 说明 :执行分配消息队列。

    • 第 7 至 20 行 :循环订阅主题集合( subscriptionInner ),分配每一个 Topic 的消息队列。

    • 第 22 行 :移除未订阅的 Topic 的消息队列。

  • #truncateMessageQueueNotMyTopic(...) 说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。

RebalanceImpl#rebalanceByTopic(...)

  1.  1: private void rebalanceByTopic(final String topic, final boolean isOrder) {

  2.  2:     switch (messageModel) {

  3.  3:         case BROADCASTING: {

  4.  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

  5.  5:             if (mqSet != null) {

  6.  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);

  7.  7:                 if (changed) {

  8.  8:                     this.messageQueueChanged(topic, mqSet, mqSet);

  9.  9:                     log.info("messageQueueChanged {} {} {} {}", //

  10. 10:                         consumerGroup, //

  11. 11:                         topic, //

  12. 12:                         mqSet, //

  13. 13:                         mqSet);

  14. 14:                 }

  15. 15:             } else {

  16. 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);

  17. 17:             }

  18. 18:             break;

  19. 19:         }

  20. 20:         case CLUSTERING: {

  21. 21:             // 获取 topic 对应的 队列 和 consumer信息

  22. 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

  23. 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

  24. 24:             if (null == mqSet) {

  25. 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

  26. 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);

  27. 27:                 }

  28. 28:             }

  29. 29:

  30. 30:             if (null == cidAll) {

  31. 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);

  32. 32:             }

  33. 33:

  34. 34:             if (mqSet != null && cidAll != null) {

  35. 35:                 // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。

  36. 36:                 List<MessageQueue> mqAll = new ArrayList<>();

  37. 37:                 mqAll.addAll(mqSet);

  38. 38:

  39. 39:                 Collections.sort(mqAll);

  40. 40:                 Collections.sort(cidAll);

  41. 41:

  42. 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

  43. 43:

  44. 44:                 // 根据 队列分配策略 分配消息队列

  45. 45:                 List<MessageQueue> allocateResult;

  46. 46:                 try {

  47. 47:                     allocateResult = strategy.allocate(//

  48. 48:                         this.consumerGroup, //

  49. 49:                         this.mQClientFactory.getClientId(), //

  50. 50:                         mqAll, //

  51. 51:                         cidAll);

  52. 52:                 } catch (Throwable e) {

  53. 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),

  54. 54:                         e);

  55. 55:                     return;

  56. 56:                 }

  57. 57:

  58. 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();

  59. 59:                 if (allocateResult != null) {

  60. 60:                     allocateResultSet.addAll(allocateResult);

  61. 61:                 }

  62. 62:

  63. 63:                 // 更新消息队列

  64. 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

  65. 65:                 if (changed) {

  66. 66:                     log.info(

  67. 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",

  68. 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),

  69. 69:                         allocateResultSet.size(), allocateResultSet);

  70. 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);

  71. 71:                 }

  72. 72:             }

  73. 73:             break;

  74. 74:         }

  75. 75:         default:

  76. 76:             break;

  77. 77:     }

  78. 78: }

  79. 79:

  80. 80: /**

  81. 81:  * 当负载均衡时,更新 消息处理队列

  82. 82:  * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列

  83. 83:  * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列

  84. 84:  *

  85. 85:  * @param topic Topic

  86. 86:  * @param mqSet 负载均衡结果后的消息队列数组

  87. 87:  * @param isOrder 是否顺序

  88. 88:  * @return 是否变更

  89. 89:  */

  90. 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {

  91. 91:     boolean changed = false;

  92. 92:

  93. 93:     // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列

  94. 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();

  95. 95:     while (it.hasNext()) { // TODO 待读:

  96. 96:         Entry<MessageQueue, ProcessQueue> next = it.next();

  97. 97:         MessageQueue mq = next.getKey();

  98. 98:         ProcessQueue pq = next.getValue();

  99. 99:

  100. 100:         if (mq.getTopic().equals(topic)) {

  101. 101:             if (!mqSet.contains(mq)) { // 不包含的队列

  102. 102:                 pq.setDropped(true);

  103. 103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {

  104. 104:                     it.remove();

  105. 105:                     changed = true;

  106. 106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);

  107. 107:                 }

  108. 108:             } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理

  109. 109:                 switch (this.consumeType()) {

  110. 110:                     case CONSUME_ACTIVELY:

  111. 111:                         break;

  112. 112:                     case CONSUME_PASSIVELY:

  113. 113:                         pq.setDropped(true);

  114. 114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {

  115. 115:                             it.remove();

  116. 116:                             changed = true;

  117. 117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",

  118. 118:                                 consumerGroup, mq);

  119. 119:                         }

  120. 120:                         break;

  121. 121:                     default:

  122. 122:                         break;

  123. 123:                 }

  124. 124:             }

  125. 125:         }

  126. 126:     }

  127. 127:

  128. 128:     // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。

  129. 129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组

  130. 130:     for (MessageQueue mq : mqSet) {

  131. 131:         if (!this.processQueueTable.containsKey(mq)) {

  132. 132:             if (isOrder && !this.lock(mq)) {

  133. 133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);

  134. 134:                 continue;

  135. 135:             }

  136. 136:

  137. 137:             this.removeDirtyOffset(mq);

  138. 138:             ProcessQueue pq = new ProcessQueue();

  139. 139:             long nextOffset = this.computePullFromWhere(mq);

  140. 140:             if (nextOffset >= 0) {

  141. 141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);

  142. 142:                 if (pre != null) {

  143. 143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);

  144. 144:                 } else {

  145. 145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);

  146. 146:                     PullRequest pullRequest = new PullRequest();

  147. 147:                     pullRequest.setConsumerGroup(consumerGroup);

  148. 148:                     pullRequest.setNextOffset(nextOffset);

  149. 149:                     pullRequest.setMessageQueue(mq);

  150. 150:                     pullRequest.setProcessQueue(pq);

  151. 151:                     pullRequestList.add(pullRequest);

  152. 152:                     changed = true;

  153. 153:                 }

  154. 154:             } else {

  155. 155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);

  156. 156:             }

  157. 157:         }

  158. 158:     }

  159. 159:

  160. 160:     // 发起消息拉取请求

  161. 161:     this.dispatchPullRequest(pullRequestList);

  162. 162:

  163. 163:     return changed;

  164. 164: }

  • #rebalanceByTopic(...) 说明 :分配 Topic 的消息队列。

    • 第 21 至 40 行 :获取 Topic 对应的消息队列和消费者们,并对其进行排序。因为各 Consumer 是在本地分配消息队列,排序后才能保证各 Consumer 顺序一致。

    • 第 42 至 61 行 :根据 队列分配策略( AllocateMessageQueueStrategy ) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。

    • 第 63 至 72 行 :更新 Topic 对应的消息队列。

    • 第 3 至 19 行 :广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列。

    • 第 20 至 74 行 :集群模式( CLUSTERING ) 下,分配 Topic 对应的部分消息队列。

  • #updateProcessQueueTableInRebalance(...) 说明 :当分配队列时,更新 Topic 对应的消息队列,并返回是否有变更。

    • 第 132 至 135 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。

    • 第 137 行 :移除消息队列的消费进度。

    • 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。

    • 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求

    • 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。

    • 第 108 至 120 行 :队列拉取超时,即 当前时间-最后一次拉取消息时间>120s ( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。

    • 第 93 至 126 行 :移除不存在于分配的消息队列( mqSet ) 的 消息处理队列( processQueueTable )。

    • 第 128 至 158 行 :增加 分配的消息队列( mqSet ) 新增的消息队列。

    • 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。

RebalanceImpl#removeUnnecessaryMessageQueue(...)

RebalancePushImpl#removeUnnecessaryMessageQueue(...)

  1.  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {

  2.  2:     // 同步队列的消费进度,并移除之。

  3.  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);

  4.  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);

  5.  5:     // TODO 顺序消费

  6.  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()

  7.  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {

  8.  8:         try {

  9.  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {

  10. 10:                 try {

  11. 11:                     return this.unlockDelay(mq, pq);

  12. 12:                 } finally {

  13. 13:                     pq.getLockConsume().unlock();

  14. 14:                 }

  15. 15:             } else {

  16. 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //

  17. 17:                     mq, //

  18. 18:                     pq.getTryUnlockTimes());

  19. 19:

  20. 20:                 pq.incTryUnlockTimes();

  21. 21:             }

  22. 22:         } catch (Exception e) {

  23. 23:             log.error("removeUnnecessaryMessageQueue Exception", e);

  24. 24:         }

  25. 25:

  26. 26:         return false;

  27. 27:     }

  28. 28:     return true;

  29. 29: }

  • 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。

  • 第 2 至 4 行 :同步队列的消费进度,并移除之。

  • 第 5 至 27 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。

[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)

  1.  1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {

  2.  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);

  3.  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);

  4.  4:     return true;

  5.  5: }

  • 说明 :移除不需要的消息队列相关的信息,并返回移除成功。和 RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(...)

  1.  1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {

  2.  2:     for (PullRequest pullRequest : pullRequestList) {

  3.  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);

  4.  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);

  5.  5:     }

  6.  6: }

  • 说明 :发起消息拉取请求。该调用是 PushConsumer不断不断不断拉取消息的起点

DefaultMQPushConsumerImpl#executePullRequestImmediately(...)

  1.  1: public void executePullRequestImmediately(final PullRequest pullRequest) {

  2.  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);

  3.  3: }

  • 说明 :提交拉取请求。提交后, PullMessageService 异步执行非阻塞。详细解析见:PullMessageService。

AllocateMessageQueueStrategy

AllocateMessageQueueAveragely

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配队列策略。

  • 第 7 至 25 行 :参数校验。

  • 第 26 至 36 行 :平均分配消息队列。

    • [0,mod) : mqAll.size()/cidAll.size()+1。前面 mod 个 Consumer 平分余数,多获得 1 个消息队列。

    • [mod,cidAll.size()) : mqAll.size()/cidAll.size()

    • 第 27 行 : index :当前 Consumer 在消费集群里是第几个。这里就是为什么需要对传入的 cidAll 参数必须进行排序的原因。如果不排序, Consumer 在本地计算出来的 index 55 56342 55 31370 0 0 5911 0 0:00:09 0:00:05 0:00:04 6126 无法一致,影响计算结果。

    • 第 28 行 : mod :余数,即多少消息队列无法平均分配。

    • 第 29 至 31 行 : averageSize :代码可以简化成 (mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())

    • 第 32 行 : startIndex : Consumer 分配消息队列开始位置。

    • 第 33 行 : range :分配队列数量。之所以要 Math#min(...) 的原因:当 mqAll.size()<=cidAll.size() 时,最后几个 Consumer 分配不到消息队列。

    • 第 34 至 36 行 :生成分配消息队列结果。

  • 举个例子:

固定消息队列长度为4


Consumer * 2 可以整除Consumer * 3 不可整除Consumer * 5 无法都分配
消息队列[0]Consumer[0]Consumer[0]Consumer[0]
消息队列[1]Consumer[0]Consumer[0]Consumer[1]
消息队列[2]Consumer[1]Consumer[1]Consumer[2]
消息队列[3]Consumer[1]Consumer[2]Consumer[3]

AllocateMessageQueueByMachineRoom

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配可消费的 Broker 对应的消息队列。

  • 第 7 至 15 行 :参数校验。

  • 第 16 至 23 行 :计算可消费的 Broker 对应的消息队列。

  • 第 25 至 34 行 :平均分配消息队列。该平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是将多余的结尾部分分配给前 rem 个 Consumer

  • 疑问:使用该分配策略时, Consumer 和 Broker 分配需要怎么配置。😈等研究主从相关源码时,仔细考虑下。

AllocateMessageQueueAveragelyByCircle

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :环状分配消息队列。

AllocateMessageQueueByConfig

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :分配配置的消息队列。

  • 疑问 :该分配策略的使用场景。

5、PushConsumer 消费进度读取

RebalancePushImpl#computePullFromWhere(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列开始消费位置。

  • PushConsumer 读取消费进度有三种选项:

    • CONSUME_FROM_LAST_OFFSET :第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费

    • CONSUME_FROM_FIRST_OFFSET :第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费

    • CONSUME_FROM_TIMESTAMP :第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费

[PullConsumer] RebalancePullImpl#computePullFromWhere(...)

暂时跳过。😈

6、PushConsumer 拉取消息

PullMessageService

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService

  • #executePullRequestLater(...) :第 26 至 40 行 : 提交延迟拉取消息请求。

  • #executePullRequestImmediately(...) :第 42 至 53 行 :提交立即拉取消息请求。

  • #executeTaskLater(...) :第 55 至 63 行 :提交延迟任务

  • #pullMessage(...) :第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。

  • #run(...) :第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue ),进行消息拉取。

DefaultMQPushConsumerImpl#pullMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • #pullMessage(...) 说明 :拉取消息。

    • 执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。

    • 当发起请求产生异常时,提交延迟拉取消息请求。对应 Broker 处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。

    • 第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。

    • 第 9 行 :设置消息处理队列最后拉取消息时间。

    • 第 11 至 18 行 : Consumer 未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。

    • 第 20 至 25 行 : Consumer 处于暂停中,不进行消息拉取,提交延迟拉取消息请求。

    • 第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。

    • 第 39 至 49 行 : Consumer 为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。

    • 第 50 至 70 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。

    • 第 72 至 78 行 : Topic 对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。

    • 第 222 至 224 行 :判断请求是否使用 Consumer 本地的订阅信息( SubscriptionData ),而不使用 Broker 里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。

    • 第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。

    • 第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。

    • 第 237 至 255 行 :

  • PullCallback :拉取消息回调:

  • 第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。

  • 第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息( FOUND ) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到 ConsumeMessageService。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率( pullInterval ),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。

    • 第 140 至 149 行 :没有新消息( NO_NEW_MSG ) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见: #correctTagsOffset(...)。 * 第 148 行 :提交立即拉取消息请求。

    • 第 150 至 159 行 :有新消息但是不匹配( NO_MATCHED_MSG )。逻辑同 NO_NEW_MSG 。

    • 第 160 至 189 行 :拉取请求的消息队列位置不合法 ( OFFSET_ILLEGAL)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为 dropped。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到 Broker。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???

  • 第 196 至 204 行 :发生异常,提交延迟拉取消息请求。

  • #correctTagsOffset(...) :更正消费进度。

    • 第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。

PullAPIWrapper#pullKernelImpl(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明😈。

  • 第 34 至 43 行 :获取 Broker 信息( Broker 地址、是否为从节点)。

    • #recalculatePullFromWhichNode(...)

    • #MQClientInstance#findBrokerAddressInSubscribe(...)

  • 第 45 至 78 行 :请求拉取消息

  • 第 81 行 :当 Broker 信息不存在,则抛出异常。

PullAPIWrapper#recalculatePullFromWhichNode(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列拉取消息对应的 Broker 编号。

MQClientInstance#findBrokerAddressInSubscribe(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :获取 Broker 信息( Broker 地址、是否为从节点)。

PullAPIWrapper#processPullResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理拉取结果。

    • 更新消息队列拉取消息 Broker 编号的映射。

    • 解析消息,并根据订阅信息消息 tagCode匹配合适消息。

  • 第 16 行 :更新消息队列拉取消息 Broker 编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker 编号,会使用更新后的 Broker 编号。

  • 第 18 至 55 行 :解析消息,并根据订阅信息消息 tagCode 匹配合适消息。

    • 第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。

    • 第 24 至 35 行 :根据订阅信息 tagCode 匹配消息。

    • 第 37 至 43 行 : Hook

    • 第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。

    • 第 54 行 :设置消息队列。

  • 第 58 行 :清空消息二进制数组。

ProcessQueue#putMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

总结

如果用最简单粗暴的方式描述 PullConsumer 拉取消息的过程,那就是如下的代码:

  1. while (true) {

  2.    if (不满足拉取消息) {

  3.        Thread.sleep(间隔);

  4.        continue;

  5.    }

  6.    主动拉取消息();

  7. }

6、PushConsumer 消费消息

ConsumeMessageConcurrentlyService 提交消费请求

ConsumeMessageConcurrentlyService#submitConsumeRequest(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交立即消费请求。

  • 第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。

  • 第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。

    • 第 25 至 33 行 :计算当前拆分请求包含的消息。

    • 第 35 至 38 行 :提交拆分消费请求。

    • 第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交延迟消费请求。

  • 第 34 行 :直接调用 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。如果消息数超过批量消费上限,会不会是BUG

ConsumeRequest

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :消费请求。提交请求执行消费。

  • 第 24 至 28 行 :废弃处理队列不进行消费。

  • 第 34 至 44 行 :Hook。

  • 第 51 行 :当消息为重试消息,设置 Topic为原始 Topic。例如:原始 Topic 为 TopicTest,重试时 Topic 为 %RETRY%please_rename_unique_group_name_4,经过该方法, Topic 设置回 TopicTest

  • 第 53 至 58 行 :设置开始消费时间。

  • 第 61 行 :进行消费

  • 第 71 至 85 行 :解析消费返回结果类型

  • 第 87 至 90 行 : Hook

  • 第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。

  • 第 101 至 106 行 : Hook

  • 第 108 至 110 行 :统计。

  • 第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。

ConsumeMessageConcurrentlyService#processConsumeResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理消费结果。

  • 第 8 至 10 行 :消费请求消息未空时,直接返回。

  • 第 12 至 32 行 :计算 ackIndex 值。 consumeRequest.msgs[0-ackIndex]为消费成功,需要进行 ack 确认。

    • 第 14 至 23 行 : CONSUME_SUCCESS : ackIndex=context.getAckIndex()

    • 第 24 至 29 行 : RECONSUME_LATER : ackIndex=-1

  • 第34 至 63 行 :处理消费失败的消息。

    • 第 43 至 52 行 :发回消费失败的消息到 Broker。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。

    • 第 54 至 59 行 :发回 Broker 失败的消息,直接提交延迟重新消费。

    • 如果发回 Broker 成功,结果因为例如网络异常,导致 Consumer以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。

    • 第 36 至 41 行 : BROADCASTING :广播模式,无论是否消费失败,不发回消息到 Broker,只打印日志。

    • 第 42 至 60 行 : CLUSTERING :集群模式,消费失败的消息发回到 Broker

  • 第 65 至 69 行 :移除【消费成功】【消费失败但发回 Broker成功】的消息,并更新最新消费进度。

    • 为什么会有【消费失败但发回 Broker成功】的消息?见第 56 行

    • ProcessQueue#removeMessage(...)

ProcessQueue#removeMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

ConsumeMessageConcurrentlyService#cleanExpireMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时清理过期消息,默认周期:15min。

ProcessQueue#cleanExpiredMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :移除过期消息。

  • 第 2 至 5 行 :顺序消费时,直接返回。

  • 第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。

  • 第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。

  • 第 29 行 :发回超时消息到 Broker

  • 第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。

7、PushConsumer 发回消费失败消息

DefaultMQPushConsumerImpl#sendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :发回消息。

  • 第 4 至 8 行 : Consumer 发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。

  • 第 10 至 25 行 :发生异常时, Consumer 内置默认 Producer 发送消息。

    • 😈疑问:什么样的情况下会发生异常呢?

MQClientAPIImpl#consumerSendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

8、Consumer 消费进度

OffsetStore

  • RemoteBrokerOffsetStore : Consumer 集群模式 下,使用远程 Broker 消费进度。

  • LocalFileOffsetStore : Consumer 广播模式下,使用本地 文件 消费进度。

OffsetStore#load(...)

LocalFileOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :从本地文件加载消费进度到内存。

OffsetSerializeWrapper

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :本地 Offset 存储序列化。

  1. Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json

  2. {

  3.    "offsetTable":{{

  4.            "brokerName":"broker-a",

  5.            "queueId":3,

  6.            "topic":"TopicTest"

  7.        }:1470,{

  8.            "brokerName":"broker-a",

  9.            "queueId":2,

  10.            "topic":"TopicTest"

  11.        }:1471,{

  12.            "brokerName":"broker-a",

  13.            "queueId":1,

  14.            "topic":"TopicTest"

  15.        }:1470,{

  16.            "brokerName":"broker-a",

  17.            "queueId":0,

  18.            "topic":"TopicTest"

  19.        }:1470

  20.    }

  21. }

RemoteBrokerOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :不进行加载,实际读取消费进度时,从 Broker 获取。

OffsetStore#readOffset(...)

读取消费进度类型:

  • READ_FROM_MEMORY :从内存读取。

  • READ_FROM_STORE :从存储( Broker 或 文件 )读取。

  • MEMORY_FIRST_THEN_STORE :优先从内存读取,读取不到,从存储读取。

LocalFileOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 文件 读取消费进度。

RemoteBrokerOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 Broker 读取消费进度。

OffsetStore#updateOffset(...)

该方法 RemoteBrokerOffsetStoreLocalFileOffsetStore 实现相同。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

OffsetStore#persistAll(...)

LocalFileOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化消费进度。将消费进度写入文件

RemoteBrokerOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化指定消息队列数组的消费进度到 Broker,并移除非指定消息队列。

MQClientInstance#persistAllConsumerOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时进行持久化,默认周期:5000ms。

  • 重要说明 :

    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

9、结尾

😈可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。
感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。😜真的有丢丢长。


本文转载自「芋道源码」 

http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

推荐阅读

那些有趣又实用的开源人工智能项目 Top 10

自学编程需要注意什么?

Spring干货汇总(含Spring Boot与Spring Cloud)

谷歌大神为你解释Kubernetes, 微服务和容器化

我最常用的Intellij IDEA快捷键

最好用的 IntelliJ 插件 Top 10


点击 “阅读原文” 看看本号其他精彩内容

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存