分布式消息队列 RocketMQ 源码分析 —— Message 顺序发送与消费
本文主要基于 RocketMQ 4.0.x 正式版
1. 概述
2.
Producer
顺序发送3.
Consumer
严格顺序消费3.1 获得(锁定)消息队列
3.2 移除消息队列
3.3 消费消息队列
3.1.1 消费消息
3.1.2 处理消费结果
3.1.3 消息处理队列核心方法
1. 概述
建议前置阅读内容:
当然对 Message
发送与消费已经有一定了解的同学,可以选择跳过。
RocketMQ
提供了两种顺序级别:
普通顺序消息 :
Producer
将相关联的消息发送到相同的消息队列。完全严格顺序 :在
普通顺序消息
的基础上,Consumer
严格顺序消费。
绝大部分场景下只需要用到普通顺序消息。
例如说:给用户发送短信消息 + 发送推送消息,将两条消息发送到不同的消息队列,若其中一条消息队列消费较慢造成堵塞,用户可能会收到两条消息会存在一定的时间差,带来的体验会相对较差。当然类似这种场景,即使有一定的时间差,不会产生系统逻辑上BUG。另外, 普通顺序消息
性能能更加好。
那么什么时候使用使用完全严格顺序?如下是来自官方文档的说明:
目前已知的应用只有数据库
binlog
同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息
😈上代码!!!
2. Producer
顺序发送
官方发送顺序消息的例子:
1: package org.apache.rocketmq.example.ordermessage;
2:
3: import java.io.UnsupportedEncodingException;
4: import java.util.List;
5: import org.apache.rocketmq.client.exception.MQBrokerException;
6: import org.apache.rocketmq.client.exception.MQClientException;
7: import org.apache.rocketmq.client.producer.DefaultMQProducer;
8: import org.apache.rocketmq.client.producer.MQProducer;
9: import org.apache.rocketmq.client.producer.MessageQueueSelector;
10: import org.apache.rocketmq.client.producer.SendResult;
11: import org.apache.rocketmq.common.message.Message;
12: import org.apache.rocketmq.common.message.MessageQueue;
13: import org.apache.rocketmq.remoting.common.RemotingHelper;
14: import org.apache.rocketmq.remoting.exception.RemotingException;
15:
16: public class Producer {
17: public static void main(String[] args) throws UnsupportedEncodingException {
18: try {
19: MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
20: producer.start();
21:
22: String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
23: for (int i = 0; i < 100; i++) {
24: int orderId = i % 10;
25: Message msg =
26: new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
27: ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
28: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
29: @Override
30: public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
31: Integer id = (Integer) arg;
32: int index = id % mqs.size();
33: return mqs.get(index);
34: }
35: }, orderId);
36:
37: System.out.printf("%s%n", sendResult);
38: }
39:
40: producer.shutdown();
41: } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
42: e.printStackTrace();
43: }
44: }
45: }
第 28 至 35 行 :实现了根据
id%mqs.size()
来进行消息队列的选择。当前例子,我们传递orderId
作为参数,那么相同的orderId
能够进入相同的消息队列。
MessageQueueSelector
接口的源码:
1: public interface MessageQueueSelector {
2:
3: /**
4: * 选择消息队列
5: *
6: * @param mqs 消息队列
7: * @param msg 消息
8: * @param arg 参数
9: * @return 消息队列
10: */
11: MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
12: }
Producer
选择队列发送消息方法的源码:
16: private SendResult sendSelectImpl(//
17: Message msg, //
18: MessageQueueSelector selector, //
19: Object arg, //
20: final CommunicationMode communicationMode, //
21: final SendCallback sendCallback, final long timeout//
22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
23: this.makeSureStateOK();
24: Validators.checkMessage(msg, this.defaultMQProducer);
25:
26: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
27: if (topicPublishInfo != null && topicPublishInfo.ok()) {
28: MessageQueue mq = null;
29: try {
30: mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
31: } catch (Throwable e) {
32: throw new MQClientException("select message queue throwed exception.", e);
33: }
34:
35: if (mq != null) {
36: return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
37: } else {
38: throw new MQClientException("select message queue return null.", null);
39: }
40: }
41:
42: throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
43: }
第 30 行 :选择消息队列。
第 36 行 :发送消息。
3. Consumer
严格顺序消费
Consumer
在严格顺序消费时,通过 三 把锁保证严格顺序消费。
Broker
消息队列锁(分布式锁) :集群模式下,
Consumer
从Broker
获得该锁后,才能进行消息拉取、消费。广播模式下,
Consumer
无需该锁。Consumer
消息队列锁(本地锁) :Consumer
获得该锁才能操作消息队列。Consumer
消息处理队列消费锁(本地锁) :Consumer
获得该锁才能消费消息队列。
可能同学有疑问,为什么有 Consumer
消息队列锁还需要有 Consumer
消息队列消费锁呢?😈让我们带着疑问继续往下看。
3.1 获得(锁定)消息队列
集群模式下, Consumer
更新属于自己的消息队列时,会向 Broker
锁定该消息队列(广播模式下不需要)。如果锁定失败,则更新失败,即该消息队列不属于自己,不能进行消费。核心代码如下:
1: // ⬇️⬇️⬇️【RebalanceImpl.java】
2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
3: // ..... 此处省略部分代码
4: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
5: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
6: for (MessageQueue mq : mqSet) {
7: if (!this.processQueueTable.containsKey(mq)) {
8: if (isOrder && !this.lock(mq)) { // 顺序消息锁定消息队列
9: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
10: continue;
11: }
12:
13: this.removeDirtyOffset(mq);
14: ProcessQueue pq = new ProcessQueue();
15: long nextOffset = this.computePullFromWhere(mq);
16: if (nextOffset >= 0) {
17: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
18: if (pre != null) {
19: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
20: } else {
21: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
22: PullRequest pullRequest = new PullRequest();
23: pullRequest.setConsumerGroup(consumerGroup);
24: pullRequest.setNextOffset(nextOffset);
25: pullRequest.setMessageQueue(mq);
26: pullRequest.setProcessQueue(pq);
27: pullRequestList.add(pullRequest);
28: changed = true;
29: }
30: } else {
31: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
32: }
33: }
34: }
35:
36: // ..... 此处省略部分代码
37: }
38:
39: // ⬇️⬇️⬇️【RebalanceImpl.java】
40: /**
41: * 请求Broker获得指定消息队列的分布式锁
42: *
43: * @param mq 队列
44: * @return 是否成功
45: */
46: public boolean lock(final MessageQueue mq) {
47: FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
48: if (findBrokerResult != null) {
49: LockBatchRequestBody requestBody = new LockBatchRequestBody();
50: requestBody.setConsumerGroup(this.consumerGroup);
51: requestBody.setClientId(this.mQClientFactory.getClientId());
52: requestBody.getMqSet().add(mq);
53:
54: try {
55: // 请求Broker获得指定消息队列的分布式锁
56: Set<MessageQueue> lockedMq =
57: this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
58:
59: // 设置消息处理队列锁定成功。锁定消息队列成功,可能本地没有消息处理队列,设置锁定成功会在lockAll()方法。
60: for (MessageQueue mmqq : lockedMq) {
61: ProcessQueue processQueue = this.processQueueTable.get(mmqq);
62: if (processQueue != null) {
63: processQueue.setLocked(true);
64: processQueue.setLastLockTimestamp(System.currentTimeMillis());
65: }
66: }
67:
68: boolean lockOK = lockedMq.contains(mq);
69: log.info("the message queue lock {}, {} {}",
70: lockOK ? "OK" : "Failed",
71: this.consumerGroup,
72: mq);
73: return lockOK;
74: } catch (Exception e) {
75: log.error("lockBatchMQ exception, " + mq, e);
76: }
77: }
78:
79: return false;
80: }
⬆️⬆️⬆️
第 8 至 11 行 :顺序消费时,锁定消息队列。如果锁定失败,新增消息处理队列失败。
Broker
消息队列锁会过期,默认配置 30s。因此, Consumer
需要不断向 Broker
刷新该锁过期时间,默认配置 20s 刷新一次。核心代码如下:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: public void start() {
3: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
4: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5: @Override
6: public void run() {
7: ConsumeMessageOrderlyService.this.lockMQPeriodically();
8: }
9: }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
10: }
11: }
3.2 移除消息队列
集群模式下, Consumer
移除自己的消息队列时,会向 Broker
解锁该消息队列(广播模式下不需要)。核心代码如下:
1: // ⬇️⬇️⬇️【RebalancePushImpl.java】
2: /**
3: * 移除不需要的队列相关的信息
4: * 1. 持久化消费进度,并移除之
5: * 2. 顺序消费&集群模式,解锁对该队列的锁定
6: *
7: * @param mq 消息队列
8: * @param pq 消息处理队列
9: * @return 是否移除成功
10: */
11: @Override
12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
13: // 同步队列的消费进度,并移除之。
14: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
15: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
16: // 集群模式下,顺序消费移除时,解锁对队列的锁定
17: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
18: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
19: try {
20: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
21: try {
22: return this.unlockDelay(mq, pq);
23: } finally {
24: pq.getLockConsume().unlock();
25: }
26: } else {
27: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
28: mq, //
29: pq.getTryUnlockTimes());
30:
31: pq.incTryUnlockTimes();
32: }
33: } catch (Exception e) {
34: log.error("removeUnnecessaryMessageQueue Exception", e);
35: }
36:
37: return false;
38: }
39: return true;
40: }
41:
42: // ⬇️⬇️⬇️【RebalancePushImpl.java】
43: /**
44: * 延迟解锁 Broker 消息队列锁
45: * 当消息处理队列不存在消息,则直接解锁
46: *
47: * @param mq 消息队列
48: * @param pq 消息处理队列
49: * @return 是否解锁成功
50: */
51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
52: if (pq.hasTempMessage()) { // TODO 疑问:为什么要延迟移除
53: log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
54: this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
55: @Override
56: public void run() {
57: log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
58: RebalancePushImpl.this.unlock(mq, true);
59: }
60: }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
61: } else {
62: this.unlock(mq, true);
63: }
64: return true;
65: }
⬆️⬆️⬆️
第 20 至 32 行 :获取消息队列消费锁,避免和消息队列消费冲突。如果获取锁失败,则移除消息队列失败,等待下次重新分配消费队列时,再进行移除。如果未获得锁而进行移除,则可能出现另外的
Consumer
和当前Consumer
同时消费该消息队列,导致消息无法严格顺序消费。第 51 至 64 行 :解锁
Broker
消息队列锁。如果消息处理队列存在剩余消息,则延迟解锁Broker
消息队列锁。❓为什么消息处理队列存在剩余消息不能直接解锁呢?😈我也不知道,百思不得其解。如果有知道的同学麻烦教育下俺。
3.3 消费消息队列
😏本节会类比并发消费消费队列,建议对照 PushConsumer并发消费消息 一起理解。
3.1.1 消费消息
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: class ConsumeRequest implements Runnable {
3:
4: /**
5: * 消息处理队列
6: */
7: private final ProcessQueue processQueue;
8: /**
9: * 消息队列
10: */
11: private final MessageQueue messageQueue;
12:
13: @Override
14: public void run() {
15: if (this.processQueue.isDropped()) {
16: log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
17: return;
18: }
19:
20: // 获得 Consumer 消息队列锁
21: final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
22: synchronized (objLock) {
23: // (广播模式) 或者 (集群模式 && Broker消息队列锁有效)
24: if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
25: || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
26: final long beginTime = System.currentTimeMillis();
27: // 循环
28: for (boolean continueConsume = true; continueConsume; ) {
29: if (this.processQueue.isDropped()) {
30: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
31: break;
32: }
33:
34: // 消息队列分布式锁未锁定,提交延迟获得锁并消费请求
35: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
36: && !this.processQueue.isLocked()) {
37: log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
38: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
39: break;
40: }
41: // 消息队列分布式锁已经过期,提交延迟获得锁并消费请求
42: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
43: && this.processQueue.isLockExpired()) {
44: log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
45: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
46: break;
47: }
48:
49: // 当前周期消费时间超过连续时长,默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms。
50: long interval = System.currentTimeMillis() - beginTime;
51: if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
52: ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
53: break;
54: }
55:
56: // 获取消费消息。此处和并发消息请求不同,并发消息请求已经带了消费哪些消息。
57: final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
58: List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
59: if (!msgs.isEmpty()) {
60: final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
61:
62: ConsumeOrderlyStatus status = null;
63:
64: // ....省略代码:Hook:before
65:
66: // 执行消费
67: long beginTimestamp = System.currentTimeMillis();
68: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
69: boolean hasException = false;
70: try {
71: this.processQueue.getLockConsume().lock(); // 锁定队列消费锁
72:
73: if (this.processQueue.isDropped()) {
74: log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
75: this.messageQueue);
76: break;
77: }
78:
79: status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
80: } catch (Throwable e) {
81: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
82: RemotingHelper.exceptionSimpleDesc(e), //
83: ConsumeMessageOrderlyService.this.consumerGroup, //
84: msgs, //
85: messageQueue);
86: hasException = true;
87: } finally {
88: this.processQueue.getLockConsume().unlock(); // 锁定队列消费锁
89: }
90:
91: // ....省略代码:解析消费结果状态
92:
93: // ....省略代码:Hook:after
94:
95: ConsumeMessageOrderlyService.this.getConsumerStatsManager()
96: .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
97:
98: // 处理消费结果
99: continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100: } else {
101: continueConsume = false;
102: }
103: }
104: } else {
105: if (this.processQueue.isDropped()) {
106: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107: return;
108: }
109:
110: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111: }
112: }
113: }
114:
115: }
⬆️⬆️⬆️
第 20 行 :获得
Consumer
消息队列锁。第 58 行 :从消息处理队列顺序获得消息。和并发消费获得消息不同。并发消费请求在请求创建时,已经设置好消费哪些消息。
第 71 行 :获得
Consumer
消息处理队列消费锁。相比【Consumer
消息队列锁】,其粒度较小。这就是上文提到的❓为什么有Consumer
消息队列锁还需要有 Consumer 消息队列消费锁呢的原因。第 79 行 :执行消费。
第 99 行 :处理消费结果。
3.1.2 处理消费结果
顺序消费消息结果 ( ConsumeOrderlyStatus
) 有四种情况:
SUCCESS
:消费成功但不提交。ROLLBACK
:消费失败,消费回滚。COMMIT
:消费成功提交并且提交。SUSPEND_CURRENT_QUEUE_A_MOMENT
:消费失败,挂起消费队列一会会,稍后继续消费。
考虑到 ROLLBACK
、 COMMIT
暂时只使用在 MySQLbinlog
场景,官方将这两状态标记为 @Deprecated
。当然,相应的实现逻辑依然保留。
在并发消费场景时,如果消费失败, Consumer
会将消费失败消息发回到 Broker
重试队列,跳过当前消息,等待下次拉取该消息再进行消费。
但是在完全严格顺序消费消费时,这样做显然不行。也因此,消费失败的消息,会挂起队列一会会,稍后继续消费。
不过消费失败的消息一直失败,也不可能一直消费。当超过消费重试上限时, Consumer
会将消费失败超过上限的消息发回到 Broker
死信队列。
让我们来看看代码:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: /**
3: * 处理消费结果,并返回是否继续消费
4: *
5: * @param msgs 消息
6: * @param status 消费结果状态
7: * @param context 消费Context
8: * @param consumeRequest 消费请求
9: * @return 是否继续消费
10: */
11: public boolean processConsumeResult(//
12: final List<MessageExt> msgs, //
13: final ConsumeOrderlyStatus status, //
14: final ConsumeOrderlyContext context, //
15: final ConsumeRequest consumeRequest//
16: ) {
17: boolean continueConsume = true;
18: long commitOffset = -1L;
19: if (context.isAutoCommit()) {
20: switch (status) {
21: case COMMIT:
22: case ROLLBACK:
23: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
24: case SUCCESS:
25: // 提交消息已消费成功到消息处理队列
26: commitOffset = consumeRequest.getProcessQueue().commit();
27: // 统计
28: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
29: break;
30: case SUSPEND_CURRENT_QUEUE_A_MOMENT:
31: // 统计
32: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
33: if (checkReconsumeTimes(msgs)) { // 计算是否暂时挂起(暂停)消费N毫秒,默认:10ms
34: // 设置消息重新消费
35: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
36: // 提交延迟消费请求
37: this.submitConsumeRequestLater(//
38: consumeRequest.getProcessQueue(), //
39: consumeRequest.getMessageQueue(), //
40: context.getSuspendCurrentQueueTimeMillis());
41: continueConsume = false;
42: } else {
43: commitOffset = consumeRequest.getProcessQueue().commit();
44: }
45: break;
46: default:
47: break;
48: }
49: } else {
50: switch (status) {
51: case SUCCESS:
52: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
53: break;
54: case COMMIT:
55: // 提交消息已消费成功到消息处理队列
56: commitOffset = consumeRequest.getProcessQueue().commit();
57: break;
58: case ROLLBACK:
59: // 设置消息重新消费
60: consumeRequest.getProcessQueue().rollback();
61: this.submitConsumeRequestLater(//
62: consumeRequest.getProcessQueue(), //
63: consumeRequest.getMessageQueue(), //
64: context.getSuspendCurrentQueueTimeMillis());
65: continueConsume = false;
66: break;
67: case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 计算是否暂时挂起(暂停)消费N毫秒,默认:10ms
68: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
69: if (checkReconsumeTimes(msgs)) {
70: // 设置消息重新消费
71: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
72: // 提交延迟消费请求
73: this.submitConsumeRequestLater(//
74: consumeRequest.getProcessQueue(), //
75: consumeRequest.getMessageQueue(), //
76: context.getSuspendCurrentQueueTimeMillis());
77: continueConsume = false;
78: }
79: break;
80: default:
81: break;
82: }
83: }
84:
85: // 消息处理队列未dropped,提交有效消费进度
86: if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
87: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
88: }
89:
90: return continueConsume;
91: }
92:
93: private int getMaxReconsumeTimes() {
94: // default reconsume times: Integer.MAX_VALUE
95: if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
96: return Integer.MAX_VALUE;
97: } else {
98: return this.defaultMQPushConsumer.getMaxReconsumeTimes();
99: }
100: }
101:
102: /**
103: * 计算是否要暂停消费
104: * 不暂停条件:存在消息都超过最大消费次数并且都发回broker成功
105: *
106: * @param msgs 消息
107: * @return 是否要暂停
108: */
109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {
110: boolean suspend = false;
111: if (msgs != null && !msgs.isEmpty()) {
112: for (MessageExt msg : msgs) {
113: if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
114: MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
115: if (!sendMessageBack(msg)) { // 发回失败,中断
116: suspend = true;
117: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
118: }
119: } else {
120: suspend = true;
121: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
122: }
123: }
124: }
125: return suspend;
126: }
127:
128: /**
129: * 发回消息。
130: * 消息发回broker后,对应的消息队列是死信队列。
131: *
132: * @param msg 消息
133: * @return 是否发送成功
134: */
135: public boolean sendMessageBack(final MessageExt msg) {
136: try {
137: // max reconsume times exceeded then send to dead letter queue.
138: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
139: String originMsgId = MessageAccessor.getOriginMessageId(msg);
140: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
141: newMsg.setFlag(msg.getFlag());
142: MessageAccessor.setProperties(newMsg, msg.getProperties());
143: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
144: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
145: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
146: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
147:
148: this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
149: return true;
150: } catch (Exception e) {
151: log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
152: }
153:
154: return false;
155: }
⬆️⬆️⬆️
第 21 至 29 行 :消费成功。在自动提交进度(
AutoCommit
)的情况下,COMMIT
、ROLLBACK
、SUCCESS
逻辑已经统一。第 30 至 45 行 :消费失败。当消息重试次数超过上限(默认 :16次)时,将消息发送到
Broker
死信队列,跳过这些消息。此时,消息队列无需挂起,继续消费后面的消息。第 85 至 88 行 :提交消费进度。
3.13 消息处理队列核心方法
😈涉及到的四个核心方法的源码:
1: // ⬇️⬇️⬇️【ProcessQueue.java】
2: /**
3: * 消息映射
4: * key:消息队列位置
5: */
6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); /**
7: * 消息映射临时存储(消费中的消息)
8: */
9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
10:
11: /**
12: * 回滚消费中的消息
13: * 逻辑类似于{@link #makeMessageToCosumeAgain(List)}
14: */
15: public void rollback() {
16: try {
17: this.lockTreeMap.writeLock().lockInterruptibly();
18: try {
19: this.msgTreeMap.putAll(this.msgTreeMapTemp);
20: this.msgTreeMapTemp.clear();
21: } finally {
22: this.lockTreeMap.writeLock().unlock();
23: }
24: } catch (InterruptedException e) {
25: log.error("rollback exception", e);
26: }
27: }
28:
29: /**
30: * 提交消费中的消息已消费成功,返回消费进度
31: *
32: * @return 消费进度
33: */
34: public long commit() {
35: try {
36: this.lockTreeMap.writeLock().lockInterruptibly();
37: try {
38: // 消费进度
39: Long offset = this.msgTreeMapTemp.lastKey();
40:
41: //
42: msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
43:
44: //
45: this.msgTreeMapTemp.clear();
46:
47: // 返回消费进度
48: if (offset != null) {
49: return offset + 1;
50: }
51: } finally {
52: this.lockTreeMap.writeLock().unlock();
53: }
54: } catch (InterruptedException e) {
55: log.error("commit exception", e);
56: }
57:
58: return -1;
59: }
60:
61: /**
62: * 指定消息重新消费
63: * 逻辑类似于{@link #rollback()}
64: *
65: * @param msgs 消息
66: */
67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
68: try {
69: this.lockTreeMap.writeLock().lockInterruptibly();
70: try {
71: for (MessageExt msg : msgs) {
72: this.msgTreeMapTemp.remove(msg.getQueueOffset());
73: this.msgTreeMap.put(msg.getQueueOffset(), msg);
74: }
75: } finally {
76: this.lockTreeMap.writeLock().unlock();
77: }
78: } catch (InterruptedException e) {
79: log.error("makeMessageToCosumeAgain exception", e);
80: }
81: }
82:
83: /**
84: * 获得持有消息前N条
85: *
86: * @param batchSize 条数
87: * @return 消息
88: */
89: public List<MessageExt> takeMessags(final int batchSize) {
90: List<MessageExt> result = new ArrayList<>(batchSize);
91: final long now = System.currentTimeMillis();
92: try {
93: this.lockTreeMap.writeLock().lockInterruptibly();
94: this.lastConsumeTimestamp = now;
95: try {
96: if (!this.msgTreeMap.isEmpty()) {
97: for (int i = 0; i < batchSize; i++) {
98: Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
99: if (entry != null) {
100: result.add(entry.getValue());
101: msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102: } else {
103: break;
104: }
105: }
106: }
107:
108: if (result.isEmpty()) {
109: consuming = false;
110: }
111: } finally {
112: this.lockTreeMap.writeLock().unlock();
113: }
114: } catch (InterruptedException e) {
115: log.error("take Messages exception", e);
116: }
117:
118: return result;
119: }