查看原文
其他

3 张图带你彻底理解 RocketMQ 事务消息

The following article is from 君哥聊技术 Author 朱晋君

点击关注公众号,一周多次包邮送书

来源:将经授权转 君哥聊技术(ID:gh_1f109b82d301)

作者:朱晋君

事务消息是分布式事务的一种解决方案,RocketMQ 有成熟的事务消息模型,今天就来聊一聊 RocketMQ 事务消息实现机制。

假如有一个电商场景,用户下单后,账户服务从用户账户上扣减金额,然后通知库存服务给用户发货,这两个服务需要在一个分布式事务内完成。

这时,账户服务作为 Producer,库存服务作为 Consumer,见下面消息流程:

  1. 账户服务作为 Producer 向 Broker 发送一条 half 消息;

  2. half 消息发送成功后,执行本地事务,执行成功则向 Broker 发送 commit 请求,否则发送 rollback 请求;

  3. 如果 Broker 收到的是 rollback 请求,则删除保存的 half 消息;

  4. 如果 Broker 收到的是 commit 请求,则保存扣减库存消息(这里的处理是把消息从 half 队列投递到真实的队列),然后删除保存的 half 消息

  5. 如果 Broker 没有收到请求,则会发送请求到 Producer 查询本地事务状态,然后根据 Producer 返回的本地状态做 commit/rollback 相关处理。

1 half 消息

上面电商的案例中,RocketMQ 解决分布式事务的第一步是账户服务发送 half 消息。

首先看官网一个发送事务消息的示例:

public static void main(String[] args) throws MQClientException, InterruptedException {
 TransactionListener transactionListener = new TransactionListenerImpl();
 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
 ExecutorService executorService = new ThreadPoolExecutor(25100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
   Thread thread = new Thread(r);
   thread.setName("client-transaction-msg-check-thread");
   return thread;
  }
 });

 producer.setExecutorService(executorService);
 producer.setTransactionListener(transactionListener);
 producer.start();

 String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
 for (int i = 0; i < 10; i++) {
  try {
   Message msg =
    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.sendMessageInTransaction(msg, null);
   System.out.printf("%s%n", sendResult);

   Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
   e.printStackTrace();
  }
 }

 for (int i = 0; i < 100000; i++) {
  Thread.sleep(1000);
 }
 producer.shutdown();
}

上面的代码中 Producer 有一个 TransactionListener 属性,这个由开发者通过实现这个接口来自己定义。这个接口有两个方法:

  • 提交本地事务 executeLocalTransaction

  • 检查本地事务状态 checkLocalTransaction

下面代码是发送事务消息的方法:

//类 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException 
{
 TransactionListener transactionListener = getCheckListener();
 //省略验证逻辑

 SendResult sendResult = null;
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }

 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 //省略发送结果处理
 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
 }

 TransactionSendResult transactionSendResult = new TransactionSendResult();
 //省略封装属性
 return transactionSendResult;
}

从这段代码中看到,在发送消息前,给消息封装了一个属性PROPERTY_TRANSACTION_PREPARED,通过这个属性可以找到 Broker 端的处理。

Broker 保存 half 消息时,把消息 topic 改为 RMQ_SYS_TRANS_HALF_TOPIC,然后把消息投递到 queueId 等于 0 的队列。投递成功后给 Producer 返回 PutMessageStatus.PUT_OK。代码如下:

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
 return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  String.valueOf(msgInner.getQueueId()));
 msgInner.setSysFlag(
  MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
 msgInner.setQueueId(0);
 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
 return msgInner;
}

2 执行本地事务

上一节讲到,Producer 发送事务消息时,会给一个 transactionListener,发送 half 消息成功后,会通过 transactionListener 的 executeLocalTransactionBranch 提交本地事务,代码如下:

//类 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException 
{
 //省略部分代码
 SendResult sendResult = null;
 //省略部分代码
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }

 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 Throwable localException = null;
 switch (sendResult.getSendStatus()) {
  case SEND_OK: {
   try {
    //省略部分代码
    if (null != localTransactionExecuter) {
        //这个分支已经废弃
     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
    } else if (transactionListener != null) {
     //执行本地事务
     localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    //省略部分代码
   } catch (Throwable e) {
   }
  }
  break;
  //省略其他 case
 }

 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
  log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
 }
    //省略部分代码
 return transactionSendResult;
}

