查看原文
其他

嗨,我想和你分享一下千万级支付对账系统是怎么设计的。

why技术 2022-09-23

The following article is from 小黑十一点半 Author 楼下小黑哥

写在前面

你好呀,我是歪歪。

今天给大家分享一篇关于对账系统设计的文章,出自在支付行业摸爬滚打好几年的小黑哥之手。

如果你之前做过支付相关的业务一定多多少少都接触过“支付数据对账”的问题。

这个问题其实有非常多的解法,而不同的解法可以应对不同的交易规模。随着交易规模的增长,对账系统的设计也一定是在不断的进行迭代。

而本文就是探讨对于每日千万级数据量的时候,对应的对账系统大致应该是长什么样的。

如果你像我一样,之前做过支付、对账相关系统,在观看文章的过程中,一定能或多或少的看到自己做过的影子在里面。

因为不论是什么级别的数据对账,它们的“对账流程“这个底层逻辑都是不会发生太大变化的。

所以我希望你在学习的解题之道的时候,也能一眼看到它的底层逻辑,以不变应万变。

如果你觉得文章不错的话,也可以关注小黑哥的公众号(文首直达),看他分享更多关于支付领域的知识。

以下是正文。

支付对账

很早之前写过一篇支付对账相关文章,那时候负责对账系统日均处理数量比较小。

那最近正在接手现在的对账系统,由于当前系统日均数量都在千万级,所以对账系统架构与之前架构完全不一样。

那就这个话题,聊聊如何实现千万级数据支付的对账系统。

什么是对账?

我们先来回顾下什么是对账?

也许你对对账这个概念比较模糊,但是这个场景你肯定碰到过。

上班路上买了一个煎饼,加了根里脊与王中王,然后你扫了老板的二维码付了 10 元钱。

你跟老板说你已经付了 10 元钱,老板看了下手机,果然有一条 10 元支付记录,老板确认收到钱,然后就把煎饼给你。

这个过程,你说你付了 10 元,老板确认收到 10 元,这就是一只简单的对账过程。

回到我们支付场景,用户下单使用微信支付 100 元购买了一个狗头抱枕,这时我们这边会生成一条支付记录,同时微信支付也会生成记录。

那微信第二天就会生成一个账单记录,我们拿到之后把我们的交易记录跟微信记录逐笔核对,这就是支付对账。

为什么需要对账?

正常支付的情况下,两边(我们/第三方支付渠道)都会产生交易数据,那支付对账过程,两边数据一致,大家各自安好,不用处理什么。

但是有些异常情况下,可能由于网络问题,导致两边数据存在不一致的情况,支付对账就可以主动发现这些交易。

对账可以说支付系统最后一道安全防线,通过对账我们可及时的对之前支付进行纠错,避免订单差错越积越多,最后财务盘点变成一笔糊涂账。

支付对账系统

开篇先来一张图,先来看下整体对账系统架构图:

整个对账系统分为两个模块

  • 对账模块
  • 差错模块

对账模块,主要负责对账文件拉取,数据解析,数据核对,数据汇总等任务。

差错模块是对账模块后置任务,对账模块核对过程产生无法核对成功的数据,这类数据件将会推送给差错系统。

差错系统将会根据规则生成差错订单,运营人员可以在后台处理这列数据。

先简单的看一下之前的对账系统设计,了解下对账的整体流程。

对账系统设计

对账系统如果从流程上来讲,其实非常简单,引用一下之前文章流程图:

https://studyidea.cn/articles/2019/08/26/1566790305561.html

整体流程可以简单分为三个模块:

  • 本端数据处理
  • 对端数据处理
  • 本端数据与渠道端数据核对

本端数据指的是我们应用产生的支付记录,这里根据账期(交易日期)与渠道编号获取单一渠道的所有支付记录。

对端数据指的是第三方支付渠道支付记录,一般通过下载对账文件获取。

由于每个渠道下载方式,文件格式都不太一样,对端数据处理的时候需要将其转化统一数据格式,标准化在入库存储。

网上找了一份通用账单,可以参考:

