查看原文
其他

分布式锁&kafka事务提交等编码技巧

ImportNew 2022-11-14

(给ImportNew加星标,提高Java技能)

一、前言

在开发过程中,遇到了一些比较实用的编码技巧,故记录以加深印象。因为每个技巧的篇幅较短,故不做拆分,只合在一篇小文章中阐述。以下会涉及kafka的事务提交方法、redis分布式锁简化以及多key情况下应该怎么加锁、业务日志如何解耦。

二、kafka的事务提交方法

  • kafka我们常用于削峰填谷,以及系统间解耦等场景。这里我们会常遇到一种情况,就是上游系统在处理完成业务后,需要通知其它的系统,假如我们不考虑事务提交失败的情况下,就可以像下面这样写。但是假如出现网络异常或者数据库异常等情况,就会出现事务提交失败从而回滚,但是消息却已经发生给其它服务了,那么就会导致整条调用链的异常

@Autowired private KafkaTemplate kafkaTemplate;
@Transactional(rollbackFor = Exception.class) public void saveServiceOrder(ServiceOrder serviceOrder){ // do something
NoticeListDTO notice = NoticeListDTO.builder().build(); // 通知其它服务 kafkaTemplate.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice)); }
  • 所以,我们可以进行进一步优化,就是将消息通知后置到事务提交后,这样系统的可靠度就会更高。我们增加一个kafka帮助类,如下:

@Component @Slf4j public class KafkaTemplateHelper { @Autowired private KafkaTemplate kafkaTemplate;
/** * 事务提交后发送kafka消息 * @param topic * @param data * @param <T> */ public <T> void send(String topic, Object data) { // 是否开启事务判断 if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { log.info("事务提交成功后发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data)); kafkaTemplate.send(topic,data); } }); } else { log.info("没有开启事务直接发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data)); kafkaTemplate.send(topic,data); } } }
 
  • kafka调用如下,它就会保证在事务结束后再通知其它系统,同理,很多需要后置的操作也可以这么玩。其实kafka还有一套可靠性应用方案可以分享,待有空再写

 
@Autowired private KafkaTemplateHelper kafkaTemplateHelper;
@Transactional(rollbackFor = Exception.class) public void saveServiceOrder(ServiceOrder serviceOrder){ // do something
NoticeListDTO notice = NoticeListDTO.builder().build(); // 通知a服务 kafkaTemplateHelper.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice)); }

三、redis分布式锁代码简化

  • 我们使用redis分布式锁就离不开redission组件,举个栗子,我们一般在服务集群的情况下,为了保证并发不出现问题,会如下加锁,用一段字符串加上入参中的唯一编号(如用户id、订单编号等等)来保证接口幂等性(PS:redissonDistributedLocker只是redission的简单封装)。这样写很好,没有问题,但是我们不禁会想,好像每个创建、更新等业务操作都得给接口加这些重复代码,那么有没有更加优雅的方式呢,没错,我们要追求的就是极致的优雅

@ApiOperation("服务单更新") @Transactional(rollbackFor = Exception.class) public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){ log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req)); String lockKey = "mh:scs:serviceOrderUpdate:"+req.getServiceOrderId(); boolean lock = redissonDistributedLocker.tryLock(lockKey,0L,10L); AssertUtil.businessInvalid(!lock,"操作过于频繁,请稍后再试"); try { // do something return ApiResult.success(); }finally { redissonDistributedLocker.unlock(lockKey); } }
  • 使用Aop为接口加锁,添加一个注解anno,并写个实现

 
/** * 为方法加锁,处理完成再释放 */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface AopLock { /** * SpEL表达式,用于计算lockKey. */ String value(); /** * 单位秒 */ int waitTime() default 0; /** * 单位秒 */ int leaseTime() default 6; int errorCode() default 2733; String errorMsg() default "操作过于频繁,请稍后再试"; } /** * 为方法加锁,处理完成再释放 * */ @Slf4j @Aspect @Order(3) @ConditionalOnBean(RedissonClient.class) public class AopLockAspect { @Autowired private RedissonClient redissonClient; @Value("${spring.application.name}") private String lockKeyPrefix; @Around("@annotation(common.aop.annos.AopLock)") public Object lock(ProceedingJoinPoint joinPoint) throws Throwable { Object[] args = joinPoint.getArgs(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); EvaluationContext context = initEvaluationContext(joinPoint); AopLock aopLock = method.getAnnotation(AopLock.class); String spEl = aopLock.value(); String expressionValue = lockKeyPrefix + ":" + PARSER.parseExpression(spEl).getValue(context); RLock lock = redissonClient.getLock(expressionValue); try { boolean getterLock = lock.tryLock(aopLock.waitTime(), aopLock.leaseTime(), TimeUnit.SECONDS); if (!getterLock) { throw new ServiceException(aopLock.errorCode(), aopLock.errorMsg()); } return joinPoint.proceed(args); } finally { try { lock.unlock(); } catch (Exception e) { log.warn("unlock error:" + e.getMessage() + "," + e.getClass().getName()); } } } }
  • 那么我们的加锁就可以简单的加个@AopLock注解就可以了,是不是很棒呢

@ApiOperation("服务单更新") @AopLock(value="'mh:scs:serviceOrderUpdate:' + #req.serviceOrderId",leaseTime = 60*30) @Transactional(rollbackFor = Exception.class) public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){ log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req)); // do something return ApiResult.success(); }

四、redission在多key情况下应该怎么加锁

  • 上面的例子很好地将简单的分布式锁代码简化,但是我们会有一些场景是无法这样加锁的,比如一些批处理的场景,用户A批量操作了单据a、b、c,同一时间,用户B批量操作了单据b、c、d,这时bc单据就会有并发问题,在这种场景下,我们是不能简单地根据某个单据的订单编号进行加锁的,要思考换一种方式,如下:

  • 订单实体类

@Data public class UpdateServiceOrdersReq implements Serializable { private static final long serialVersionUID = 1L; @Valid private List<ServiceOrder> serviceOrderList; }
  • 接口实现,对每个订单的id都加锁,假如有其中一个订单的锁获取失败的话则返回重试信息,在更新操作结束后释放所有的锁

@ApiOperation("批量更新服务单信息")@PostMapping("/xxxx/updateServiceOrders")public ResponseBean updateServiceOrders(@RequestBody @Validated UpdateServiceOrdersReq req) { List<String> redisKeys = new ArrayList<>(); List<ServiceOrder> list = new ArrayList<>(); for (ServiceOrder serviceOrder : list) { redisKeys.add("mh:scs:updateServiceOrders:" + serviceOrder.getServiceOrderId()); } try { for (String redisKey : redisKeys) { boolean lock = redissonDistributedLocker.tryLock(redisKey, 5L, 30L); if(!lock){ AssertUtil.businessInvalid("批量更新服务单获取锁失败,请稍后尝试!"); } } ResponseBean responseBean = ResponseBean.success(); // do something return responseBean; } catch (Exception ex){ throw ex; } finally { redisKeys.forEach(redisKey->{ try { redissonDistributedLocker.unlock(redisKey); } catch (Exception e) { log.error("updateServiceOrders:释放redis锁失败:{}", redisKey, e); } }); }}

五、业务日志如何解耦

  • 在写业务系统的过程中,我们难免要进行一些业务日志操作记录,这里就会涉及业务日志字符串的数据组装,比如产品要求记录车辆出发时间、更新日期时间巴拉巴拉之类的,但同时会存在一个问题,因为业务日志记录是非主业务流程操作(类似消息通知之类的),故不可因为复杂的日志数据拼接去影响接口的响应速度,从而影响用户体验;这里就要思考如何解耦的问题。我思考了两种场景下的处理方案,可以分享出来给大家分享

  • 情况一,假如只是简单的进行文字记录,我们可以使用线程池的方式去对日志记录进行解耦

    • 使用线程池创建其它线程进行日志操作,这样就不会影响到主线程了


      /** * @author ppz * @date 2022年04月12日 17:00 * @Description 服务单日志操作工具类 */public class ServiceOrderLogUtils { private static ScsServiceOrderLogService logService = ApplicationContextUtils.getBean(ScsServiceOrderLogService.class); private static int corePoolSize = Runtime.getRuntime().availableProcessors(); private static ExecutorService executor = new ThreadPoolExecutor( corePoolSize, corePoolSize*2, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(Integer.MAX_VALUE), new ThreadPoolExecutor.CallerRunsPolicy());
      private ServiceOrderLogUtils() { throw new IllegalStateException("Utility class"); }
      /** * 日志公共线程池中执行线程 * @param runnable 可运行对象 */ public static void execute(Runnable runnable) { executor.execute(runnable); }
      /** * 保存订单操作日志 * @param serviceOrderId 订单编号 * @param operType 日志操作类型 * @param operContent 操作内容 * @return: void */ public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent){ saveLog(createLog(serviceOrderId, operType, operContent)); }
      /** * 保存订单操作日志 * @param serviceOrderId 订单编号 * @param operType 日志操作类型 * @param operContent 操作内容 * @param operUserId 操作人登录名 * @param operUserName 操作人名称 * @return: void */ public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){ saveLog(createLog(serviceOrderId, operType, operContent, operUserId, operUserName)); }
      public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent) { AuthUser userInfo = WebUtils.getCurrentUser(); return createLog(serviceOrderId, operType, operContent, StringUtil.toInt(userInfo.getLoginName()), userInfo.getName()); }
      /** * 封装订单日志实体 * @param serviceOrderId * @param operType * @param operContent * @param operUserId * @param operUserName * @return: ScsServiceOrderLog */ public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){ ScsServiceOrderLog log = new ScsServiceOrderLog(); log.setServiceOrderId(serviceOrderId); log.setOperContent(operContent); log.setOperType(operType.getCode()); log.setOperatorId(operUserId); log.setOperatorName(operUserName); return log; }
      /** * 保存订单操作日志 * @param log 日志对象 * @return: void */ public static void saveLog(ScsServiceOrderLog log){ List<ScsServiceOrderLog> list = Lists.newArrayList(); list.add(log); saveLog(list); }
      /** * 批量保存订单操作日志 * @param list * @return: void */ public static void saveLog(List<ScsServiceOrderLog> list){ if(CollectionUtils.isEmpty(list)) { return; } Date now = new Date(); for(ScsServiceOrderLog log : list) { if(log.getOperatorTime() == null) { log.setOperatorTime(now); } if(StrUtil.length(log.getOperContent()) > 512) { log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512)); } } if(!list.isEmpty()) { execute(new SaveLogThread(list)); } }
      /** * 订单日志保存线程 * @author: xiecy * @date: 2019年4月29日 下午12:03:35 */ static class SaveLogThread implements Runnable { private List<ScsServiceOrderLog> list = null;
      public SaveLogThread(List<ScsServiceOrderLog> list) { super(); this.list = list; }
      @Override public void run() { if(list != null && !list.isEmpty()) { logService.batchInsert(list); } } }
      /** * 同步批量保存日志 * @param list * @return: void */ public static void saveLogSync(List<ScsServiceOrderLog> list){ if(list.isEmpty()) { return; } Date now = new Date(); AuthUser userInfo = WebUtils.getCurrentUser(); for(ScsServiceOrderLog log : list) { if(log.getOperatorTime() == null) { log.setOperatorTime(now); } if(log.getOperatorId() == null && userInfo!=null) { log.setOperatorId(StringUtil.toInt(userInfo.getLoginName())); log.setOperatorName(userInfo.getName()); } if(StrUtil.length(log.getOperContent()) > 512) { log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512)); } } if(list != null && !list.isEmpty()) { logService.batchInsert(list); } }
      }
    • 业务代码中进行使用


      @Transactional(rollbackFor = Exception.class)public boolean updateShippingDemandStatus(UpdateShippingDemandStatusReq req) { // todo something ServiceOrderLogUtils.saveLog(serviceOrderId, OperTypeEnum.CANCEL_SHIPPING_DEMAND,"用户取消运输需求");}
  • 情况二:假如日志记录需要对数据进行复杂组件的话,可以把使用到的数据组装到一个实体,然后通过发送给kafka或者redis进行解耦,在另外的线程中进行数据组装,具体就不展示了


转自:撸猫的代码

链接:https://juejin.cn/post/7112237932815056932




- EOF -

推荐阅读  点击标题可跳转

掘地三尺搞定 Redis 与 MySQL 数据一致性问题

搞透 Kafka 的存储架构,看这篇就够了



看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

点赞和在看就是最大的支持❤️


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存