从上面代码中可以看到,本地事务执行结束后,会调用一个 endTransaction 方法,这个就是向 Broker 发送 commit/rollback,也可能发送 UNKNOW,封装到 requestHeader 的 commitOrRollback 的属性中。这个请求的请求码是 END_TRANSACTION。

3 commit/rollback 处理

根据请求码 END_TRANSACTION 可以找到 Broker 端对事务消息的处理。代码如下:

//EndTransactionProcessor 类
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
 RemotingCommandException 
{
 //省略部分逻辑
 OperationResult result = new OperationResult();
 if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
     //查找出 half 消息
  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
      //检查 groupId 和消息偏移量是否合法
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
    //删除PROPERTY_TRANSACTION_PREPARED属性
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
    RemotingCommand sendResult = sendFinalMessage(msgInner);
    if (sendResult.getCode() == ResponseCode.SUCCESS) {
        //删除 half 消息
     this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    return sendResult;
   }
   return res;
  }
 } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
   }
   return res;
  }
 }
 response.setCode(result.getResponseCode());
 response.setRemark(result.getResponseRemark());
 return response;
}

这段代码逻辑很清晰,首先查找出 half 消息,然后对查找出的消息进行检查(groupId 和消息偏移量是否合法),如果是 commit,则去除事务消息准备阶段属性,重新把消息投递到原始队列,然后删除 half 消息。如果是 rollback,则直接删除 half 消息。

注意:对于 UNKNOW 的类型,这里直接返回 null,上面代码没有贴出来。

4 check 事务状态

Broker 初始化的时候,会初始化一个 TransactionalMessageServiceImpl 线程,这个线程会定时检查过期的消息,通过向 Producer 发送 check 消息来获取事务状态。代码如下:

//TransactionalMessageCheckService
protected void onWaitEnd() {
    //超时时间,默认 6s
 long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
 //最大检查次数,默认 15
 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
 long begin = System.currentTimeMillis();
 log.info("Begin to check prepare message, begin time:{}", begin);
 this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
 log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

这里有两个参数需要注意:

  • 事务消息超时时间,超时后会向 Producer 发送 check 消息检查本地事务状态,默认 6s;
  • 最大检查次数,Broker 每次向 Producer 发送 check 消息后检查次数加 1,超过最大检查次数后 half 消息被丢弃,默认最大检查次数是 15;
    注意:这里的丢弃是把消息写入了一个新的队列,Topic 为 TRANS_CHECK_MAX_TIME_TOPIC,queueId 为 0。
  • 文件保存时间,默认72 小时。

检查事务消息的流程如下:

Producer 收到 check 消息后,最终调用 TransactionListener 中定义的 checkLocalTransaction 方法,查询本地事务执行状态,然后发送给 Broker。

需要注意的是,check 消息发送给 Broker 时,会在请求 Header 中给 fromTransactionCheck 属性赋值为 true,以标记是 check 消息。

Broker 收到 check 响应消息后,处理逻辑跟第 3 节的处理逻辑一样,唯一不同的是,这里针对 check 消息和非 check 消息打印了不同的日志。

5 总结

从上面代码的分析可以看到,RocketMQ 的事务消息实现机制非常简洁。使用事务消息时自己定义 TransactionListener,实现执行本地事务 executeLocalTransaction 和检查本地事务状态 checkLocalTransaction 这两个方法,然后使用 TransactionMQProducer 进行发送。

最后,附一张 Producer 端的 UML 类图:

·················END·················

推荐阅读

• 面试突击:说一下MySQL事务隔离级别?• 5张图带你理解 RocketMQ 顺序消息实现机制• 聊聊异步编程的 7 种实现方式• 从-1开始实现一个中间件• 各大主流编程语言性能PK,结果出乎意料• HTTP 新增的 103 状态码,这次终于派上用场了!• 吐血推荐17个提升开发效率的“轮

👇更多内容请点击👇

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存