对端数据转化存储之后,对账流程中,对端数据也需要跟本端数据一样,获取当前账期下所有记录。

两端数据都获取成功之后,接下来就是本地数据逐笔核对。

核对流程可以参考之前写的流程:

上面流程其实也比较简单,翻译一下:

查找本端数据/对端数据,然后转化存储到 Map 中,其中 key 为订单号,value 为本端/对端订单对象。

然后遍历本端数据 Map 对象,依次去对端数据 Map查找。如果能查找到,说明对端数据也有这笔。这笔核对成功,对端数据集中移除这笔。

如果查找不到,说明这笔数据为差异数据,它在本端存在,对端不存在 ,将其移动到差异数据集中。

最后,本端数据遍历结束,如果对端数据集还存在数据,那就证明这些数据也是差异数据,他们在对端存在,本端不存在 ,将其也移动到差异数据集中。

PS:上述流程存在瑕疵,只能核对出两边订单互有缺失的流程 ,但是实际情况下还会碰到两边订单都存在,但是订单金额却不一样的差异数据。这种情况有可能发现在系统 Bug,比如渠道端上送金额单位为元,但是实际上送金额单位为分,这就导致对账两端金额不一致。

之前对账系统日均处理的支付数据峰值在几十万,所以上面的流程没什么问题,还可以抗住,正常处理。

但是目前的支付数据日均在千万级,如果还是用这种方式对账,当前系统可能会直接崩了。。。

千万数据级带来的挑战

第一个,查询效率。

本端/对端数据通过分页查询业务数据表获取当天所有的数据。随着每天支付数据累计,业务表中数据将会越来越多,这就会导致数据查询变慢。

实际过程我们发现,单个渠道数据量很大的情况下,对账完成需要一两个小时。

虽然说对账是一个离线流程,允许对账完成时间可以久一点。但是对账流程是后续其他任务的前置流程,整个对账流程还是需要在中午之前完成,这样运营同学就可以在下午处理。

第二个问题,OOM。

上面流程中,我们把把全部数据加载到内存中,小数据量下没什么问题。

但是在千万级数据情况下,数据都加载到内存中,并且还是加载了两份数据(本端、对端),这就很容易吃完整个应用内存,从而导致 Full GC,甚至还有可能导致应用 OOM。

而且这还会导致级联反应,一个任务引发 Full GC,导致其他渠道对账收到影响。

第三个问题,性能问题。

原先系统设计上,单一渠道对账处理流程只能在单个机器上处理,无法并行处理。

这就导致系统设计伸缩性很差,服务器资源也被大量的浪费。

千万数据级对账解决办法

上面系统代码,实际上还是存在优化空间,可以利用单机多线程并行处理,但是大数据下其实带来效果不是很好。

那主要原因是因为发生在系统架构上,当前系统使用底层使用 MySQL 处理的。

传统的 MySQL 是 OLTP (on-line transaction processing),这个结构决定它适合用于高并发,小事务 业务数据处理。

但是对账业务特性动辄就是百万级,千万级数据,数据量处理非常大。但是对账数据处理大多是一次性,不会频繁更新。

上面业务特性决定了,MySQL 这种 OLTP 系统不太适合大数据级对账业务。

那专业的事应该交给专业的人去做,对账业务也一样,这种大数据级业务比较适合由 Hive、Spark SQL 等 OLAP去做。

千万数据级对账系统实现方案

首先我们先来看下对账整体时序图,先有个印象:

下面整篇文章将会围绕上面时序图开始讲解,由于文章篇幅过长,请各位扶好坐稳,准备发车。

数据平台

前面提到,千万级数据需要使用 Hive,Spark等相关大数据技术,这就离不开大数据平台的技术支持。

简单聊下我们这边大数据平台DP (Data Platform),它提供用户大数据离线任务开发所需要的环境、工具以及数据,具有入口统一性、一站式、简化hadoop本身的复杂性、数据安全等特点。

