查看原文
其他

一文带你深入吃透Spark的窗口函数

涤生-健哥 涤生大数据
2024-12-05

在Spark 中使用各种函数,例如 month (从日期返回月份)、 round (四舍五入值)和 floor (给出下限值)给定的输入)等,这将在每条记录上执行,并为每条记录返回一个值。然后还有各种聚合函数,这些函数将对一组数据执行并为每个组返回一个值,例如 sum 、 avg 、 min 、 max 和 count 。但是,如果我们想对一组数据执行操作并且希望每条记录都有一个值/结果怎么办?在这种情况下我们可以使用窗口函数。他们可以定义每条数据的排名、累积分布、移动平均值,或识别当前记录之前或之后的记录。

Spark中窗口函数汇总如下:

  • 聚合型: min 、 max 、 avg 、 count 和 sum 。

  • 排名型: rank 、 dense_rank 、 percent_rank 、 row_num 和 ntile

  • 分析型: cume_dist 、 lag 和 lead

  • 自定义边界:rangeBetween 和 rowsBetween

前置数据准备:

前置代码准备:

import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.expressions.Window
object WindowFunctionDemo { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("spark window function demo") .master("local") .getOrCreate()
// 用于隐式转换,如Seq调用toDF,一些如max、min函数等。 import spark.implicits._ import org.apache.spark.sql.functions._
val empsalary = Seq( Salary("sales", 1, 5000), Salary("personnel", 2, 3900), Salary("sales", 3, 4800), Salary("sales", 4, 4800), Salary("personnel", 5, 3500), Salary("develop", 7, 4200), Salary("develop", 8, 6000), Salary("develop", 9, 4500), Salary("develop", 10, 5200), Salary("develop", 11, 5200) ).toDF()
// 处理的业务逻辑 ………. }}
1.窗口聚合函数

现在在部门内(列:depname )可以应用各种聚合函数。因此,比如尝试找出每个部门的最高和最低工资。在这里,我们仅选择了所需的列( depName 、 max_salary 和 min_salary )并删除了重复的记录。

// 处理的业务逻辑// partitionBy用来根据指定的字段做分组val byDepName = Window.partitionBy("depName")
// over 用来制定需要使用到的窗口是哪一个val agg_sal = empsalary.withColumn("max_salary", max("salary").over(byDepName)) .withColumn("min_salary", min("salary").over(byDepName))
agg_sal.select("depname", "max_salary", "min_salary") .dropDuplicates() .show()

输出:

从上述结果可以看出,我们根据部门名称对数据进行了分区:

当执行聚合函数时,它将应用于每个分区内并返回聚合值(在示例中为 min 和 max )。

注意:可用的聚合函数有 max、min、sum、avg 和 count。

2.窗口排序功能

现在假设我们要根据部门内的工资对员工进行排名。工资最高的员工排名第一,工资最低的员工排名最后。在这里,我们将根据部门(列:depname )对数据进行分区,并且在部门内,我们将根据工资对数据进行降序排序。

val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)

对于每个部门,记录按照工资降序排列。

2.1 排名函数:rank 

此函数将返回分区内每个记录的排名,并跳过任何重复排名之后的后续排名:

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)val rank_df = empsalary.withColumn("rank", rank().over(winSpec))rank_df.show()

输出:

从上可以看到一些排名重复,一些排名缺失。例如,在 develop 部门,有 2 名具有 rank = 2 的员工,并且没有具有 rank = 3 的员工,因为rank函数将为相同的值保持相同的排名并跳过接下来的排名相应。

2.2 密集函数:dense_rank 

此函数将返回分区内每个记录的排名,但不会跳过任何排名。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec))dense_rank_df.show()

输出:

从上可以看到一些排名是重复的,但排名并没有像使用 rank 函数时那样丢失。例如,在 develop 部门,有 2 名员工,等级 = 2。 dense_rank 函数将为相同的值保留相同的等级,但不会跳过下一个等级。

2.3 行号函数:row_number

该函数将分配窗口内的行号。如果 2 行的排序列值相同,则不确定将哪个行号分配给具有相同值的每行。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec))row_num_df.show()

