分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
本文主要基于 RocketMQ 4.0.x 正式版
1、概述
2、ConsumeQueue 结构
3、ConsumeQueue 存储
DefaultMessageStore#doDispatch(...)
ConsumeQueue#putMessagePositionInfoWrapper(...)
ReputMessageService
FlushConsumeQueueService
4、Broker 提供[拉取消息]接口
PullMessageRequestHeader
PullMessageProcessor#processRequest(...)
MessageStore#getMessage(...)
DefaultMessageFilter#isMessageMatched(...)
PullRequestHoldService
PullMessageProcessor#executeRequestWhenWakeup(...)
5、Broker 提供[更新消费进度]接口
MixAll#string2File(...)
BrokerController#initialize(...)
ConfigManager
ConsumerOffsetManager
6、Broker 提供[发回消息]接口
SendMessageProcessor#consumerSendMsgBack(...)
7、结尾
1、概述
本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:
上篇:
Broker
相关源码。下篇:
Consumer
相关源码。
本文即是上篇。
ok,先看第一张关于消费逻辑的图:
再看消费逻辑精简的顺序图(实际情况会略有差别):
2、ConsumeQueue 结构
ConsumeQueue
、 MappedFileQueue
、 MappedFile
的关系如下:
ConsumeQueue
:MappedFileQueue
:MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000
ConsumeQueue
、 MappedFileQueue
、 MappedFile
的定义如下:
MappedFile
:00000000000000000000等文件。MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。每个
MappedFile
统一文件大小。文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
ConsumeQueue
里默认为 6000000B。ConsumeQueue
:针对MappedFileQueue
的封装使用。Store:ConsumeQueue=ConcurrentHashMap<String/* topic */,ConcurrentHashMap<Integer/* queueId */,ConsumeQueue>>
。
ConsumeQueue
存储在 MappedFile
的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE
),有两种内容类型:
MESSAGE_POSITION_INFO
:消息位置信息。BLANK
: 文件前置空白占位。当历史Message
被删除时,需要用BLANK
占位被删除的消息。
MESSAGE_POSITION_INFO
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | offset | 消息 CommitLog 存储位置 | Long | 8 |
2 | size | 消息长度 | Int | 4 |
3 | tagsCode | 消息tagsCode | Long | 8 |
BLANK
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | 0 | Long | 8 | |
2 | Integer.MAX_VALUE | Int | 4 | |
3 | 0 | Long | 8 |
3、ConsumeQueue 存储
主要有两个组件:
ReputMessageService
:write ConsumeQueue。FlushConsumeQueueService
:flush ConsumeQueue。
ReputMessageService
1: class ReputMessageService extends ServiceThread {
2:
3: /**
4: * 开始重放消息的CommitLog物理位置
5: */
6: private volatile long reputFromOffset = 0;
7:
8: public long getReputFromOffset() {
9: return reputFromOffset;
10: }
11:
12: public void setReputFromOffset(long reputFromOffset) {
13: this.reputFromOffset = reputFromOffset;
14: }
15:
16: @Override
17: public void shutdown() {
18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
19: try {
20: Thread.sleep(100);
21: } catch (InterruptedException ignored) {
22: }
23: }
24:
25: if (this.isCommitLogAvailable()) {
26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
28: }
29:
30: super.shutdown();
31: }
32:
33: /**
34: * 剩余需要重放消息字节数
35: *
36: * @return 字节数
37: */
38: public long behind() {
39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
40: }
41:
42: /**
43: * 是否commitLog需要重放消息
44: *
45: * @return 是否
46: */
47: private boolean isCommitLogAvailable() {
48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
49: }
50:
51: private void doReput() {
52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
53:
54: // TODO 疑问:这个是啥
55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
57: break;
58: }
59:
60: // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
62: if (result != null) {
63: try {
64: this.reputFromOffset = result.getStartOffset();
65:
66: // 遍历MappedByteBuffer
67: for (int readSize = 0; readSize < result.getSize() && doNext; ) {
68: // 生成重放消息重放调度请求
69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
70: int size = dispatchRequest.getMsgSize(); // 消息长度
71: // 根据请求的结果处理
72: if (dispatchRequest.isSuccess()) { // 读取成功
73: if (size > 0) { // 读取Message
74: DefaultMessageStore.this.doDispatch(dispatchRequest);
75: // 通知有新消息
76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
80: dispatchRequest.getTagsCode());
81: }
82: // FIXED BUG By shijia
83: this.reputFromOffset += size;
84: readSize += size;
85: // 统计
86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
87: DefaultMessageStore.this.storeStatsService
88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
89: DefaultMessageStore.this.storeStatsService
90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
91: .addAndGet(dispatchRequest.getMsgSize());
92: }
93: } else if (size == 0) { // 读取到MappedFile文件尾
94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
95: readSize = result.getSize();
96: }
97: } else if (!dispatchRequest.isSuccess()) { // 读取失败
98: if (size > 0) { // 读取到Message却不是Message
99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100: this.reputFromOffset += size;
101: } else { // 读取到Blank却不是Blank
102: doNext = false;
103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105: this.reputFromOffset);
106:
107: this.reputFromOffset += result.getSize() - readSize;
108: }
109: }
110: }
111: }
112: } finally {
113: result.release();
114: }
115: } else {
116: doNext = false;
117: }
118: }
119: }
120:
121: @Override
122: public void run() {
123: DefaultMessageStore.log.info(this.getServiceName() + " service started");
124:
125: while (!this.isStopped()) {
126: try {
127: Thread.sleep(1);
128: this.doReput();
129: } catch (Exception e) {
130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131: }
132: }
133:
134: DefaultMessageStore.log.info(this.getServiceName() + " service end");
135: }
136:
137: @Override
138: public String getServiceName() {
139: return ReputMessageService.class.getSimpleName();
140: }
141:
142: }
说明:重放消息线程服务。
该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
该服务不断生成 消息索引 到 索引文件(IndexFile)
第 75 至 81 行 :当
Broker
是主节点 &&Broker
开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener
会 调用PullRequestHoldService#notifyMessageArriving(...)
方法,详细解析见:PullRequestHoldService第 61 行 :获取
reputFromOffset
开始的CommitLog
对应的MappedFile
对应的MappedByteBuffer
。第 67 行 :遍历
MappedByteBuffer
。第 69 行 :生成重放消息重放调度请求 (
DispatchRequest
) 。请求里主要包含一条消息 (Message
) 或者 文件尾 (BLANK
) 的基本信息。第 72 至 96 行 :请求是有效请求,进行逻辑处理。
第 73 至 92 行 :请求对应的是
Message
,进行调度,生成ConsumeQueue
和IndexFile
对应的内容。详细解析见:第 93 至 96 行 :请求对应的是
Blank
,即文件尾,跳转指向下一个MappedFile
。第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG。
第 127 至 128 行 :每 1ms 循环执行重放逻辑。
第 18 至 30 行 :
shutdown
时,多次sleep(100)
直到CommitLog
回放到最新位置。恩,如果未回放完,会输出警告日志。
DefaultMessageStore#doDispatch(...)
1: /**
2: * 执行调度请求
3: * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
4: * 2. 建立 索引信息 到 IndexFile
5: *
6: * @param req 调度请求
7: */
8: public void doDispatch(DispatchRequest req) {
9: // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
11: switch (tranType) {
12: case MessageSysFlag.TRANSACTION_NOT_TYPE:
13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
16: break;
17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
19: break;
20: }
21: // 建立 索引信息 到 IndexFile
22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
23: DefaultMessageStore.this.indexService.buildIndex(req);
24: }
25: }
26:
27: /**
28: * 建立 消息位置信息 到 ConsumeQueue
29: *
30: * @param topic 主题
31: * @param queueId 队列编号
32: * @param offset commitLog存储位置
33: * @param size 消息长度
34: * @param tagsCode 消息tagsCode
35: * @param storeTimestamp 存储时间
36: * @param logicOffset 队列位置
37: */
38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
39: long logicOffset) {
40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
42: }
ConsumeQueue#putMessagePositionInfoWrapper(...)
1: /**
2: * 添加位置信息封装
3: *
4: * @param offset commitLog存储位置
5: * @param size 消息长度
6: * @param tagsCode 消息tagsCode
7: * @param storeTimestamp 消息存储时间
8: * @param logicOffset 队列位置
9: */
10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
11: long logicOffset) {
12: final int maxRetries = 30;
13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
14: // 多次循环写,直到成功
15: for (int i = 0; i < maxRetries && canWrite; i++) {
16: // 调用添加位置信息
17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
18: if (result) {
19: // 添加成功,使用消息存储时间 作为 存储check point。
20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
21: return;
22: } else {
23: // XXX: warn and notify me
24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
25: + " failed, retry " + i + " times");
26:
27: try {
28: Thread.sleep(1000);
29: } catch (InterruptedException e) {
30: log.warn("", e);
31: }
32: }
33: }
34:
35: // XXX: warn and notify me 设置异常不可写入
36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
38: }
39:
40: /**
41: * 添加位置信息,并返回添加是否成功
42: *
43: * @param offset commitLog存储位置
44: * @param size 消息长度
45: * @param tagsCode 消息tagsCode
46: * @param cqOffset 队列位置
47: * @return 是否成功
48: */
49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
50: final long cqOffset) {
51: // 如果已经重放过,直接返回成功
52: if (offset <= this.maxPhysicOffset) {
53: return true;
54: }
55: // 写入位置信息到byteBuffer
56: this.byteBufferIndex.flip();
57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
58: this.byteBufferIndex.putLong(offset);
59: this.byteBufferIndex.putInt(size);
60: this.byteBufferIndex.putLong(tagsCode);
61: // 计算consumeQueue存储位置,并获得对应的MappedFile
62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
64: if (mappedFile != null) {
65: // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
67: this.minLogicOffset = expectLogicOffset;
68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
70: this.fillPreBlank(mappedFile, expectLogicOffset);
71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
72: + mappedFile.getWrotePosition());
73: }
74: // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
75: if (cqOffset != 0) {
76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
77: if (expectLogicOffset != currentLogicOffset) {
78: LOG_ERROR.warn(
79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
80: expectLogicOffset,
81: currentLogicOffset,
82: this.topic,
83: this.queueId,
84: expectLogicOffset - currentLogicOffset
85: );
86: }
87: }
88: // 设置commitLog重放消息到ConsumeQueue位置。
89: this.maxPhysicOffset = offset;
90: // 插入mappedFile
91: return mappedFile.appendMessage(this.byteBufferIndex.array());
92: }
93: return false;
94: }
95:
96: /**
97: * 填充前置空白占位
98: *
99: * @param mappedFile MappedFile
100: * @param untilWhere consumeQueue存储位置
101: */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103: // 写入前置空白占位到byteBuffer
104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105: byteBuffer.putLong(0L);
106: byteBuffer.putInt(Integer.MAX_VALUE);
107: byteBuffer.putLong(0L);
108: // 循环填空
109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111: mappedFile.appendMessage(byteBuffer.array());
112: }
113: }
#putMessagePositionInfoWrapper(...)
说明 :添加位置信息到ConsumeQueue
的封装,实际需要调用#putMessagePositionInfo(...)
方法。第 13 行 :判断
ConsumeQueue
是否允许写入。当发生Bug时,不允许写入。第 17 行 :调用
#putMessagePositionInfo(...)
方法,添加位置信息。第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。
StoreCheckpoint
的详细解析见:Store初始化与关闭。第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
第 35 至 37 行 :写入失败时,标记
ConsumeQueue
写入异常,不允许继续写入。#putMessagePositionInfo(...)
说明 :添加位置信息到ConsumeQueue
,并返回添加是否成功。这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
第 51 至 54 行 :如果
offset
(存储位置) 小于等于maxPhysicOffset
(CommitLog
消息重放到ConsumeQueue
最大的CommitLog
存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。第 55 至 60 行 :写 位置信息到byteBuffer。
第 62 至 63 行 :计算
ConsumeQueue
存储位置,并获得对应的MappedFile。第 65 至 73 行 :当
MappedFile
是ConsumeQueue
当前第一个文件 &&MappedFile
未写入内容 && 重放消息队列位置大于0,则需要进行MappedFile
填充前置BLANK
。这块比较有疑问,什么场景下会需要。猜测产生的原因:一个
Topic
长期无消息产生,突然N天后进行发送,Topic
对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile
需要前置占位。第 74 至 87 行 :校验
ConsumeQueue
存储位置是否合法,不合法则输出日志。第 89 行 :设置
CommitLog
重放消息到ConsumeQueue
的最大位置。第 91 行 :插入消息位置到
MappedFile
。
FlushConsumeQueueService
1: class FlushConsumeQueueService extends ServiceThread {
2: private static final int RETRY_TIMES_OVER = 3;
3: /**
4: * 最后flush时间戳
5: */
6: private long lastFlushTimestamp = 0;
7:
8: private void doFlush(int retryTimes) {
9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
10:
11: // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
12: if (retryTimes == RETRY_TIMES_OVER) {
13: flushConsumeQueueLeastPages = 0;
14: }
15: // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
16: long logicsMsgTimestamp = 0;
17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
18: long currentTimeMillis = System.currentTimeMillis();
19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
20: this.lastFlushTimestamp = currentTimeMillis;
21: flushConsumeQueueLeastPages = 0;
22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
23: }
24: // flush消费队列
25: ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
26: for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
27: for (ConsumeQueue cq : maps.values()) {
28: boolean result = false;
29: for (int i = 0; i < retryTimes && !result; i++) {
30: result = cq.flush(flushConsumeQueueLeastPages);
31: }
32: }
33: }
34: // flush 存储 check point
35: if (0 == flushConsumeQueueLeastPages) {
36: if (logicsMsgTimestamp > 0) {
37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
38: }
39: DefaultMessageStore.this.getStoreCheckpoint().flush();
40: }
41: }
42:
43: public void run() {
44: DefaultMessageStore.log.info(this.getServiceName() + " service started");
45:
46: while (!this.isStopped()) {
47: try {
48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
49: this.waitForRunning(interval);
50: this.doFlush(1);
51: } catch (Exception e) {
52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
53: }
54: }
55:
56: this.doFlush(RETRY_TIMES_OVER);
57:
58: DefaultMessageStore.log.info(this.getServiceName() + " service end");
59: }
60:
61: @Override
62: public String getServiceName() {
63: return FlushConsumeQueueService.class.getSimpleName();
64: }
65:
66: @Override
67: public long getJointime() {
68: return 1000 * 60;
69: }
70: }
说明 :flush
ConsumeQueue
(消费队列) 线程服务。第 11 至 14 行 :当
retryTimes==RETRY_TIMES_OVER
时,进行强制flush。用于shutdown
时。第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
第 24 至 33 行 :flush
ConsumeQueue
(消费队列)。flush 逻辑:MappedFile#落盘。
第 34 至 40 行 :flush
StoreCheckpoint
。StoreCheckpoint
的详细解析见:Store初始化与关闭。第 43 至 59 行 :每 1000ms 执行一次
flush
。如果 wakeup() 时,则会立即进行一次flush
。目前,暂时不存在 wakeup() 的调用。
4、Broker 提供[拉取消息]接口
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
5、Broker 提供[更新消费进度]接口
Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
}
consumerOffset.json
:消费进度存储文件。consumerOffset.json.bak
:消费进度存储文件备份。每次写入
consumerOffset.json
,将原内容备份到consumerOffset.json.bak
。实现见:MixAll#string2File(...)。
BrokerController#initialize(...)
1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
说明 :每 5s 执行一次持久化逻辑。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
6、Broker 提供[发回消息]接口
大部分逻辑和 Broker
提供[接收消息]接口 类似,可以先看下相关内容。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
7、结尾
感谢同学们对本文的阅读、收藏、点赞。
😈如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以加下 QQ:7685413。让我们来一场 1 :1 交流(搞基)。
再次表示十分感谢。