DP 平台提供功能如下:

  • 数据双向离线同步,MySQL 与 Hive 互相同步
  • 大数据离线计算,支持SQL(SparkSQL/HiveSQL/Presto)形式处理各类的数据清洗、转化、聚合操作,也支持使用MapReduce、Spark等形式,处理比较复杂的计算场景
  • 即时的SQL查询,允许用户即时的执行SQL、查看执行的日志和结果数以及进行结果数据的可视化分析
  • 数据报表

那本篇文章不会涉及具体的大数据技术相关的实现细节,相关原理(主要是咱也不会~),主要聊下对账系统如何联合 DP 平台实现完整数据对账方案。

对账系统概览

开头的时序图,我们可以看到整个对账过程设计好几个业务流程,那在这里对账系统内部将会维护一个流程状态机,当前一个流程处理结束之后,下一个流程才能被触发。

由于当前对账系统实现方案,涉及对账系统与 DP 平台,对账系统目前没办法调用 DP 平台触发任务,但是 DP 平台可以通过通过 HTTP 接口调用对账系统。

所以当前流程触发的方式使用的是定时任务的方案,每个流程有一个单独的定时任务。

对账系统内的定时任务触发的时候,将会判断当前流程是否已经到达执行条件,即判断一下当前任务的状态。

每个定时任务触发时间人为设置的时候,岔开一两分钟,防止同时运行。

DP 平台使用自带调度任务,对账系统无法控制 DP 任务的运行。

DP 平台定时任务可以通过运行 Scala 脚本代码,调用对账系统提供 HTTP  查询接口,通过这种方式判断当前流程是否已经到达执行条件。

下面详细解释一下每个流程。

初始化对账任务

对账系统依靠对账任务记录推动流转,目前每天凌晨将会初始化生成对账任务记录,后续任务流转就可以从这里开始。

对账系统维护一张对账核对规则表:

对账核对规则表关键字段含义如下:

  • channel_code  渠道编码,每个支付渠道将会分配一个唯一渠道编码,例如微信,支付宝
  • biz_type 业务类型,例如支付,退款,提现等
  • status 是否生效

每次对接新的支付渠道,对账配置规则需要新增核对规则。

初始化对账定时任务将会查找核对规则表中所有的生效的配置规则,依次生成当天的对账任务记录:

对账任务记录部分字段与核对规则表含义一样,不再赘述,其他字段含义如下:

  • bill_date 账期,一般 D 日对账任务核对 D-1 数据,所以账期为 D-1 日
  • batch_no 对账批次,生成规则如下:账期+渠道编码+ 001
  • phase,当前对账任务处于阶段,根据上面对账流程可以分为
    • 初始化
    • 数据收集
    • 存疑处理
    • 数据核对
    • 二次存疑处理
    • 数据汇总
    • 差错数据推送
  • error_reason 错误原因

初始化对账任务结束之后,对账任务流程推动到第二阶段,数据收集。

数据收集

数据收集阶段,收集两端待核对的数据,为后面的数据核对任务提供核对数据。

数据收集阶段分为两部分:

  • 本端数据收集,即自己方产生的支付数据
  • 对端数据收集,即三方渠道产生支付数据

本端数据收集

本端数据,是自己业务产生的支付数据,这些数据原本存在各个业务的数据库中。

对账系统获取这些支付数据,一般有两种方式:

  • 查询,对账系统主动拉取
  • 推送,对账系统监听获取数据

查询数据方式前面也聊到过,数据量小的情况下,没什么问题。一旦数据量变大,查询效率就会变低。

所以这里我们采用推送的方式,对账系统监听各个业务数据表 binlog,每当业务数据发生变动,对账系统就可以接受到 binlog 消息。

对账系统接受到 binlog 消息,将会判断当前消息是否需要过滤,是否已经支付成功等等,满足条件之后,binlog 消息将会插入本端数据表中,表结构如下:

本端记录表关键字段含义如下:

  • channel_code  渠道编码,每个支付渠道将会分配一个唯一渠道编码,例如微信,支付宝
  • biz_order_no 本端支付流水号
  • bill_date 账期
  • status 状态
  • is_check 对账状态,0-未核对,1-已核对
  • trade_amount 支付金额
  • channel_order_no 三方渠道支付单号
  • merchant_no 商户号
  • sub_merchant_no 子商户号

