消息队列中间件 RocketMQ 源码分析 —— Message 存储
原文地址:http://www.yunai.me/RocketMQ/message-store/
(建议使用原文地址阅读:1、阅读体验;2、代码排版混乱因而省略;)
RocketMQ
带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号
1、概述
2、CommitLog 结构
3、CommitLog 存储消息
MappedFile#落盘
FlushRealTimeService
CommitRealTimeService
GroupCommitService
CommitLog#putMessage(...)
MappedFileQueue#getLastMappedFile(...)
MappedFile#appendMessage(...)
DefaultAppendMessageCallback#doAppend(...)
FlushCommitLogService
结尾
1、概述
本文接《RocketMQ 源码分析 —— Message 发送与接收》;
主要解析 CommitLog
存储消息部分。
2、CommitLog 结构
CommitLog
、MappedFileQueue
、MappedFile
的关系如下:
CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、MappedFileQueue
、MappedFile
的定义如下:
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等文件。MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。每个
MappedFile
统一文件大小。文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
CommitLog
里默认为 1GB。CommitLog
:针对MappedFileQueue
的封装使用。
CommitLog
目前存储在 MappedFile
有两种内容类型:
MESSAGE :消息。
BLANK :文件不足以存储消息时的空白占位。
CommitLog
存储在 MappedFile
的结构:
MESSAGE[1] MESSAGE[2] ... MESSAGE[n - 1] MESSAGE[n] BLANK
MESSAGE
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | MsgLen | 消息总长度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 消息内容CRC | Int | 4 |
4 | QueueId | 消息队列编号 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 消息队列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog 的顺序存储位置。 | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成消息时间戳 | Long | 8 |
10 | BornHost | 生效消息的地址+端口 | Long | 8 |
11 | StoreTimestamp | 存储消息时间戳 | Long | 8 |
12 | StoreHost | 存储消息的地址+端口 | Long | 8 |
13 | ReconsumeTimes | 重新消费消息次数 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 内容长度 + 内容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic长度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | Short + Bytes | 2 + PropertiesLength |
BLANK
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | maxBlank | 空白长度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
3、CommitLog 存储消息
CommitLog#putMessage(...)
// 省略代码说明 :存储消息,并返回存储结果。
第 2 行 :设置存储时间等。
第 16 至 36 行 :事务消息相关,暂未了解。
第 45 & 97 行 :获取锁与释放锁。
第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
第 55 至 62 行 :获取
MappedFile
,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。第 65 行 :插入消息到
MappedFile
,解析解析见:MappedFile#appendMessage(...)。第 69 至 80 行 :
MappedFile
已满,创建新的,再次插入消息。第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
第 143 至 173 行 :
Broker
主从同步。后面的文章会详细解析😈。
MappedFileQueue#getLastMappedFile(...)
// 省略代码说明 :获取最后一个
MappedFile
,若不存在或文件已满,则进行创建。第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的
createOffset
。第 14 行 :计算文件名。从此处我们可
以得知,MappedFile
的文件命名规则:
> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)
目前 `CommitLog` 的 `startOffset` 为 0。
此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如:
| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |
_如果有知道的同学,麻烦提示下。😈_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*
第 30 至 35 行 :设置
MappedFile
是否是第一个创建的文件。该标识用于ConsumeQueue
对应的MappedFile
,详见ConsumeQueue#fillPreBlank
。
MappedFile#appendMessage(...)
// 省略代码
说明 :插入消息到
MappedFile
,并返回插入结果。第 8 行 :获取需要写入的字节缓冲区。为什么会有
writeBuffer != null
的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。第 9 至 11 行 :设置写入
position
,执行写入,更新wrotePosition
(当前写入位置,下次开始写入开始位置)。
DefaultAppendMessageCallback#doAppend(...)
// 省略代码说明 :插入消息到字节缓冲区。
第 45 行 :计算物理位置。在
CommitLog
的顺序存储位置。第 47 至 49 行 :计算
CommitLog
里的offsetMsgId
。这里一定要和msgId
区分开。
计算方式 | 长度 | |||
---|---|---|---|---|
offsetMsgId | Broker存储时生成 | Hex(storeHostBytes, wroteOffset) | 32 | |
msgId | Client发送消息时生成 | Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) | 32 | 《RocketMQ 源码分析 —— Message 基础》 |
第 51 至 61 行 :获取队列位置(offset)。
第 78 至 95 行 :计算消息总长度。
第 98 至 112 行 :当文件剩余空间不足时,写入
BLANK
占位,返回结果。第 114 至 161 行 :写入
MESSAGE
。第 173 行 :更新队列位置(offset)。
FlushCommitLogService
线程服务 | 场景 | 插入消息性能 |
---|---|---|
CommitRealTimeService | 异步刷盘 && 开启内存字节缓冲区 | 第一 |
FlushRealTimeService | 异步刷盘 && 关闭内存字节缓冲区 | 第二 |
GroupCommitService | 同步刷盘 | 第三 |
MappedFile#落盘
方式 | |||
---|---|---|---|
方式一 | 写入内存字节缓冲区(writeBuffer) | 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) | 文件通道(fileChannel)flush |
方式二 | 写入映射文件字节缓冲区(mappedByteBuffer) | 映射文件字节缓冲区(mappedByteBuffer)flush |
flush相关代码
考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE
才进行 flush
。
commit相关代码:
考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE
才进行 commit
。
FlushRealTimeService
消息插入成功时,异步刷盘时使用。
// 省略代码
说明:实时
flush
线程服务,调用MappedFile#flush
相关逻辑。第 23 至 29 行 :每
flushPhysicQueueThoroughInterval
周期,执行一次flush
。因为不是每次循环到都能满足flushCommitLogLeastPages
大小,因此,需要一定周期进行一次强制flush
。当然,不能每次循环都去执行强制flush
,这样性能较差。第 33 行 至 37 行 :根据
flushCommitLogTimed
参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用commitLogService.wakeup()
。第 45 行 :调用
MappedFile
进行flush
。第 61 至 65 行 :
Broker
关闭时,强制flush
,避免有未刷盘的数据。
CommitRealTimeService
消息插入成功时,异步刷盘时使用。
和 FlushRealTimeService
类似,性能更好。
// 省略代码
GroupCommitService
消息插入成功时,同步刷盘时使用。
// 省略代码说明:批量写入线程服务。
第 16 至 25 行 :添加写入请求。方法设置了
sync
的原因:this.requestsWrite
会和this.requestsRead
不断交换,无法保证稳定的同步。第 27 至 34 行 :读写队列交换。
第 38 至 60 行 :循环写入队列,进行
flush
。第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个
MappedFile
(写第N个消息时,MappedFile
已满,创建了一个新的),所以需要有循环2次。第 51 行 :唤醒等待写入请求线程,通过
CountDownLatch
实现第 61 至 66 行 :直接刷盘。此处是由于发送的消息的
isWaitStoreMsgOK
未设置成TRUE
,导致未走批量提交。第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果
wakeup()
时,则会立即进行一次批量提交。当Broker
设置成同步落盘 && 消息isWaitStoreMsgOK=true
,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。
结尾
写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。
《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。
推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。😈认真啃读《Linux内核设计与实现(原书第3版)》,day day up。
再次感谢大家的阅读、点赞、收藏。
下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!