输出:

从上可以看到没有重复的排名,也没有跳过的排名, 这也是row_number 函数最大的特点。

2.4 百分比排名函数:row_number

此函数将返回分区内的相对百分位的排名。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec))percent_rank_df.show()

输出:

从上可以看出,该函数和rank类似,会有重复,会有跳过,只是该函数是以百分比的形式展现排名。每行按照如下公式进行计算:(rank - 1) / (rows - 1),对于重复的数据直接沿用上一行的排名,比如上面的第二个0.25就是因为salary和上一行的一样,就沿用了上一行的0.25。

2.5 细化函数:ntile

该函数可以根据窗口规范或分区将窗口进一步细分为 n 组。例如,如果需要将部门进一步分为三组,可以将 ntile 指定为 3。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy($"salary".desc)val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec))ntile_df.show()

输出:

从上可以看出,ntile将窗口类的数据尽可能均匀的分为n组,如果组内的数据大于n,则前面的排名会依次重复,如果组内的数据小于等于n,则等效于row_number。

3.窗口分析函数
3.1 积累分布函数:cume_dist  

该函数给出窗口/分区值的累积分布。定义窗口规范并应用cume_dist 函数来获取累积分布。因此该函数实质就是把处理过的记录不断的累加,即小于等于当前row_number/分组内总行数(一般是按照某个字段排好序的)对于某个字段的相同的值,顺延到下一行

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy("salary")val cume_dist_df = empsalary.withColumn("cume_dist",cume_dist().over(winSpec))cume_dist_df.show()

输出:

对于develop这个分组内有5条数据,对于第1条数据就是1/5=0.2,对于第二条数据就是2/5=0.4,对于第三条数据因为salary和下一行的salary的一样,所以该行用第4条数据计算,即:4/5=0.8,而对于第四条数据显然就是4/5=0.8,依次类推。

3.2 滞后函数:lag

此函数将返回分组中偏移行之前的值。lag 函数采用 3 个参数 ( lag(col, count = 1, default = None) ),其中:

  • col :定义需要应用函数的列。

  • count :我们需要回顾多少行。

  • default :定义默认值,一般不写。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy("salary")val lag_df = empsalary.withColumn("lag", lag("salary", 2).over(winSpec))lag_df.show()

输出:

例如,查找当前行之前两行的工资,对于 depname = develop ,薪水 = 4500 。该行之前 2 行不存在这样的行,所以它会变成空(null)。

而对于 deptname = develop ,薪水 = 6000 (以蓝色突出显示)。如果提前 2 行,将获得 5200 作为工资(以绿色突出显示)。

 

3.3 前置函数:lead

该函数将返回 DataFrame 中偏移行之后的值。Lead 函数需要 3 个参数 (lead(col, count = 1, default = None) ),其中参数和lag一样。

  • col :定义需要应用函数的列。

  • count :当前行之后查找多少行。

  • default :定义默认值。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName").orderBy("salary")val lead_df = empsalary.withColumn("lead", lead("salary", 2).over(winSpec))lead_df.show()

输出:

例如查找当前行之后两行的工资,对于 depname = develop ,工资 = 4500 (以蓝色突出显示)。向后移动 2 行,将获得 5200 作为工资(以绿色突出显示)。

对于 depname =personnel ,薪水 = 3500 。该分区中不存在位于该行后 2 行的行。所以会得到null。

4.自定义窗口范围

默认情况下,窗口的边界有窗口函数决定,比如说lag/lead,那么窗口的大小就是我们指定的数值,再比如row_number,窗口的大小就是由读取分组内的数据逐渐变大的,但是我们如果想固定窗口或者说控制窗口的大小该怎么办呢?以下函数可用于定义每个分区内的窗口。

4.1 区间范围:rangeBetween

使用rangeBetween 函数,可以显式定义边界。例如,将当前工资的起始定义为 100,结束定义为 300 个单位,看看这意味着什么。Start as 100 表示窗口将从 100 个单位开始,到当前值的 300 个值结束(包括开始值和结束值)

