查看原文
其他

终于搞定了 PSI 稳定性计算(Python代码)

东哥起飞 Python数据科学
2024-08-24

大家好,我是东哥。

在风控中,风险意味着不确定性,不确定性越强意味着越不可控,做数据化风控也是同理,追求的就是让确定性越来越强,转换成统计概率论来说就是不断提高我们的胜算的概率。当然,没有任何人可以做到100%的确定,因为没有人是上帝视角,所以在风控决策过程中总会产生错杀或者误放。

这是对风控宏观层面的理解,将视角缩小至风控模型上,也是如此。风控模型人员在做模型时可能更关注效果AUC/KS等评估指标,效果胜过一切。但其实对于模型而言,稳定性的重要程度要胜过效果。因为一个模型的开发周期并不短,且上线后不会轻易的更换,也就是说我们做的不是一个高频的事情。如果模型不稳定,即便离线效果比较好但线上稳定性差,那么对于整个风控决策结果而言就是致命的,因为此时你无法保证有大概率的胜算了。

本文将介绍风控中稳定性指标PSI的概念和理解,以及A卡模型上线后如何对模型分及入模变量进行稳定性观测。

PSI的理解

作为一名风控人员,相信对IV指标并不陌生,它可以代表一个变量的信息价值,或者可以理解为与目标变量的相关程度,是一个变量好坏的效果指标。

如果你是一个老风控人员,就会发现PSI和IV指标的计算公式是非常相似的,因为二者的本质是相同的,都是计算两个分布之间的距离

PSI衡量稳定性,希望两个分布越接近越好,而IV衡量变量的区分能力,希望两个分布越远离越好。

所以,弄清楚这点,PSI和IV的计算公式就相当容易理解了。

这就是PSI,全称为(Population Stability Index)。那么PSI的计算逻辑是怎样的呢?

PSI的计算逻辑

我们说一个模型分数稳定,一般指的是随着时间的推移,模型分的分布是稳定的。由此可以推测模型分肯定有一个起始的分布的,然后随着时间推移各个时间周期的分布与起始分布的差距。

更准确的说,PSI的稳定性计算是需要一个参照的。需要两个分布,一个是实际分布(actual),另一个是预期分布(expected)。

PSI的计算逻辑步骤如下:

step1:将变量预期分布(excepted)进行分箱(binning)离散化,统计各个分箱里的样本占比。

注意:

  • a) 分箱可以是等频、等距或其他方式,分箱方式不同,将导致计算结果略微有差异;

  • b) 对于连续型变量(特征变量、模型分数等),分箱数需要设置合理,一般设为10或20;对于离散型变量,如果分箱太多可以提前考虑合并小分箱;分箱数太多,可能会导致每个分箱内的样本量太少而失去统计意义;分箱数太少,又会导致计算结果精度降低。

step2: 按相同分箱区间,对实际分布(actual)统计各分箱内的样本占比。

step3:计 算各分箱内的A - E和Ln(A / E),计算index = (实际占比 - 预期占比)* ln(实际占比 / 预期占比) 。

step4: 将各分箱的index进行求和,即得到最终的PSI。

引自:https://zhuanlan.zhihu.com/p/79682292

下面是一个计算过程的结果示例。

PSI的使用场景

PSI的实际分布和预期分布在不同的场景、不同类型下是不尽相同的。

如果按照使用场景划分,有以下三个阶段:

  • 在建模时:以训练样本作为预期分布,以测试集或者跨时间样本(OOT)作为实际分布
  • 灰度上线:以离线建模的样本作为预期分布,以灰度的陪跑样本作为实际分布
  • 正式上线:以上线后第一个月的样本作为预期分布,以除第一个月后的每月样本作为实际分布

如果按照类型划分,有模型和入模变量两种:

  • 模型分数:对模型输出的概率结果或者分数结果进行PSI稳定性的检验,使用场景可以为以上三种
  • 变量值:对入模变量进行PSI稳定性的检验,使用场景同样为以上三种

举个例子,比如模型已经正式上线,我们通过监控报表发现近期通过率在不断地升高。这种波动或者异常说明模型可能在衰减,结果在发生偏移。为了验证我们的猜想,就需要计算模型逐月的PSI稳定性指标。

一般情况下,PSI稳定性指标的参考值如下:

PSI的Python实操

下面我们用Python代码来实操一下PSI指标的计算,以及PSI指标逐月的计算。这是PSI最常用的两个应用,可以直接替换参数即可完成计算。

以下是PSI的计算函数,参数需要预期分布和实际分布的列表,此外可以设置分箱的数量(默认为10)和最小样本数量(默认为10)。

df = pd.read_csv('../0-数据/var_sample.csv')

