查看原文
其他

分布式消息队列 RocketMQ 源码分析:定时消息与消息重试

2018-02-11 芋道源码 程序猿DD

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. 定时消息

    • 2.1 延迟级别

    • 2.2 Producer 发送定时消息

    • 2.3 Broker 存储定时消息

    • 2.4 Broker 发送定时消息

    • 2.5 Broker 持久化定时发送进度

  • 3. 消息重试

1. 概述

建议前置阅读内容:

  • 《RocketMQ 源码分析 —— Message 发送与接收》

  • 《RocketMQ 源码分析 —— Message 拉取与消费(下)》

😈 为什么把定时消息消息重试放在一起?你猜。 👻 你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是定时消息的处理逻辑图:

2.1 延迟级别

RocketMQ 目前只支持固定精度的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别


核心源码如下:


  1.   1: // ⬇️⬇️⬇️【MessageStoreConfig.java】

  2.   2: /**

  3.   3:  * 消息延迟级别字符串配置

  4.   4:  */

  5.   5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

  6.   6:

  7.   7: // ⬇️⬇️⬇️【ScheduleMessageService.java】

  8.   8: /**

  9.   9:  * 解析延迟级别

  10.  10:  *

  11.  11:  * @return 是否解析成功

  12.  12:  */

  13.  13: public boolean parseDelayLevel() {

  14.  14:     HashMap<String, Long> timeUnitTable = new HashMap<>();

  15.  15:     timeUnitTable.put("s", 1000L);

  16.  16:     timeUnitTable.put("m", 1000L * 60);

  17.  17:     timeUnitTable.put("h", 1000L * 60 * 60);

  18.  18:     timeUnitTable.put("d", 1000L * 60 * 60 * 24);

  19.  19:

  20.  20:     String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();

  21.  21:     try {

  22.  22:         String[] levelArray = levelString.split(" ");

  23.  23:         for (int i = 0; i < levelArray.length; i++) {

  24.  24:             String value = levelArray[i];

  25.  25:             String ch = value.substring(value.length() - 1);

  26.  26:             Long tu = timeUnitTable.get(ch);

  27.  27:

  28.  28:             int level = i + 1;

  29.  29:             if (level > this.maxDelayLevel) {

  30.  30:                 this.maxDelayLevel = level;

  31.  31:             }

  32.  32:             long num = Long.parseLong(value.substring(0, value.length() - 1));

  33.  33:             long delayTimeMillis = tu * num;

  34.  34:             this.delayLevelTable.put(level, delayTimeMillis);

  35.  35:         }

  36.  36:     } catch (Exception e) {

  37.  37:         log.error("parseDelayLevel exception", e);

  38.  38:         log.info("levelString String = {}", levelString);

  39.  39:         return false;

  40.  40:     }

  41.  41:

  42.  42:     return true;

  43.  43: }


2.2 Producer 发送定时消息

  • 🦅发送时,设置消息的延迟级别

  1. Message msg = new Message(...);

  2. msg.setDelayTimeLevel(level);

2.3 Broker 存储定时消息

  • 🦅 存储消息时,延迟消息进入 Topic 为 SCHEDULE_TOPIC_XXXX

  • 🦅 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1

