查看原文
其他

分布式事务从入门到放弃:数据一致性引擎概览

The following article is from Coder的技术之路 Author 成小灰

一、 项目背景简介

一般,广告检索系统都承载着公司很大比重的营收占比。

计费系统是广告系统的偏底层一环,承担着反作弊、计算费用、优惠扣减、费用实际扣除等职责。整个扣费流程涉及到了计费单、营销系统、支付账务系统、预算系统等的上下游数据一致性问题。

并且,由于存在CPT、CPA、CPC等不同类型的计费方式,而广告点击有流量不可回溯等特点(普通支付场景可以让用户重试),计费的数据一致性引擎的合理设计就变得尤为重要了。

二、一致性保障方案选择

支付业务一般的可以分为两种,一种是有支付牌照的公司,直接和账户和银行打交道;另一种是调用第三方支付服务,实现支付业务。

对于第一种场景,一般需要非常强硬的保障手段来实现分布式下数据的强一致性。比如TCC的类二阶段提交方式,我们知道,TCC之所有被大家熟知,也是因为蚂蚁不遗余力布道的结果。

对于第二种场景,相对来说,诉求可能就没有那么的变态了,有不少的解决方案可供选择,比如本地消息表、事务型消息等等。。。

当前项目的场景显然属于第二种。那么,应该怎么合理选择实现方式呢:

事务型消息

「实现方案轻量,改造成本小,适合为对实时性不是特别高的场景。」

咦~ 好像正合适,可惜,公司自研的消息中间件不支持!!!

当然,事务型消息的处理方式,也存在弊端,就是每个系统只能负责自己这一块,流程变得冗长,不利于问题排查。万一要改点东西,可能还得上下游一块来,业务耦合程度可能要高一些。

TCC模式

感觉如果阐述TCC的原理,可能需要单开一个系列来说了,这里简单说下.

因为之前的工作中用的就是TCC的分布式事务,说实话,系统实现真的是非常的重。

需要流程中的所有系统,都按照既定的规范来实现一套包含了try/commit/cancel三个处理逻辑的调用模板。需要各系统按规范创建主事务表和分支事务表,来记录事务状态和调用参数及路由。需要参与者创建事务幂等表,实现拒绝空回滚或拒绝后到达的资源扣减等的防悬挂逻辑。

光是让各系统配合实现几个接口,我觉得,如果没有非常大的资金风险压迫,没几个人会配合。

而且,TCC可能更适用于有用户直接参与的资源扣减场景,因为引擎的基本思路是失败时操作回滚,保证上下游一致。

但是,上面也说过了,广告点击流的特点是流量不可回放,这个点击,过去了也就过去了,用户不可能因为这次计费没成功,就帮我们再点一次。所以,我们的一致性引擎的恢复逻辑,不仅要支持回滚,还要支持重试。不可漏掉每一次点击计费。

saga模式

最终,我们参考saga模式,选择的是类saga的状态机引擎的补偿模式。

这种方式的优点是,对老系统改造成本友好,即使实现接口也比较方便,通过状态机编排执行节点链,并配置重试回滚方式、实时异步策略。

事务信息存储方式相对灵活,主要看自己公司的各种存储的可靠性和一致性的承诺。

详见下面实现。

三、数据一致性引擎效果一览

引擎架构图

图片

结构组成

  • 状态机 实现节点执行顺序编排及其他执行特性
  • 节点 业务需要实现的逻辑节点,比如计费的cpc扣费逻辑,需要有前置check、price调价、coupon优惠券、pmc扣费等执行节点
  • 补偿逻辑 属于节点的一部分实现,每个执行节点需要实现当前节点的补偿逻辑,以供执行异常时进行恢复操作
  • 钩子函数 在引擎执行前和执行后,允许业务系统执行自有的特殊操作
  • 定时任务 异常数据恢复的触发入口

其他特性

  • 补偿方式 可配置,有重试/回滚 两种补偿方式可选;重试补偿时,执行顺便和正常顺序一致,回滚补偿时,从最后一个执行节点往前回滚
  • 补偿触发时效 可配置,有实时/异步延时 两种触发策略可选,如果有资源悬挂的风险,建议选异步延时触发
  • 重试次数及时间衰减 可配置,按业务实际情况定制衰减序列

状态机配置实例

{
  "name""xxxx_xxxx_xxxx",
  "comment""cpc计费状态机",
  "firstNodeName""check",
  "nodes": {
    "check": {
      "nextNodeName""land",
      "preNodeName""",
      "skipRecover"true
    },
    "land": {
      "nextNodeName""antiFraud",
      "preNodeName""check",
      "skipRecover"false
    },
    "antiFraud": {
      "nextNodeName""realPrice",
      "preNodeName""land",
      "skipRecover"false
    },"...":{"..."}
  },
  #重试次数
  "retryCount""4"
  #重试时间衰减
  "timeDecaySeries": ["1","3","5","10"], 
  #补偿策略,重试/回滚
  "recoverType""Retry",
  #触发时效,实时触发/异步触发
  "compensateTimeliness""ASYNC"
}

引擎初始化

DTConfig.builder()
    .setAppName("billing")//配置appName
    .setLogStoreStrategy(StoreStrategyEnum.DEFAULT_STORE)//存储策略
    .setRedisConfig(redisConfigPath) //设置redis配置
    .setDBTableConfig(mysqlConfigPath) //mysql配置
    .setZKConfig(configPath) // wConfig 配置 ,切流灰度使用
    .setStateMachinePath(stateMachinePath)//状态机配置项地址
    .setNegligibleErrorCode(BillingDTConstants.serious_error_code_str) //当前系统关键异常code集合(不可忽略的致命异常,供恢复逻辑使用)
    .build();

引擎调用

//本次调用所使用的状态机名称
String stateMachineName="ecpm_state_machine"; //当前请求使用到的状态机名称(和状态机配置中的name一致)
//获取引擎实例
DTBizEngine dtBizEngine=new SagaDTBizEngine();
//组装入参执行调用
DTResponse response=  dtBizEngine.start(new DTEngineRequest(bizType,bizId,stateMachineName,originContext));
//打印结果
System.out.println(response.getData());

异步化(参考dubbo的异步化实现的)

主线程

try {
   //创建 DTFurure ,传入 (DTEngineRequest request , int timeout)
   DTFuture mFuture=new DTFuture(request, 1000);
   //将该future传递下去,也可以用其他方式传递,这里直接放到了request扩展字段做示例
   request.getExtendField().put("MY_KEY",mFuture);
   //异步线程调用 
   EcpmEventBus.getInstance().post(new EcpmBillingEvent(request));
   //有限时间超时等待,get到的结果是业务完成时设置进来的对象,业务系统可以按自己的场景转换
   Object future= mFuture.get(1000); //单位是毫秒
   //do something

  } catch (Exception e) {
 }

执行线程

//do something ...

DTFurure futrue=(DTFurure)request.getExtendField().get("MY_KEY");

DTFuture.received(futrue.getId(),response);

四、后记


本篇介绍了一个数据一致性项目的背景和方案选择过程,并给出了一个参考saga模式的状态机实现的一致性引擎。

- EOF -


推荐阅读  点击标题可跳转

1、一举拿下高可用与分布式协调系统设计!

2、深度剖析:Redis 分布式锁到底安全吗?看完这篇文章彻底懂了!

3、OceanBase 开源,11 张图带你了解分布式数据库的核心知识



看完本文有收获?请转发分享给更多人

推荐关注「数据分析与开发」,提升数据技能

点赞和在看就是最大的支持❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存