上面展示的支付记录表结构,根据业务类型不同,本端其实还有退款记录表,提现记录表等。

这里设计的时候,实际上也可以将所有业务数据放在一张表中,然后根据业务类型字段区分。

对端数据收集

对端数据,就是第三方支付渠道产生支付数据,一般 D 日产生交易之后,D+1 日第三方渠道将会生成一个对账文件。

对账系统需要从对端提供的对账文件获取对端数据。

渠道的对账文件,下载方式,文件类型存在很大的差异,每次接入新的支付渠道,这里需要经过新的开发。

对端数据这里维护了一张渠道下载配置表,对端数据收集的时候将会获取所有可用配置:

渠道下载配置表关键字段含义如下:

  • mch_id 三方渠道分配的商户号
  • type 下载类型
    • FTP
    • SFTP
    • HTTP
  • download_param 下载的配置参数,比如 ftp 的地址,登录密码,下载地址等。

对账文件下载成功之后,需要根据文件类型进行解析,最后转化自己的需要的对账数据入库。

对端数据表结构如下:

上面关键字段与本端记录表类似,额外新增字段:

  • channel_fee 渠道手续费,用于统计渠道收的手续费

同样渠道记录表根据根据业务类型也分为退款渠道记录表,提现渠道记录表等,同样也可以合并成一张表,根据业务类型区分。

对端数据收集阶段,由于拉取三方渠道的对账文件,那有时候渠道端存在异常,将会导致对账文件下载延迟,从而导致其他任务也出现的相应的延迟。

这一点是整个对账流程中,相对不可控的问题。我们需要在对账流程设计中考虑这一点。

对账文件下载解析成功入库之后,对账流程将会流转到下一个流程存疑数据处理。

存疑数据处理

讲解这个流程之前,先给大家解释一下什么是存疑数据?

正常支付过程中,会存在一个两边账期不一致的问题,比如说本端数据支付时间是 2021 年 12 月 28 日 23 点 59 分 59 秒,那么本端认为这笔支付交易账期是 2021 年 12 月 28 日。

然而这笔支付发送给三方渠道之后,三方渠道支付成功的时间已经是 2021 年 12 月 29 日 0 点 0 分 2 秒,三方渠道支付账期记为2021 年 12 月 29 日。

这种情况下我们这边记录账期是2021 年 12 月 28 日,但是第三方渠道这笔记录是 2021 年 12 月 29 日,所以 2021 年 12 月 28 日对账单上没有这笔支付记录,这就导致一笔差异数据(一端有/一端无)的情况。

上面这种情况就是典型因为日切问题导致差异。

但是我们知道 2021 年 12 月 29 日对账单上肯定会包含这笔,所以我们可以先把这笔差异数据挂起,当做存疑数据,等到2021 年 12 月 29 日账期对账的时候,对方账单包含这笔,当天就能核对成功,这就解决这笔差异数据。

所以说存疑数据,就跟其字面意思一样,当这笔数据当前处理不了的时候,那就现放着,不做定论,过一天我再尝试处理一下。

除了上面日切问题导致的差异数据以外,还有一些情况:

  • 网络问题,导致两边订单状态不一致。
  • 测试环境与生产环境共用一个三方渠道商户号,测试环境产生的交易出现在对账单里

存疑数据分为三种类型:

  • 本端有,渠道无,即本端存在订单信息,渠道账单记录没有订单信息,可能是日切导致的问题
  • 渠道有,本端无,即本端不存在订单信息,渠道端账单记录却有订单信息,可能是测试环境与生产环境共用渠道参数
  • 金额不平,即双方都存在订单信息,但是双方订单金额不一致

了解完存疑数据的定义,我们再来看下存疑数据处理的流程。

存疑数据将会由下面的流程中产生,这里先来看下存疑表结构:

关键字段如下:

  • batch_no 批次号
  • biz_id 业务单号
  • biz_amount 金额
  • status  0-未处理,1-已处理
  • biz_date 账期
  • biz_type 业务类型
  • channel_code 渠道类型
  • delayed_times 延迟天数
  • merchant_no 商户号
  • sub_merchant_no 子商户号
  • buffer_type 存疑类型,0-本端存疑,1-渠道存疑

存疑处理过程将会捞起所有存疑表中还未处理的存疑数据,根据存疑类型反向查找对账数据表。例如:

  • 渠道存疑(第一天对账,本端有,渠道无),查找对端数据
  • 本端存疑(第一天对账,本端无,渠道有),查找本端数据

查找对端/本端数据,都是根据支付流水号加业务类型查找定位。

如果在本端/对端数据中找到,这里还需要再对比一下金额

  • 如果金额不相等,代表单号相同,但是金额不等,将这笔移动到支付差异表
  • 如果金额相等,代表这两笔核平,存疑表将这笔数据更新为核对成功,本端/对端数据更新为对账成功

上面这一步比较重要,因为下面对账核对过程主要核对要素是支付流水号+支付金额,通过这种方式收集单片账是无法知道是因为单号不存在,还是因为金额不存在原因,具体流程可以看下下面核对流程。

如果在本端/对端数据还是找不到,那就根据渠道配置的存疑规则,如果当前已经存疑的天数大于配置渠道存疑天数,则将数据直接移动到差错表。

如果存疑天数小于当前渠道配置天数,那就不要管,继续保存在存疑表,等待下一天存疑数据处理。

一般来说,日切导致的数据,存疑一天,就可以解决。但是有些渠道可能是 T+1 在对账,这种情况需要配置的存疑天数就要长一点了。

本地存疑数据处理结束之后,下面就要开始 DP 数据处理。

数据导入DP

在 DP 核对之前,我们需要将对账系统收集的数据,从  MySQL 导入 DP Hive 表中。

DP 任务调度开始,DP 平台定时检测对账系统提供 HTTP 接口,判断本次存疑流程是否处理完成。

如果完成,自动触发将数据从 MYSQL 导入 DP Hive 表中。

数据导入之后,将会开始 DP 核对规程。这个过程就是整个对账流程最关键的部分,这个流程核对两端数据,检查两端是否存在差异数据。

DP 核对

数据导入结束,DP 平台开始核对数据,这个过程分为两个核对任务:

  • 成功数据核对
  • 存疑数据核对

成功数据核对

成功数据核对任务,核对的目的是为了核对出本端与对端支付单号与金额一致的数据。

这里的核对任务使用了 Hive SQL ,整个 SQL 如下所示:

---- A
CREATE TABLE IF NOT EXISTS dp.pay_check_success (
    `batch_no` bigint comment '批次号',
    `merchant_no` string comment '三方商户号',
    `sub_merchant_no` string comment '三方子商户号',
    `biz_id` string comment '对账业务关联字段',
    `biz_amount` bigint comment '金额',
    `biz_date` string comment '业务日期',
    `biz_type` int comment '业务类型',
    `status` int comment '状态标识',
    `remark` string comment '备注',
    `create_time` string comment '创建时间',
    `update_time` string comment '修改时间',
    `trade_date` int comment '订单交易日期',
    `channel_code` int comment '渠道类型'
);

----B
insert
    overwrite table dp.pay_check_success
select
    tb1.batch_no as batch_no,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    tb1.biz_id as biz_id,
    tb1.biz_amount as biz_amount,
    tb1.biz_date as biz_date,
    tb1.biz_type as biz_type,
    tb1.status as status,
    tb1.remark as remark,
    tb1.trade_date as trade_date,
    tb1.channel_code as channel_code
from
    (
        select
            tb2.batch_no as batch_no,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            tb1.biz_order_no as biz_id,
            tb1.trader_amount as biz_amount,
            '${DP_1_DAYS_AGO_Ymd}' as biz_date,
            '0' as status,
            '' as remark,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date,
            tb1.channel_code as channel_code
        from
            dp.pay_check_record tb1
            inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
            and tb1.trader_amount = tb2.trader_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb1.is_check = 0
            and tb2.is_check = 0
            and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb1.is_filter = 0
    ) tb1