核心代码如下:

  1. 1: // ⬇️⬇️⬇️【CommitLog.java】

  2. 2: /**

  3. 3:  * 添加消息,返回消息结果

  4. 4:  *

  5. 5:  * @param msg 消息

  6. 6:  * @return 结果

  7. 7:  */

  8. 8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

  9. 9:     // ....(省略代码)

  10. 10:

  11. 11:     // 定时消息处理

  12. 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

  13. 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//

  14. 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

  15. 15:         // Delay Delivery

  16. 16:         if (msg.getDelayTimeLevel() > 0) {

  17. 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

  18. 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

  19. 19:             }

  20. 20:

  21. 21:             // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。

  22. 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC;

  23. 23:

  24. 24:             // 延迟级别 与 消息队列编号 做固定映射

  25. 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

  26. 26:

  27. 27:             // Backup real topic, queueId

  28. 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

  29. 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

  30. 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

  31. 31:

  32. 32:             msg.setTopic(topic);

  33. 33:             msg.setQueueId(queueId);

  34. 34:         }

  35. 35:     }

  36. 36:

  37. 37:     // ....(省略代码)

  38. 38: }

  39. 39:

  40. 40: // ⬇️⬇️⬇️【ScheduleMessageService.java】

  41. 41: /**

  42. 42:  * 根据 延迟级别 计算 消息队列编号

  43. 43:  * QueueId = DelayLevel - 1

  44. 44:  *

  45. 45:  * @param delayLevel 延迟级别

  46. 46:  * @return 消息队列编号

  47. 47:  */

  48. 48: public static int delayLevel2QueueId(final int delayLevel) {

  49. 49:     return delayLevel - 1;

  50. 50: }


  • 🦅 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样, ScheduleMessageService在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

  1. 1: // ⬇️⬇️⬇️【CommitLog.java】

  2. 2: /**

  3. 3:  * check the message and returns the message size

  4. 4:  *

  5. 5:  * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure

  6. 6:  */

  7. 7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {

  8. 8:     try {

  9. 9:         // // ....(省略代码)

  10. 10:

  11. 11:         // 17 properties

  12. 12:         short propertiesLength = byteBuffer.getShort();

  13. 13:         if (propertiesLength > 0) {

  14. 14:             // ....(省略代码)

  15. 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);

  16. 16:             if (tags != null && tags.length() > 0) {

  17. 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);

  18. 18:             }

  19. 19:

  20. 20:             // Timing message processing

  21. 21:             {

  22. 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);

  23. 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {

  24. 24:                     int delayLevel = Integer.parseInt(t);

  25. 25:

  26. 26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

  27. 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();

  28. 28:                     }

  29. 29:

  30. 30:                     if (delayLevel > 0) {

  31. 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,

  32. 32:                             storeTimestamp);

  33. 33:                     }

  34. 34:                 }

  35. 35:             }

  36. 36:         }

  37. 37:

  38. 38:         // ....(省略代码)

  39. 39:

  40. 40:         return new DispatchRequest(//

  41. 41:             topic, // 1

  42. 42:             queueId, // 2

  43. 43:             physicOffset, // 3

  44. 44:             totalSize, // 4

  45. 45:             tagsCode, // 5

  46. 46:             storeTimestamp, // 6

  47. 47:             queueOffset, // 7

  48. 48:             keys, // 8

  49. 49:             uniqKey, //9

  50. 50:             sysFlag, // 9

  51. 51:             preparedTransactionOffset// 10

  52. 52:         );

  53. 53:     } catch (Exception e) {

  54. 54:     }

  55. 55:

  56. 56:     return new DispatchRequest(-1, false /* success */);

  57. 57: }

  58. 58:

  59. 59: // ⬇️⬇️⬇️【ScheduleMessageService.java】

  60. 60: /**

  61. 61:  * 计算 投递时间【计划消费时间】

  62. 62:  *

  63. 63:  * @param delayLevel 延迟级别

  64. 64:  * @param storeTimestamp 存储时间

  65. 65:  * @return 投递时间【计划消费时间】

  66. 66:  */

  67. 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {

  68. 68:     Long time = this.delayLevelTable.get(delayLevel);

  69. 69:     if (time != null) {

  70. 70:         return time + storeTimestamp;

  71. 71:     }

  72. 72:

  73. 73:     return storeTimestamp + 1000;

  74. 74: }

2.4 Broker 发送定时消息

  • 🦅 对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