def calculate_psi(base_list, test_list, bins=10, min_sample=10):
    # @东哥的风控小密圈
    try:
        base_df = pd.DataFrame(base_list, columns=['score'])
        test_df = pd.DataFrame(test_list, columns=['score']) 
        
        # 1.去除缺失值后,统计两个分布的样本量
        base_notnull_cnt = len(list(base_df['score'].dropna()))
        test_notnull_cnt = len(list(test_df['score'].dropna()))

        # 空分箱
        base_null_cnt = len(base_df) - base_notnull_cnt
        test_null_cnt = len(test_df) - test_notnull_cnt
        
        # 2.最小分箱数
        q_list = []
        if type(bins) == int:
            bin_num = min(bins, int(base_notnull_cnt / min_sample))
            q_list = [x / bin_num for x in range(1, bin_num)]
            break_list = []
            for q in q_list:
                bk = base_df['score'].quantile(q)
                break_list.append(bk)
            break_list = sorted(list(set(break_list))) # 去重复后排序
            score_bin_list = [-np.inf] + break_list + [np.inf]
        else:
            score_bin_list = bins
        
        ...
        
        # 5.汇总统计结果    
        stat_df = pd.DataFrame({"bucket": bucket_list, "base_cnt": base_cnt_list, "test_cnt": test_cnt_list})
        stat_df['base_dist'] = stat_df['base_cnt'] / len(base_df)
        stat_df['test_dist'] = stat_df['test_cnt'] / len(test_df)
        
        def sub_psi(row):
            # 6.计算PSI
            base_list = row['base_dist']
            test_dist = row['test_dist']
            # 处理某分箱内样本量为0的情况
            if base_list == 0 and test_dist == 0:
                return 0
            elif base_list == 0 and test_dist > 0:
                base_list = 1 / base_notnull_cnt   
            elif base_list > 0 and test_dist == 0:
                test_dist = 1 / test_notnull_cnt
                
            return (test_dist - base_list) * np.log(test_dist / base_list)
        
        stat_df['psi'] = stat_df.apply(lambda row: sub_psi(row), axis=1)
        stat_df = stat_df[['bucket''base_cnt''base_dist''test_cnt''test_dist''psi']]
        psi = stat_df['psi'].sum()
        
    except:
        print('error!!!')
        psi = np.nan 
        stat_df = None
    return psi, stat_df

举例,现在我们想对LoanAmount借款金额这个单变量,以5月为预期分布6月为实际分布进行稳定性计算。

var = 'LoanAmount'
base = df.loc[df['date']=='2023-05',var]
test = df.loc[df['date']=='2023-06',var]
calculate_psi(base_list=list(base),test_list=list(test))

对其他任何变量、或者模型分数的使用方法也是同理。

上面是一个单变量或者模型分的PSI计算。将以上计算函数calculate_psi封装造出以逐月计算PSI的函数,代码如下。

def psi_month_calc(train_df:pd.DataFrame, oot_df:pd.DataFrame, col_list:list, dt_name:str):
    """
    @ 东哥的风控小密圈
    描述:逐月计算oot数据集变量的psi
    输入参数:
    :param train_df: train期望DataFrame
    :param oot_df: oot实际DataFrame
    :param col_list: 变量列表
    :param dt_name: 月份变量的名称
    输出:
    :psi_month_table: 变量在oot上逐月的psi, dataframe
    :psi_month_detail_total: 变量在oot上逐月的psi分箱细节, dict
  """

    month_list = sorted(oot_df[dt_name].unique())
    psi_array = []
    psi_month_detail_total = {}
    for mt in month_list:
        sub_df = oot_df.loc[oot_df[dt_name]==mt]
        psi_month_detail_each = []
        col_psi_dict = {}
        ...
        # 所有变量psi分箱情况,便于后续查看
        psi_month_detail_each.append(stat_df)
        psi_month_detail_total[mt] = pd.concat(psi_month_detail_each)
        # oot每月所有变量的psi值
        psi_array.append(col_psi_dict)
    # oot上逐月变量psi汇总表
    psi_month_table = pd.DataFrame(psi_array).T
    psi_month_table.columns = month_list
    return psi_month_table, psi_month_detail_total

举例,下面我们以5月为参照,以5月以后所有月份为实际分布,计算所有变量的PSI稳定性指标。

base = df.loc[df['date']=='2023-05']
test = df.loc[~(df['date']=='2023-05')]
col_list = df.columns.difference(['target','date','uid']).tolist()
psi_month_table, psi_month_detail_total = psi_month_calc(base,test,col_list,'date')

然后得到了两个结果。

psi_month_table

一个是所有变量逐月的PSI最终结果。

psi_month_detail_total['2023-06']

另一个是所有变量在逐月的分箱情况,即得到了中间计算过程。

从0到1《100天风控专家》👇

--end--

100天风控专家历史文章
风控模型策略考点&大厂面经手册
迁徙率报表逻辑和开发(Python代码)
Vintage分析表计算过程详解

CART决策树暴力生成风控规则

基于交叉表制定风控规则全流程(Python)

风控规则的决策树可视化(升级版)
继续滑动看下一个
Python数据科学
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存