查看原文
其他

RocketMQ msgId与offsetMsgId释疑(实战篇)

丁威 中间件兴趣圈 2022-11-10

点击上方“中间件兴趣圈”选择“设为星标”

做积极的人,越努力越幸运!

本文将详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。

1、抛出问题


1.1 从消息发送看消息ID

package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
    public static void main(String[] args)  {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            producer.shutdown();
        } catch (Throwable e) {
            e.printStackTrace();
        }

    }
}

执行效果如图所示:

即消息发送会返回 msgId 与 offsetMsgId。

1.2 从消息消费看消息ID

package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TestTopic""*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) 
{
                System.out.println("MessageExt msg.getMsgId():" +  msgs.get(0).getMsgId());
                System.out.println("-------------------分割线-----------------");
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

执行效果如图所示:

不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的msgId 与直接输出msgs中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。

2、消息ID释疑


从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?

  • msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。

  • offsetMsgId:消息偏移ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。

2.1 msgId 即全局唯一 ID 构建规则

从这张图可以看出,msgId确实是客户端生成的,接下来我们详细分析一下其生成算法。

MessageClientIDSetter#createUniqID

public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);    // @1
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));  // @2
    return sb.toString();
}

一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。

2.1.1 FIX_STRING

MessageClientIDSetter静态代码块

static {
    byte[] ip;
    try {
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 2 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
    tempBuffer.position(0);
    tempBuffer.put(ip);
    tempBuffer.position(ip.length);
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(ip.length + 2);
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。

2.1.2 唯一性算法

msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。

private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}

可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。

2.2 offsetMsgId构建规则

在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:
MessageDecoder#createMessageIdpublic static String createMessageId(final ByteBuffer input ,
            final ByteBuffer addr, final long offset) 
{
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}

首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:

  • ByteBuffer input
    用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)

  • ByteBuffer addr
    当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。

  • long offset
    消息的物理偏移量。
    即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。

温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可以不必知道该消息所属的 topic 信息即可查询该条消息的内容。

2.3 消息发送与消息消费返回的消息ID信息

消息发送时会在 SendSesult 中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。

在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。

在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?


在客户端返回的 msg 信息,其最终返回的对象是  MessageClientExt ,继承自 MessageExt。
那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。@Override
public String getMsgId() {
    String uniqID = MessageClientIDSetter.getUniqID(this);
    if (uniqID == null) {
        return this.getOffsetMsgId();
    } else {
        return uniqID;
    }
}

原来在调用 MessageClientExt 中的 getMsgId 方法时,如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。

而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:

故返回的是 MessageExt中 的 msgId,该 msgId 存放的是 offsetMsgId,所以才造成了困扰。

温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发送变化的,但该消息的 offsetMsgId 会发送变化,因为其存储在服务器中的位置发生了变化。

3、实践经验


在解答了消息发送与消息消费关于msgId与offsetMsgId的困扰后,再来介绍一下如果根据 msgId 去查询消息。

想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?


这里的 Message ID 返回的是消息的全局唯一ID。

其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:

发现报错,那我们将 offsetMsgId 传入其执行效果如图所示:
但在 rocketmq-console 的根据消息ID去查找消息,无论传入哪个msgId,下图该功能都能返回正确的结果:
这是因为 rocketmq-console 做了兼容,首先将传入的 msgId 用 queryMsgById 该命令去查,如果报错,则当成 uniqID(全局ID)去查,首先全局ID会存储在消息的属性中,并会创建 Hash 索引,即可用通过 indexfile 快速定位到该条消息。

点【在看】与转发是一种美德,期待您的认可与鼓励,越努力越幸运。


欢迎加入我的知识星球,一起交流源码,探讨架构,打造高质量的技术交流圈,长按如下二维码


中间件兴趣圈 知识星球 正在对如下话题展开如火如荼的讨论:


1、【让天下没有难学的Netty-网络通道篇】

1、Netty4 Channel概述(已发表
2、Netty4 ChannelHandler概述(已发表
3、Netty4事件处理传播机制(已发表
4、Netty4服务端启动流程(已发表
5、Netty4 NIO 客户端启动流程
6、Netty4 NIO线程模型分析
7、Netty4编码器、解码器实现原理
8、Netty4 读事件处理流程
9、Netty4 写事件处理流程
10、Netty4 NIO Channel其他方法详解

2、Java 并发框架(JUC) 探讨【面试神器
3、源码分析Alibaba Sentienl 专栏背后的写作与学习技巧

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存