// 处理的业务逻辑val winSpec = Window.partitionBy("depName") .orderBy("salary") .rangeBetween(100L, 300L)val range_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))range_between_df.show()

输出:

对于 depname=develop ,salary = 4200 ,窗口的开始将是(当前值 + 开始),即 4200 + 100 = 4300。窗口的结束将是(当前值+ 结束)即 4200 + 300 = 4500。

由于只有一个薪资值在 4300 到 4500 之间(含),即develop 部门的 4500 ,因此得到 4500 作为 4200的max_salary。

同样,对于 depname=develop ,salary = 4500 ,窗口将为 ( start : 4500 + 100 = 4600, end : 4500 + 300 = 4800 )。但是 develop 部门没有 4600 到 4800 范围内的薪资值,因此最大值将为空(检查上面的输出)。

这里可以使用一些特殊的边界值:

  • Window.currentRow :指定行中的当前值。

  • Window.unboundedPreceding :这可用于为窗口提供无限制的开始。

  • Window.unboundedFollowing :这可用于使窗口具有无界末端。

例如,需要从员工工资中找到比当前工资大于300的最高工资。因此,可以将起始值定义为 300L,并将结束值定义为 Window.unboundedFollowing:

// 处理的业务逻辑val winSpec = Window.partitionBy("depName") .orderBy("salary") .rangeBetween(300L, Window.unboundedFollowing)val range_unbounded_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))range_unbounded_df.show()

输出:

因此,对于 depname =personnel 、salary = 3500. ,窗口将为 ( start : 3500 + 300 = 3800, end : unbounded )。所以这个范围内的最大值是 3900(检查上面的输出)。

同样,对于 depname =sales ,salary = 4800 ,窗口将为 ( start : 4800 + 300 = 5100, end : unbounded )。由于 sales department 没有大于 5100 的值,因此 null 结果。

备注:默认情况下。我们可以发现,对于rangeBetween,窗口的大小是整个分区的大小,是在整个分区内查找合适的区间范围。

4.2 行间:rowBetween

通过 rangeBetween,使用排序列的值定义了窗口的开始和结束。但是,还可以使用相对行位置来定义窗口的开始和结束。例如,想创建一个窗口,其中窗口的开始位置是当前行之前的一行,结束位置是当前行之后的一行。

// 处理的业务逻辑val winSpec = Window.partitionBy("depName") .orderBy("salary") .rowsBetween(-1, 1)val rows_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))rows_between_df.show()

输出:

对于 depname =develop ,salary = 4500 ,将定义一个窗口,其中当前行之前和之后各一行(以绿色突出显示)。因此窗口内的工资是( 4200, 4500, 5200 ),最大值是 5200 (检查上面的输出)。

备注:如果没有前/后一行,那就和当前行比较:

类似地,对于 depname =sales ,salary = 5000 ,将定义一个窗口,其中当前行的前后各有一个。由于该行之后没有行,因此窗口将只有 2 行(以绿色突出显示),其工资为 ( 4800, 5000 ),最大值为 5000 (检查上面的输出)。

还可以像之前使用 rangeBetween 一样使用特殊边界:

  • Window.unboundedPreceding ;

  • Window.unboundedFollowing ;

  • Window.currentRow 。

注意:对于 rowsBetween 来说,排序不是必需的。

涤生大数据往期精彩推荐


1.企业数仓DQC数据质量管理实践篇

2.企业数据治理实战总结--数仓面试必备

3.OneData理论案例实战—企业级数仓业务过程

4.中大厂数仓模型规范与度量指标有哪些?

5.手把手教你搭建用户画像系统(入门篇上)

6.手把手教你搭建用户画像系统(入门篇下)

7.SQL优化之诊断篇:快速定位生产性能问题实践

8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!

9.新能源趋势下一个简单的数仓项目,助力理解数仓模型

10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践

11.开发实战角度:distinct实现原理及具体优化总结

12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)

13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)

14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?

15.企业级Apache Kafka集群策略:Kakfa最佳实践总结

16.玩转Spark小文件合并与文件读写提交机制

17.一文详解Spark内存模型原理,面试轻松搞定


继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存