整个 SQL 分为两部分,第一部分将会在 DP 中创建一张 pay_check_success,记录核对成功的数据。

第二部分,将核对成功的数据插入上面创建的 pay_check_success 表中。

查找核对成功的数据 SQL 如下:

        select
            tb2.batch_no as batch_no,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            tb1.biz_order_no as biz_id,
            tb1.trader_amount as biz_amount,
            '${DP_1_DAYS_AGO_Ymd}' as biz_date,
            '0' as status,
            '' as remark,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date,
            tb1.channel_code as channel_code
        from
            dp.pay_check_record tb1
            inner join dp.pay_check_channel_record tb2 on tb1.biz_order_no = tb2.biz_order_no
            and tb1.trader_amount = tb2.trader_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb1.is_check = 0
            and tb2.is_check = 0
            and tb1.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb2.bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and tb1.is_filter = 0

上述 SQL 存在一些 DP 平台系统变量。DP_1_DAYS_AGO_Ymd 代表当前日期的前一天

主要逻辑非常简单,利用 sql 内连接查询的功能,可以查找单号,金额,渠道编码一致的数据。

成功数据核对任务结束,将会把刚才在 DP 中创建的  pay_check_success 同步回对账系统的 MYSQL 数据库中。

存疑数据核对

存疑数据核对任务,核对的目的是为了核对出本端与对端支付单号或金额不一致的数据。

这些数据将会当做存疑数据,这些数据将会在第二阶段存疑数据处理。

这里的核对任务也是使用了 Hive SQL ,整个 SQL 跟上面比较类似,SQL 如下所示:

CREATE TABLE IF NOT EXISTS dp.check_dp_buffer_record (
    `biz_id` string comment '订单号',
    `order_type` string comment '订单类型 0本端订单 1渠道订单',
    `bill_date` int comment '对账日期',
    `biz_type` int comment '业务类型',
    `channel_code` int comment '渠道类型',
    `amount` string comment '金额',
    `merchant_no` string comment '商户号',
    `sub_merchant_no` string comment '三方子商户号',
    `trade_date` int comment '交易日期',
    `create_time` string comment '创建时间',
    `update_time` string comment '修改时间'
);

insert
    overwrite table dp.check_dp_buffer_record
select
    tb1.biz_id as biz_id,
    tb1.order_type as order_type,
    tb1.bill_date as bill_date,
    tb1.biz_type as biz_type,
    tb1.channel_code as channel_code,
    tb1.amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    tb1.trade_date as trade_date,
    '${DP_0_DAYS_AGO_Y_m_d_HMS}',
    '${DP_0_DAYS_AGO_Y_m_d_HMS}'
