(给ImportNew加星标,提高Java技能)
作者:黄哥
分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。环境依赖如下:
JDK1.8+
spring-boot-start-web:2.x.x
spring-boot-start-jdbc:2.x.x
spring-boot-start-amqp:2.x.x
HikariCP:3.x.x(spring-boot-start-jdbc自带)
mysql-connector-java:5.1.48
redisson:3.12.1
事务消息原则上只适合弱一致性(或者说最终一致性)的场景,常见的弱一致性场景如:
用户服务完成了注册动作,向短信服务推送一条营销相关的消息。
信贷体系中,订单服务保存订单完毕,向审批服务推送一条待审批的订单记录信息。
……
强一致性的场景一般不应该选用事务消息。
一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。
如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:消息内容正确的前提下,消费方出现异常需要自理。
简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游义务已经结束。
为了降低代码的入侵性,事务消息需要借助Spring的编程式事务或者声明式事务。编程式事务一般依赖于TransactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional。
接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是本地消息表),用于保存每一条需要发送的消息记录。事务消息功能模块的主要功能是:
保存消息记录。
推送消息到RabbitMQ服务端。
消息记录的查询、补偿推送等等。
在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务。
发送消息到RabbitMQ服务端这一步需要延后到事务提交之后,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。
为了把保存待发送的事务消息和发送消息到RabbitMQ两个动作从使用者感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置,主要参考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:
上图仅仅演示了事务正确提交的场景(不包含异常的场景)。这里可以明确知道,事务同步器TransactionSynchronization的afterCommit()和afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit()之后回调,因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的伪代码如下:
@Transactionalpublic Dto businessMethod(){business transaction code block ...// 保存事务消息[]// 注册事务同步器 - 在afterCommit()方法中推送消息到RabbitMQ[]business transaction code block ...}
<properties><spring.boot.version>2.2.4.RELEASE</spring.boot.version><redisson.version>3.12.1</redisson.version><mysql.connector.version>5.1.48</mysql.connector.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.connector.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>${redisson.version}</version></dependency></dependencies>
CREATE TABLE `t_transactional_message`(id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,creator VARCHAR(20) NOT NULL DEFAULT 'admin',editor VARCHAR(20) NOT NULL DEFAULT 'admin',deleted TINYINT NOT NULL DEFAULT 0,current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '当前重试次数',max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数',queue_name VARCHAR(255) NOT NULL COMMENT '队列名',exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名',exchange_type VARCHAR(8) NOT NULL COMMENT '交换类型',routing_key VARCHAR(255) COMMENT '路由键',business_module VARCHAR(32) NOT NULL COMMENT '业务模块',business_key VARCHAR(255) NOT NULL COMMENT '业务键',next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间',message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态',init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)',INDEX idx_queue_name (queue_name),INDEX idx_create_time (create_time),INDEX idx_next_schedule_time (next_schedule_time),INDEX idx_business_key (business_key)) COMMENT '事务消息表';CREATE TABLE `t_transactional_message_content`(id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID',content TEXT COMMENT '消息内容') COMMENT '事务消息内容表';
public class TransactionalMessage {private Long id;private LocalDateTime createTime;private LocalDateTime editTime;private String creator;private String editor;private Integer deleted;private Integer currentRetryTimes;private Integer maxRetryTimes;private String queueName;private String exchangeName;private String exchangeType;private String routingKey;private String businessModule;private String businessKey;private LocalDateTime nextScheduleTime;private Integer messageStatus;private Long initBackoff;private Integer backoffFactor;}public class TransactionalMessageContent {private Long id;private Long messageId;private String content;}
然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):
public interface TransactionalMessageDao {void insertSelective(TransactionalMessage record);void updateStatusSelective(TransactionalMessage record);List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,LocalDateTime maxScheduleTime,int limit);}public interface TransactionalMessageContentDao {void insert(TransactionalMessageContent record);List<TransactionalMessageContent> queryByMessageIds(String messageIds);}
接着定义事务消息服务接口TransactionalMessageService:
// 对外提供的服务类接口public interface TransactionalMessageService {void sendTransactionalMessage(Destination destination, TxMessage message);}public enum ExchangeType {FANOUT("fanout"),DIRECT("direct"),TOPIC("topic"),DEFAULT(""),;private final String type;}// 发送消息的目的地public interface Destination {ExchangeType exchangeType();String queueName();String exchangeName();String routingKey();}public class DefaultDestination implements Destination {private ExchangeType exchangeType;private String queueName;private String exchangeName;private String routingKey;public ExchangeType exchangeType() {return exchangeType;}public String queueName() {return queueName;}public String exchangeName() {return exchangeName;}public String routingKey() {return routingKey;}}// 事务消息public interface TxMessage {String businessModule();String businessKey();String content();}public class DefaultTxMessage implements TxMessage {private String businessModule;private String businessKey;private String content;public String businessModule() {return businessModule;}public String businessKey() {return businessKey;}public String content() {return content;}}// 消息状态public enum TxMessageStatus {/*** 成功*/SUCCESS(1),/*** 待处理*/PENDING(0),/*** 处理失败*/FAIL(-1),;private final Integer status;}
TransactionalMessageService的实现类是事务消息的核心功能实现,代码如下:
4jpublic class RabbitTransactionalMessageService implements TransactionalMessageService {private final AmqpAdmin amqpAdmin;private final TransactionalMessageManagementService managementService;private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();public void sendTransactionalMessage(Destination destination, TxMessage message) {String queueName = destination.queueName();String exchangeName = destination.exchangeName();String routingKey = destination.routingKey();ExchangeType exchangeType = destination.exchangeType();// 原子性的预声明QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {Queue queue = new Queue(queueName);amqpAdmin.declareQueue(queue);Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());amqpAdmin.declareExchange(exchange);Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();amqpAdmin.declareBinding(binding);return true;});TransactionalMessage record = new TransactionalMessage();record.setQueueName(queueName);record.setExchangeName(exchangeName);record.setExchangeType(exchangeType.getType());record.setRoutingKey(routingKey);record.setBusinessModule(message.businessModule());record.setBusinessKey(message.businessKey());String content = message.content();// 保存事务消息记录managementService.saveTransactionalMessageRecord(record, content);// 注册事务同步器TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {public void afterCommit() {managementService.sendMessageSync(record, content);}});}}
消息记录状态和内容持久化的管理统一放在TransactionalMessageManagementService中:
4jpublic class TransactionalMessageManagementService {private final TransactionalMessageDao messageDao;private final TransactionalMessageContentDao contentDao;private final RabbitTemplate rabbitTemplate;private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);private static final long DEFAULT_INIT_BACKOFF = 10L;private static final int DEFAULT_BACKOFF_FACTOR = 2;private static final int DEFAULT_MAX_RETRY_TIMES = 5;private static final int LIMIT = 100;public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {record.setMessageStatus(TxMessageStatus.PENDING.getStatus());record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,DEFAULT_BACKOFF_FACTOR, 0));record.setCurrentRetryTimes(0);record.setInitBackoff(DEFAULT_INIT_BACKOFF);record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);messageDao.insertSelective(record);TransactionalMessageContent messageContent = new TransactionalMessageContent();messageContent.setContent(content);messageContent.setMessageId(record.getId());contentDao.insert(messageContent);}public void sendMessageSync(TransactionalMessage record, String content) {try {rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);if (log.isDebugEnabled()) {log.debug("发送消息成功,目标队列:{},消息内容:{}", record.getQueueName(), content);}// 标记成功markSuccess(record);} catch (Exception e) {// 标记失败markFail(record, e);}}private void markSuccess(TransactionalMessage record) {// 标记下一次执行时间为最大值record.setNextScheduleTime(END);record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());record.setEditTime(LocalDateTime.now());messageDao.updateStatusSelective(record);}private void markFail(TransactionalMessage record, Exception e) {log.error("发送消息失败,目标队列:{}", record.getQueueName(), e);record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);// 计算下一次的执行时间LocalDateTime nextScheduleTime = calculateNextScheduleTime(record.getNextScheduleTime(),record.getInitBackoff(),record.getBackoffFactor(),record.getCurrentRetryTimes());record.setNextScheduleTime(nextScheduleTime);record.setMessageStatus(TxMessageStatus.FAIL.getStatus());record.setEditTime(LocalDateTime.now());messageDao.updateStatusSelective(record);}/*** 计算下一次执行时间** @param base 基础时间* @param initBackoff 退避基准值* @param backoffFactor 退避指数* @param round 轮数* @return LocalDateTime*/private LocalDateTime calculateNextScheduleTime(LocalDateTime base,long initBackoff,long backoffFactor,long round) {double delta = initBackoff * Math.pow(backoffFactor, round);return base.plusSeconds((long) delta);}/*** 推送补偿 - 里面的参数应该根据实际场景定制*/public void processPendingCompensationRecords() {// 时间的右值为当前时间减去退避初始值,这里预防把刚保存的消息也推送了LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);// 时间的左值为右值减去1小时LocalDateTime min = max.plusHours(-1);Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT).stream().collect(Collectors.toMap(TransactionalMessage::getId, x -> x));if (!collect.isEmpty()) {StringJoiner joiner = new StringJoiner(",", "(", ")");collect.keySet().forEach(x -> joiner.add(x.toString()));contentDao.queryByMessageIds(joiner.toString()).forEach(item -> {TransactionalMessage message = collect.get(item.getMessageId());sendMessageSync(message, item.getContent());});}}}
4jpublic class ScheduleJobAutoConfiguration {private final TransactionalMessageManagementService managementService;/*** 这里用的是本地的Redis,实际上要做成配置*/private final RedissonClient redisson = Redisson.create();(fixedDelay = 10000)public void transactionalMessageCompensationTask() throws Exception {RLock lock = redisson.getLock("transactionalMessageCompensationTask");// 等待时间5秒,预期300秒执行完毕,这两个值需要按照实际场景定制boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);if (tryLock) {try {long start = System.currentTimeMillis();log.info("开始执行事务消息推送补偿定时任务...");managementService.processPendingCompensationRecords();long end = System.currentTimeMillis();long delta = end - start;// 以防锁过早释放if (delta < 5000) {Thread.sleep(5000 - delta);}log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start);} finally {lock.unlock();}}}}
基本代码编写完,整个项目的结构如下:
最后添加两个测试类:
public class MockBusinessRunner implements CommandLineRunner {private final MockBusinessService mockBusinessService;public void run(String... args) throws Exception {mockBusinessService.saveOrder();}}4jpublic class MockBusinessService {private final JdbcTemplate jdbcTemplate;private final TransactionalMessageService transactionalMessageService;private final ObjectMapper objectMapper;(rollbackFor = Exception.class)public void saveOrder() throws Exception {String orderId = UUID.randomUUID().toString();BigDecimal amount = BigDecimal.valueOf(100L);Map<String, Object> message = new HashMap<>();message.put("orderId", orderId);message.put("amount", amount);jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {p.setString(1, orderId);p.setBigDecimal(2, amount);});String content = objectMapper.writeValueAsString(message);transactionalMessageService.sendTransactionalMessage(DefaultDestination.builder().exchangeName("tm.test.exchange").queueName("tm.test.queue").routingKey("tm.test.key").exchangeType(ExchangeType.DIRECT).build(),DefaultTxMessage.builder().businessKey(orderId).businessModule("SAVE_ORDER").content(content).build());log.info("保存订单:{}成功...", orderId);}}
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
好文章,我在看❤️
Go to "Discover" > "Top Stories" > "Wow"