实现代码如下:

  1.  1: /**

  2.  2:  * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务

  3.  3:  */

  4.  4: class DeliverDelayedMessageTimerTask extends TimerTask {

  5.  5:     /**

  6.  6:      * 延迟级别

  7.  7:      */

  8.  8:     private final int delayLevel;

  9.  9:     /**

  10. 10:      * 位置

  11. 11:      */

  12. 12:     private final long offset;

  13. 13:

  14. 14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {

  15. 15:         this.delayLevel = delayLevel;

  16. 16:         this.offset = offset;

  17. 17:     }

  18. 18:

  19. 19:     @Override

  20. 20:     public void run() {

  21. 21:         try {

  22. 22:             this.executeOnTimeup();

  23. 23:         } catch (Exception e) {

  24. 24:             // XXX: warn and notify me

  25. 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e);

  26. 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(

  27. 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);

  28. 28:         }

  29. 29:     }

  30. 30:

  31. 31:     /**

  32. 32:      * 纠正可投递时间。

  33. 33:      * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。

  34. 34:      *

  35. 35:      * @param now 当前时间

  36. 36:      * @param deliverTimestamp 投递时间

  37. 37:      * @return 纠正结果

  38. 38:      */

  39. 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

  40. 40:         long result = deliverTimestamp;

  41. 41:

  42. 42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);

  43. 43:         if (deliverTimestamp > maxTimestamp) {

  44. 44:             result = now;

  45. 45:         }

  46. 46:

  47. 47:         return result;

  48. 48:     }

  49. 49:

  50. 50:     public void executeOnTimeup() {

  51. 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel));

  52. 52:

  53. 53:         long failScheduleOffset = offset;

  54. 54:

  55. 55:         if (cq != null) {

  56. 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

  57. 57:             if (bufferCQ != null) {

  58. 58:                 try {

  59. 59:                     long nextOffset = offset;

  60. 60:                     int i = 0;

  61. 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

  62. 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong();

  63. 63:                         int sizePy = bufferCQ.getByteBuffer().getInt();

  64. 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong();

  65. 65:

  66. 66:                         long now = System.currentTimeMillis();

  67. 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

  68. 68:

  69. 69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

  70. 70:

  71. 71:                         long countdown = deliverTimestamp - now;

  72. 72:

  73. 73:                         if (countdown <= 0) { // 消息到达可发送时间

  74. 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

  75. 75:                             if (msgExt != null) {

  76. 76:                                 try {

  77. 77:                                     // 发送消息

  78. 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

  79. 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);

  80. 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功

  81. 81:                                         continue;

  82. 82:                                     } else { // 发送失败

  83. 83:                                         // XXX: warn and notify me

  84. 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());

  85. 85:

  86. 86:                                         // 安排下一次任务

  87. 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

  88. 88:

  89. 89:                                         // 更新进度

  90. 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

  91. 91:                                         return;

  92. 92:                                     }

  93. 93:                                 } catch (Exception e) {

  94. 94:                                     // XXX: warn and notify me

  95. 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="

  96. 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);

  97. 97:                                 }

  98. 98:                             }

  99. 99:                         } else {

  100. 100:                             // 安排下一次任务

  101. 101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);

  102. 102:

  103. 103:                             // 更新进度

  104. 104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

  105. 105:                             return;

  106. 106:                         }

  107. 107:                     } // end of for

  108. 108:

  109. 109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

  110. 110:

  111. 111:                     // 安排下一次任务

  112. 112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

  113. 113:

  114. 114:                     // 更新进度

  115. 115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

  116. 116:                     return;

  117. 117:                 } finally {

  118. 118:                     bufferCQ.release();

  119. 119:                 }

  120. 120:             } // end of if (bufferCQ != null)

  121. 121:             else { // 消费队列已经被删除部分,跳转到最小的消费进度

  122. 122:                 long cqMinOffset = cq.getMinOffsetInQueue();

  123. 123:                 if (offset < cqMinOffset) {

  124. 124:                     failScheduleOffset = cqMinOffset;

  125. 125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="

  126. 126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());

  127. 127:                 }

  128. 128:             }

  129. 129:         } // end of if (cq != null)

  130. 130:

  131. 131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);

  132. 132:     }

  133. 133:

  134. 134:     /**

  135. 135:      * 设置消息内容

  136. 136:      *

  137. 137:      * @param msgExt 消息

  138. 138:      * @return 消息

  139. 139:      */

  140. 140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {

  141. 141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

  142. 142:         msgInner.setBody(msgExt.getBody());

  143. 143:         msgInner.setFlag(msgExt.getFlag());

  144. 144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());

  145. 145:

  146. 146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());

  147. 147:         long tagsCodeValue =

  148. 148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());

  149. 149:         msgInner.setTagsCode(tagsCodeValue);

  150. 150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

  151. 151:

  152. 152:         msgInner.setSysFlag(msgExt.getSysFlag());

  153. 153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());

  154. 154:         msgInner.setBornHost(msgExt.getBornHost());

  155. 155:         msgInner.setStoreHost(msgExt.getStoreHost());

  156. 156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

  157. 157:

  158. 158:         msgInner.setWaitStoreMsgOK(false);

  159. 159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

  160. 160:

  161. 161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

  162. 162:

  163. 163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);

  164. 164:         int queueId = Integer.parseInt(queueIdStr);

  165. 165:         msgInner.setQueueId(queueId);

  166. 166:

  167. 167:         return msgInner;

  168. 168:     }

  169. 169: }