FROM
    (
        select
            tb1.biz_order_no as biz_id,
            0 as order_type,
            tb1.bill_date as bill_date,
            10 as biz_type,
            tb1.channel_code as channel_code,
            tb1.trade_amount as amount,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date
        FROM
            (
                select
                    biz_order_no,
                    bill_date,
                    channel_code,
                    trade_amount,
                    merchant_no,
                    sub_merchant_no
                from
                    ods.pay_check_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_filter = 0
                    and is_check = 0
            ) tb1
            LEFT JOIN (
                select
                    biz_order_no,
                    trade_amount,
                    channel_code
                from
                    ods.pay_check_channel_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_check = 0
            ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
            and tb1.trade_amount = tb2.trade_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb2.biz_order_no IS NULL
        union
        select
            tb1.biz_order_no as biz_id,
            1 as order_type,
            tb1.bill_date as bill_date,
            10 as biz_type,
            tb1.channel_code as channel_code,
            tb1.trade_amount as amount,
            tb1.merchant_no as merchant_no,
            tb1.sub_merchant_no as sub_merchant_no,
            '${DP_1_DAYS_AGO_Ymd}' as trade_date
        FROM
            (
                select
                    biz_order_no,
                    bill_date,
                    channel_code,
                    trade_amount,
                    merchant_no,
                    sub_merchant_no
                from
                    ods.pay_check_chnnel_bill
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_check = 0
            ) tb1
            LEFT JOIN (
                select
                    biz_order_no,
                    channel_code,
                    trade_amount
                from
                    ods.pay_check_record
                where
                    and bill_date = '${DP_1_DAYS_AGO_Ymd}'
                    and is_filter = 0
                    and is_check = 0
            ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
            and tb1.trade_amount = tb2.trade_amount
            and tb1.channel_code = tb2.channel_code
        where
            tb2.biz_order_no IS NULL
    ) tb1;

整个 SQL 分为两部分,第一部分将会在 DP 中创建一张 check_dp_buffer_record,记录核对差异的的数据。

第二部分,将核对差异的数据插入上面创建的 check_dp_buffer_record 表中。

查找差异数据较为麻烦,需要分成两部分收集::

  • 本端单边账,即本端存在数据,但是对端不存在数据
  • 渠道端单边账,即对端存在数据,本端不存在数据

两边数据查找到之后,使用  SQL union 功能,将两端数据联合。

我们先来看下本端单边张的逻辑的:

select
    tb1.biz_order_no as biz_id,
    0 as order_type,
    tb1.bill_date as bill_date,
    10 as biz_type,
    tb1.channel_code as channel_code,
    tb1.trade_amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    '${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
    (
        select
            biz_order_no,
            bill_date,
            channel_code,
            trade_amount,
            merchant_no,
            sub_merchant_no
        from
            ods.pay_check_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_filter = 0
            and is_check = 0
    ) tb1
    LEFT JOIN (
        select
            biz_order_no,
            trade_amount,
            channel_code
        from
            ods.pay_check_channel_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_check = 0
    ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
    and tb1.trade_amount = tb2.trade_amount
    and tb1.channel_code = tb2.channel_code
where
    tb2.biz_order_no IS NULL

SQL 看起来比较复杂,实际逻辑可以简化为下面 SQL :

select
    *
from
    innerTab t1
    LEFT JOIN channelTab t2 ON t1.biz_order_no = t2.biz_order_no
    and t1.trade_amount = t2.trade_amount
    and t1.channel_code = t2.channel_code
where
    t2.biz_order_no is null;

这里主要利用 SQL 左连接的功能,本端数据 left join 渠道数据,如果渠道单号不存在,则认为本端数据存在,渠道数据不存在,当然也有可能是两端数据都存在,但是金额不相等。

这种情况记为本端数据存疑,orderType 为 0。

渠道端单边账收集逻辑:

select
    tb1.biz_order_no as biz_id,
    1 as order_type,
    tb1.bill_date as bill_date,
    10 as biz_type,
    tb1.channel_code as channel_code,
    tb1.trade_amount as amount,
    tb1.merchant_no as merchant_no,
    tb1.sub_merchant_no as sub_merchant_no,
    '${DP_1_DAYS_AGO_Ymd}' as trade_date
FROM
    (
        select
            biz_order_no,
            bill_date,
            channel_code,
            trade_amount,
            merchant_no,
            sub_merchant_no
        from
            ods.pay_check_chnnel_bill
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_check = 0
    ) tb1
    LEFT JOIN (
        select
            biz_order_no,
            channel_code,
            trade_amount
        from
            ods.pay_check_record
        where
            and bill_date = '${DP_1_DAYS_AGO_Ymd}'
            and is_filter = 0
            and is_check = 0
    ) tb2 ON tb1.biz_order_no = tb2.biz_order_no
    and tb1.trade_amount = tb2.trade_amount
    and tb1.channel_code = tb2.channel_code
where
    tb2.biz_order_no IS NULL

逻辑与本端单边账收集类似,渠道数据 left join 本端数据,如果本端单号不存在,则为渠道数据存在,本端数据不存在。当然也有可能是两端数据都存在,但是金额不相等。

这里记为渠道存疑数据,orderType 为 1

成功数据核对以及存疑数据核对结束,DP 平台将会自动把数据从 Hive 表中导入到 MYSQL。

数据导出结束,DP 平台将会调用对账系统的相关接口,通知对账系统 DP 核对流程结束。

DP 核对流程是整个对账流程核心流程,目前千万级数据的情况下,大概能在一个小时之内搞定。

DP 核对流程结束之后,对账系统开始下个流程-二次存疑数据处理

二次存疑数据处理

前面流程我们讲到存疑处理,为什么这里还需要二次存疑数据处理呢?

这因为 DP 核对存疑数据收集的过程,我们使用业务单号与金额去互相匹配,那如果不存在,有可能是因为两端数据有一端不存在,还有可能是因为两端数据数据都存在,但是金额却不相等。

DP 核对过程是无法区分出这两种情况,所以增加一个二次存疑数据处理流程,单独区分出这两类数据。

回到二次存疑数据处理流程,当天产生的所有存疑数据都从 DP 中导入到 check_dp_buffer_record 表。

二次存疑数据处理流程将会查找 check_dp_buffer_record 表所有未核对的记录,然后依次遍历。

遍历过程中将会尝试在 check_dp_buffer_record 表中查找相反方向的存疑数据。

这个可能不好理解,举个例子:

假如有一笔订单,本端是 100 元,渠道端是 10 元。这种情况两笔记录都会出现在  check_dp_buffer_record 表。

遍历到本端这笔的时候,这笔类型是本端存疑,type为 0。使用者本端单号从check_dp_buffer_record 查找渠道端存疑(type 为 1)的数据。

上面的情况可以找到,证明这笔存疑数据其实是金额不相等,这里需要将数据移动到差错表。

那如果是正常一端缺失的数据,那自然去相反方向查找是找不到的,这种数据是正常存疑数据,移动内部存疑表。

对账系统二次存疑数据处理结束之后,开始下一个阶段数据汇总。

数据汇总

数据汇总阶段就是为了统计当天每个有多少成功功对账数据,多少存疑数据,统计结束通过看板给相关运营人员展示统计数据。

由于数据量大的问题,这里使用的是 DP 平台 Sprak 任务进行任务统计。

这里逻辑简单解释为,就是利用 Scala 脚本代码对数据进行相关求和,这里代码没有普遍性,就不展示具体的逻辑了。

差错数据推送

数据汇总结束之后,开始下一个阶段,差错数据推送给差错系统。

上面存疑数据处理的流程中转化的差错数据,当前存在对账系统内部差错数据表中。

目前我们差错数据是是另外一个差错系统单独处理,所以对账系统需要把差错数据表数据推送给差错系统。

这里的逻辑比较简单,查找所有待处理的差错数据,遍历发送 NSQ 消息给差错系统。

总结

千万级数据对账整个流程看起,其实相关操作流程都不是很难。

那我个人认为这里难点在于第一需要一套完整大数据平台体系,第二改变原有对账方式,思考如何将对账系统与大数据平台一起串起来。

希望这篇文章对正好碰到该类问题同仁起到相关帮助。

最后说一句(求关注)

好了,看到了这里了,转发、在看、点赞随便安排一个吧,要是你都安排上我也不介意。写文章很累的,需要一点正反馈。

给各位读者朋友们磕一个了:

推荐👍 :确实很优雅,所以我要扯下这个注解的神秘面纱。

推荐👍 :请问各位程序员,是我的思维方式有错误吗?

推荐👍 :知乎,火了。

推荐👍 :少部分程序员走的路。

推荐👍 :2021,我这一年。

··································

你好呀,我是歪歪。一个主要敲代码,经常怼文章,偶尔拍视频的成都人。

我没进过一线大厂,没创过业,也没写过书,更不是技术专家,所以也没有什么亮眼的title。

当年以超过二本线 13 分的“优异”成绩顺利进入某二本院校计算机专业,误打误撞,进入了程序员的行列,开始了运气爆棚的程序员之路。

说起程序员之路还是有点意思,可以看看。点击蓝字,查看我的程序员之路

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存