RocketMQ 5.x消息不丢失核心配置allAckInSyncStateSet详解
背景
最近在研究RocketMQ
消息不丢失的一些细节,所以关注到了5.x的一个新参数allAckInSyncStateSet
消息一个不丢的代价很大,我们需要再消息丢失和性能之间找到一个平衡,在保证性能的同时尽量减少消息丢失风险
具体消息的可靠度还是要依据具体业务来做决策
举个🌰
RocketMQ
数据刷盘的策略默认是使用异步刷盘,异步刷盘除非服务器真的宕机
我们的数据才会丢失.为了性能我们一般设置异步刷盘,因为消息的丢失风险在合理范围(业务可接受)
RocketMQ宕机并不会丢失,因为存在于操作系统的pageCache里面,而不是JVM内存
主从之间的数据同步,也是有很多配置供我们选择。 比如有如下一些配置
checkSyncStateSetPeriod:检查 SyncStateSet 的时间间隔,检查 SyncStateSet 可能会 shrink SyncState。默认5000(5s) haMaxTimeSlaveNotCatchup:表示 Slave 没有跟上 Master 的最大时间间隔,若在 SyncStateSet 中的 slave 超过该时间间隔会将其从 SyncStateSet 移除。默认为 15000(15s)。 inSyncReplicas:需保持同步的副本组数量,默认为1,allAckInSyncStateSet=true 时该参数无效。 minInSyncReplicas:最小需保持同步的副本组数量,若 SyncStateSet 中副本个数小于 minInSyncReplicas 则 putMessage 直接返回 PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH,默认为1。 allAckInSyncStateSet:若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。
参考官网 官网:https://rocketmq.apache.org/zh/docs/deploymentOperations/03autofailover/
合理的设置这些参数才能尽可能在保证性能的同时减少在宕机时消息的丢失
allAckInSyncStateSet设置的疑惑
其中我们可以看到一个很关键的参数allAckInSyncStateSet
官方的说法就是若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false
那么就有如下几个疑惑了
每一个副本是指都在SyncStateSet
里面的副本吗
比如现在一个集群有三个节点
Broker-a-master Broker-a-slave broker-a-slave
此时的SyncStateSet
中的数据
在同步中的broker
Broker-a-master 192.168.1.1 Broker-a-slave 192.168.1.2 broker-a-slave 192.168.1.3
如果此时broker-a-master
宕机。随机选出一个slave
broker
切换成master
。
此时的SyncStateSet
数据
在同步中的broker
Broker-a-master 192.168.1.2 Broker-a-slave 192.168.1.3 不在同步中的broker broker-a-slave 192.168.1.1
那么此时宕机的master
(192.168.1.1)还需要返回ack才算消息消息写入成功吗
如果是,那么整个broker-a
集群处于无法写入的状态,也无法恢复,对整个集群的可用性影响太大了
最大需要间隔多久才能将宕机的master
移出到不在同步中的broker
呢?
所以疑惑就有亮点
返回ack是所有存活的 broker
还是在同步中的broker
什么时候将异常的 broker
移出在同步中的broker
下面我们就上面的2个问题我们结合源码分析来找到答案
allAckInSyncStateSet在何处使用
我们全局搜索allAckInSyncStateSet在何处使用
关键字能直接找到org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer
中有使用
这里我们可以看到获取的同步副本syncStateSet
是通过autoSwitchHAService.getSyncStateSet()
获取的
所以判断需要返回多少个ack主要是基于syncStateSet
判断的。
即AutoSwitchHAService
的syncStateSet
属性
何时调用org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService#setSyncStateSet
方法来更新syncStateSet
呢
可以看到有三处调用
changeSyncStateSet schedulingSyncBrokerMetadata doReportSyncStateSetChanged
我们先看看doReportSyncStateSetChanged
这个方法
doReportSyncStateSetChanged
org.apache.rocketmq.broker.controller.ReplicasManager#doReportSyncStateSetChanged
可以看到这里主要是与controller
进行通信。
我们查看请求状态码为CONTROLLER_ALTER_SYNC_STATE_SET
然后拿取到controller
的元数据结果进行本地同步。应该是没有相关的同步状态下线操作。我们继续看看schedulingSyncBrokerMetadata
schedulingSyncBrokerMetadata
我们来看看org.apache.rocketmq.broker.controller.ReplicasManager#schedulingCheckSyncStateSet
方法
这里我们看到一个很关键的方法org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAConnection#maybeExpandInSyncStateSet
public Set<String> maybeShrinkInSyncStateSet() {
final Set<String> newSyncStateSet = getSyncStateSet();
final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) {
final String slaveAddress = next.getKey();
if (newSyncStateSet.contains(slaveAddress)) {
final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress);
if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) {
newSyncStateSet.remove(slaveAddress);
}
}
}
return newSyncStateSet;
}
lastCaughtUpTimeMs
获取值为
long caughtUpTimeMs = this.haService.getDefaultMessageStore().getMaxPhyOffset() == slaveMaxOffset ? System.currentTimeMillis() : this.lastTransferTimeMs;
即如果slave
最大偏移量不等于master
最大偏移量,则不更新同步时间。相等则更新同步时间为最新时间
可以看到会与最大不同步时间进行判断haMaxTimeSlaveNotCatchup
(默认15s)
15s内slave
与master
最大偏移量不同步,则移出syncStateSet
这个检测任务是多久执行一次呢?
可以看到是3s执行一次
所以最多需要15s才能将异常的broker
移除在同步状态的broker
中
总结
总的来说开启allAckInSyncStateSet
最多会在master
宕机后15s内集群写入不可用,等异常的broker
被移出syncStateSet
后就可以继续正常写入了。
实际比如有三个集群
broker-a
broker-b
broker-c
一个集群比如broker-a
在15s内不可写入也是可以接受的,因为还有两个集群可以写入,product
也是有消息重试的。
所以如果不开启allAckInSyncStateSet
。一条消息写入master
后还没来得及同步给slave
就宕机,消息丢失的风险还是太大,线上还是推荐开启allAckInSyncStateSet