2.5 Broker 持久化定时发送进度

  • 🦅 定时消息发送进度存储在文件( ../config/delayOffset.json)里

  • 🦅 每 10s 定时持久化发送进度。

核心代码如下:

  1. 1: // ⬇️⬇️⬇️【ScheduleMessageService.java】

  2. 2: /**

  3. 3: public void start() {

  4. 4:     // 定时发送消息

  5. 5:     for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {

  6. 6:         Integer level = entry.getKey();

  7. 7:         Long timeDelay = entry.getValue();

  8. 8:         Long offset = this.offsetTable.get(level);

  9. 9:         if (null == offset) {

  10. 10:             offset = 0L;

  11. 11:         }

  12. 12:

  13. 13:         if (timeDelay != null) {

  14. 14:             this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);

  15. 15:         }

  16. 16:     }

  17. 17:

  18. 18:     // 定时持久化发送进度

  19. 19:     this.timer.scheduleAtFixedRate(new TimerTask() {

  20. 20:

  21. 21:         @Override

  22. 22:         public void run() {

  23. 23:             try {

  24. 24:                 ScheduleMessageService.this.persist();

  25. 25:             } catch (Exception e) {

  26. 26:                 log.error("scheduleAtFixedRate flush exception", e);

  27. 27:             }

  28. 28:         }

  29. 29:     }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

  30. 30: }

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  • 🦅 Consumer 将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。

核心代码如下:

  1. 1: // ⬇️⬇️⬇️【SendMessageProcessor.java】

  2. 2: /**

  3. 3:  * 消费者发回消息

  4. 4:  *

  5. 5:  * @param ctx ctx

  6. 6:  * @param request 请求

  7. 7:  * @return 响应

  8. 8:  * @throws RemotingCommandException 当远程调用异常

  9. 9:  */

  10. 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)

  11. 11:     throws RemotingCommandException {

  12. 12:     // ....(省略代码)

  13. 13:     // 处理 delayLevel(独有)。

  14. 14:     int delayLevel = requestHeader.getDelayLevel();

  15. 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

  16. 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {

  17. 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();

  18. 18:     }

  19. 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//

  20. 20:     // ....(省略代码)

  21. 21:     } else {

  22. 22:         if (0 == delayLevel) {

  23. 23:             delayLevel = 3 + msgExt.getReconsumeTimes();

  24. 24:         }

  25. 25:         msgExt.setDelayTimeLevel(delayLevel);

  26. 26:     }

  27. 27:

  28. 28:     // ....(省略代码)

  29. 29:     return response;

  30. 30: }


推荐阅读

RocketMQ 源码分析:Message 顺序发送与消费

RocketMQ 源码分析:Message 拉取与消费(上)

RocketMQ 源码分析:Message 拉取与消费(下)

Dubbo将积极适配Spring Cloud生态

Spring Cloud微服务架构汇总

那些没说出口的研发之痛,做与不做微服务的几大理由

浅谈微服务基建的逻辑

Service Mesh:下一代微服务

微服务(Microservices)【翻译】

长按指纹

一键关注

点击 “阅读原文” 看看本号其他精彩内容

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存