我说分布式事务之消息最终一致性事务(二):RocketMQ的实现
来源:https://0x9.me/A76YN
号外:最近整理了一下以前编写的一系列Spring Boot内容,整了个《Spring Boot基础教程》的PDF,关注我,回复:001,快来领取吧~!更多内容持续整理中,帮助大家更好的学习Spring相关的系列内容!
上一篇《我说分布式事务之消息最终一致性事务(一):原理及实现》中,我们讲解了可靠消息最终一致性的实现原理及如何基于一款开源的消息中间件,实现一个可靠消息服务的思路。
本文,我们讲解如何利用开源消息中间件RocketMQ的特性–事务消息,实现基于消息一致性的最终一致的分布式事务。
RocketMQ是阿里巴巴开源的一款高性能、高可靠的消息中间件,经历过双11等大流量高并发的大考,是国内开源界的翘楚,在业界有着广泛的应用。
我假设你对RocketMQ有着一定的了解,就不对它的基础概念及使用做进一步的展开,如果需要,请参考官方文档做进一步的学习了解RocketMQ官网。
按照我们的套路,先上图。
原理简介
RocketMQ提供了类似X/Open XA的分布事务功能,通过MQ的事务消息能达到分布式事务的最终一致。
发送方在业务执行开始会先向消息队列中投递 “半消息” ,半消息即暂时不会真正投递的消息,当发送方(即生产者)将消息成功发送给了MQ服务端且并未将该消息的二次确认结果返回,此时消息状态是“暂时不可投递”状态(可以认为是状态未知)。该状态下的消息即半消息。
如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ服务端会通过扫描发现某条消息长期处于 “半消息” 状态,MQ服务端会主动向生产者查询该消息的最终状态是处于Commit(消息提交)还是Rollback(消息回滚)。这个过程称为消息回查。
有了上述的概念,我们详细解释一下事务消息交互的过程。
首先,MQ发送方向MQ服务(即RocketMQ的Broker)发送半消息。
MQ服务端会将消息做持久化处理,并发送ACK确认消息已经发送成功。
MQ发送方执行本地事务
MQ发送方根据本地事务执行的结果向MQ服务提交二次确认:如果本地事务执行成功,则提交消息状态为Commit,否则为Rollback。MQ服务端收到Commit状态的消息将消息标记为可投递状态,订阅方最终会收到该条消息。如果收到的是Rollback,最终MQ服务端会删除该条半消息,订阅方不会接收到这条消息。
如果出现网络闪断、应用重启等情况,4阶段替提交的二次确认最终并未能到达MQ服务端,一定时间之后,MQ服务端会对此消息发起回查操作,确认发送方本地事务的执行状态。
发送方需要实现服务回查逻辑供MQ服务端进行回调。当发送方收到回查后,需要检查对应消息的本地事务执行的最终结果,此处也需要根据本地事务的成功或失败返回Commit或者Rollback,即再次提交消息状态的二次确认,MQ服务端仍会按照步骤4对该半消息进行操作。
注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。
如何使用
此处我引用官网的demo,进行简单说明。
事务状态:rocketmq定义了三种事务状态
TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
TransactionStatus.RollbackTransaction:消息回滚,表示MQ服务端将会删除当前半消息,不允许消费者消费。
TransactionStatus.Unknown:中间状态,表示MQ服务需要发起回查操作,检测当前发送方本地事务的执行状态。
发送事务消息
(1)创建事务消息生产者
使用TransactionMQProducer创建消息发送客户端。并指定一个唯一的生产者组producerGroup,当执行完本地事务,需要返回给MQ服务端执行结果,返回上面的三种事务状态。CommitTransaction、RollbackTransaction、Unknown
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
hread.sleep(1000);
}
producer.shutdown();
}
}
(2) 实现TransactionListener接口
实现executeLocalTransaction方法。消息生产者需要在executeLocalTransaction中执行本地事务当事务半消息提交成功,执行完毕后需要返回事务状态码。 实现checkLocalTransaction方法,该方法用于进行本地事务执行情况回查,并回应事务状态给MQ的broker,执行完成之后需要返回对应的事务状态码。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
对于消费者,需要通过业务参数保证消费的幂等。
附录:RocketMQ事务消息实现原理
有些同学想要深入了解RocketMQ实现事务消息操作的原理,这里我引用一下官方的一段博客的内容,具体的地址为:里程碑|Apache RocketMQ 正式开源分布式事务消息
RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。
在具体实现上,RocketMQ通过使用Half Topic 以及Operation Topic 两个内部队列来存储事务消息推进状态,如下图所示:
其中,Half Topic对应队列中存放着prepare消息,Operation Topic对应的队列则存放了prepare message对应的commit/rollback消息,消息体中则是prepare message对应的offset,服务端通过比对两个队列的差值来找到尚未提交的超时事务,进行回查。
在具体实现上,事务消息作为普通消息的一个应用场景,在实现过程中进行了分层抽象,从而避免了对RocketMQ原有存储机制的修改,如下图所示:
从用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可;而在service层,则对事务消息的两阶段提交进行了抽象,同时针对超时事务实现了回查逻辑,通过不断扫描当前事务推进状态,来不断反向请求Producer端获取超时事务的执行状态,在避免事务挂起的同时,也避免了Producer端的单点故障。而在存储层,RocketMQ通过Bridge封装了与底层队列存储的相关操作,用以操作两个对应的内部队列,用户也可以依赖其它他存储介质实现自己的service,RocketMQ会通过ServiceProvider加载进来。
从上述事务消息设计中可以看到,RocketMQ事务消息较好的解决了事务的最终一致性问题,事务发起方仅需要关注本地事务执行以及实现回查接口给出事务状态判定等实现,而且在上游事务峰值高时,可以通过消息队列,避免对下游服务产生过大压力。
事务消息不仅适用于上游事务对下游事务无依赖的场景,还可以与一些传统分布式事务架构相结合,而MQ的服务端作为天生的具有高可用能力的协调者,使得我们未来可以基于RocketMQ提供一站式轻量级分布式事务解决方案,用以满足各种场景下的分布式事务需求。
热门推荐:
·END·
近期热文:
Spring Cloud Stream 学习小清单
SpringBoot:自定义starter
重磅!Github 开放无数量限制的免费私有仓库!
百度面试题:求数组最大值
我说分布式事务之消息最终一致性事务(一):原理及实现
疑案追踪:Spring Boot内存泄露排查记
了解Java中的内存泄漏
Git 常用命令清单,掌握这些,轻松驾驭版本管理
优先级队列(头条面试题)
来谈下高并发和分布式中的幂等处理
你应该知道的7个写出更好的 Java 代码的技巧
看完,赶紧点个“好看”鸭
点鸭点鸭
↓↓↓↓