查看原文
其他

大揭秘!RocketMQ如何管理消费进度

博文视点 博文视点Broadview 2021-07-06

在企业实践RocketMQ时基本上80%都是不消费问题,而由于消费进度问题导致不消费的问题又是最难确认的和排查的。RocketMQ的消费进度分为本地消费进度管理和远程消费进度管理,分别对应的消费模式是广播消费和集群消费。 

本文选自《RocketMQ分布式消息中间件:核心原理与最佳实践》一书,带你层层揭秘RocketMQ如何管理消费进度。 

什么是消费进度

消费进度,也就是由Broker管理每一个消费者消费Topic的进度,包含正常提交消费进度和重置消费进度,如下:

上图表示一个消费者组A,部署了2个消费者实例consumer instance1和consumer instance2。

- consumer instance1消费queue1和queue2

- consumer instance2消费queue3和queue4

这里的消费进度是指consumer instance1分别消费到queue1和queue2第多少条消息,consumer instance2分别消费到queue3和queue4第多少条消息。 

在集群消费时,消费进度由消费者主动“上报”给Broker,广播消费时由消费者自己本地保存。


为什么需要消费进度

消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。 

在RocketMQ中,实现的消费语义叫“至少投递一次”,也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。 


什么时候“上报”消费进度

消费者一般在两种情况下“上报”消费进度,消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。如下图2展示了,图3展示了:

消费成功后提交消费进度的过程

重置消费进度的过程
二者共同点:

• 都是由Broker统一管理消费者的消费进度

• 都需要由消费者“主动上报”最新的消费进度

二者的差异点:

• 正常消费时提交消费进度,一般消费进度是向前推进

• 重置消费进度时提交消费进度,消费进度可能向前推进,也可能向后回溯


消费进度管理代码分析

在RocketMQ中,将消费进度管理抽象为消费进度管理接口OffsetStore,该接口有两个实现类: RemoteBrokerOffsetStore和LocalFileOffsetStore,他们分别实现了集群消费、广播消费的消费进度管理。

下图描述了OffsetStore、RemoteBrokerOffsetStore和LocalFileOffsetStore三者的类图关系:

OffsetStore接口定义了消费进度管理的基本方法,具体方法列表如下(方法参数已省略):

load(): 加载全部消费者的消费进度信息

updateOffset(): 更新一个queue的消费进度

readOffset(): 读取一个queue的消费进度

persistAll(): 持久化全部消费进度

persist(): 持久化一个queue的消费进度

removeOffset(): 移除一个queue的消费进度

cloneOffsetTable(): 克隆一个topic的消费进度

updateConsumeOffsetToBroker(): 更新消费进度到Broker


RemoteBrokerOffsetStore的实现是将消费进度信息保存到Broker中;LocalFileOffsetStore的实现是将消费进度信息保存到本地文件中。 

 / 彩蛋1 / 

updateConsumeOffsetToBroker() 这个方法是将消费进度更新到Broker中,想必在LocalFileOffsetStore是没有实现该方法的。通过看源码,也印证了我们的猜想:

接来下以用Push的方式消费普通消息(非顺序消息)为例,具体讲解如何消费成功、重置消费位点整个过程是如何的。

▊ 消费成功,如何提交消费进度?

在RocketMQ中,消费者是一批一批的消费的,Push消费方式默认每批16条消息,消费完成后会调用ConsumeMessageConcurrentlyService。

processConsumeResult()方法处理消费结果,该方法会更新这批消息中对应Topic的queue的消费进度,具体核心代码片段如下:

1long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
2        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
3this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
4        }

以上代码主要涉及3个核心方法removeMessage()、isDropped()、updateOffset()。

removeMessage()方法是将成功消费的消息从本地缓queue中删除,并返回这个queue的消费位点。

isDropped()这个方法是判断这些消息所在的本地queue是否被drop了,如果被drop了消费进度就不更新。一般由于有消费者上线、下线、broker宕机等引发消费者负载均衡,导致这个queue已经分配给其他消费者。

updateOffset(): 更新本地内存中的消费位点。

实现代码如下:

1public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly){
2    if (mq != null) {
3        AtomicLong offsetOld = this.offsetTable.get(mq);
4        if (null == offsetOld) {
5           offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
6        }
7    if (null != offsetOld) {
8        if (increaseOnly) {
9            MixAll.compareAndIncreaseOnly(offsetOld, offset);
10        } else {
11            offsetOld.set(offset);
12               }
13        }
14    }
15}


代码中this.offsetTable的类型是ConcurrentMap<MessageQueue,AtomicLong>,表示一个本地queue和其消费位点的对应关系,看到这里大家不禁心中会冒起疑问: 不是更新位点到Broker中嘛? 是的,确实不是。在RocketMQ的设计中,本地消费位点和Broker位点同步是异步的。大家如果顺着persistAll()方法找调用关系,会发现RocketMQ客户端在启动时会初始化一个定时任务调用persistAll()方法,将offsetTable中的本地位点信息更新到Broker中。

persistAll()方法主要是通过调用updateConsumeOffsetToBroker()方法将消费进度更新到Broker的,核心代码片段如下:

1public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
2        MQBrokerException, InterruptedException, MQClientException {
3       ...
4            if (isOneway) {
5                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
6                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
7            } else {
8                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
9                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
10            }
11        }
12...
13    }

updateConsumeOffsetToBroker()方法将一个queue的消费进度信息封装为一个RPC请求的requestHeader,再加上请求代码RequestCode.UPDATE_CONSUMER_OFFSET一起封装成为一个RPC的请求命令RemotingCommand,最后调用网络层方法invokeOneway()将该RPC请求发送给Broker。 

 / 彩蛋2 /  

这里特别注意,RocketMQ默认是通过invokeOneway()方法将该请求发送出去的,也就是说客户端只管发请求。不管Broker的返回结果。如果网络不好或者Broker处理慢,可能发现一个现象: 消费者一直在正常消费,而Broker的消费进度信息更新很慢。

重置消费进度如何生效?

RocketMQ目前支持重置消费进度到某个具体时间,重置消费位点逻辑中客户端部分和正常消费一致,只是消费进度更新发起者是RocketMQ Console,具体过程如下图6所示:

第一步,用户可以在RocketMQ Console的Topic页面,重置一个Topic的某一个消费者组的消费进度到某个时刻。 

第二步,当Broker收到Console发送的重置消费进度请求后,会根据重置时间查找该时间对应的每个queue的消费位点,然后将这些信息封装后发送给每一个消费者实例。

第三步,消费者收到Broker发送的重置位点请求后,更新本地消费进度。

 / 彩蛋3 /  

这里有个坑,除了java客户端之外,如果是CPP/Python/Go等基于CPP客户端封装的多语言客户端会重置失败,原因时Broker在封装请求时,只是按照java协议封装了请求包,该包其他语言会解析失败,导致重置位点失败。目前笔者已经提PR(pr id=1930)处理了。 

第四步,消费者本地的定时任务定时将本地位点信息同步到Broker。(逻辑和成功消费时一致)


通过我们大量的实践发现,何时提交消费进度、如何提交消费进度是排查问题的主要依据,在掌握了这两点后,问题基本迎刃而解。

想要了解更多关于RocketMQ的原理实现可以阅读《RocketMQ分布式消息中间件:核心原理与最佳实践》一书。

这是一本讲解RocketMQ最佳实践的系统化书籍,作者有在RocketMQ在线高可靠场景下的深度开发和运营经验,踩过很多坑,总结出宝贵的经验。内容清晰易懂,又结合了最佳实践的经验,可以当作RocketMQ初学的参考书,也可以当作在线深度大规模使用的工具书。

关于作者

Apache RocketMQ北京社区联合发起人,RocketMQ项目Commiter,RocketMQ社区Python客户端项目负责人。目前就职于北京某在线教育公司,担任高级大数据工程师,曾负责公司消息与数据流平台,目前主要负责OLAP团队,对分布式存储计算系统设计有丰富经验,热衷于知识分享和社区活动。

▼扫码获取本书详情▼




如果喜欢本文
欢迎 在看留言分享至朋友圈 三连


 热文推荐  





▼点击阅读原文,获取本书详情

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存