查看原文
其他

详解Hive 排序和开窗函数





- Hive中的四种排序 -


排序操作是一个比较常见的操作,尤其是在数据分析的时候,我们往往需要对数据进行排序,hive 中和排序相关的有四个关键字,今天我们就看一下,它们都是什么作用。

数据准备

下面我们有一份温度数据,tab 分割:

2008 32.0
2008 21.0
2008 31.5
2008 17.0
2013 34.0
2015 32.0
2015 33.0
2015 15.9
2015 31.0
2015 19.9
2015 27.0
2016 23.0
2016 39.9
2016 32.0

建表加载数据:

create table ods_temperature(
`year` int,
temper float
)
row format delimited fields terminated by '\t';
load data local inpath '/Users/workspace/hive/temperature.data' overwrite into table ods_temperature;

1. order by(全局排序)

order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个reducer,会导致当输入规模较大时,消耗较长的计算时间:

  • 降序:desc

  • 升序:asc 不需要指定,默认是升序

需要注意的是它受hive.mapred.mode的影响,在严格模式下,必须使用limit 对排序的数据量进行限制,因为数据量很大只有一个reducer的话,会出现OOM 或者运行时间超长的情况,所以严格模式下,不适用limit 则会报错,更多请参考Hive的严格模式和本地模式

Error: Error while compiling statement: FAILED: SemanticException 1:39 Order by-s without limit are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.orderby.no.limit to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features.. Error encountered near token 'year' (state=42000,code=40000)

接下来我们看一下order by的排序结果select * from ods_temperature order by year;

2. sort by(分区内排序)

不是全局排序,其在数据进入reducer前完成排序,也就是说它会在数据进入reduce之前为每个reducer都产生一个排序后的文件。因此,如果用sort by进行排序,并且设置mapreduce.job.reduces>1,则sort by只保证每个reducer的输出有序,不保证全局有序。

它不受Hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。

set mapred.reduce.tasks=3;
select * from ods_temperature sort by year;

发现上面的输出好像看不出来啥,只能看到不是有序的,哈哈,那我们换一种方法,将数据输出到文件,因为我们设置了reduce数是3,那应该会有三个文件输出

可以看出这下就清楚多了,我们看到一个分区内的年份并不同意,那个年份的数据都有

sort by 和order by 的执行效率

首先我们看一个现象,一般情况下我们认为sort by 应该是比 order by 快的,因为 order by 只能使用一个reducer,进行全部排序,但是当数据量比较小的时候就不一定了,因为reducer 的启动耗时可能远远数据处理的时间长,就像下面的例子order by 是比sort by快的

sort by 中的limt

可以在sort by 用limit子句减少数据量,使用limit n 后,传输到reduce端的数据记录数就减少到 n * (map个数),也就是说我们在sort by 中使用limit 限制的实际上是每个reducer 中的数量,然后再根据sort by的排序字段进行order by,最后返回n 条数据给客户端,也就是说你在sort by 用limit子句,最后还是会使用order by 进行最后的排序

order by 中使用limit 是对排序好的结果文件去limit 然后交给reducer,可以看到sort by 中limit 子句会减少参与排序的数据量,而order by 中的不行,只会限制返回客户端数据量的多少。

从上面的执行效率,我们看到sort by limit 几乎是 order by limit 的两倍了 ,大概才出来应该是多了某个环节

接下来我们分别看一下order by limit 和 sort by limit 的执行计划

explain select * from ods_temperature order by year limit 2;

explain select * from ods_temperature sort by year limit 2;

从上面截图我圈出来的地方可以看到

  1. sort by limit 比 order by limit 多出了一个stage(order limit)

  2. sort by limit 实际上执行了两次limit ,减少了参与排序的数据量

3. distribute by(数据分发)

distribute by是控制在map端如何拆分数据给reduce端的。类似于MapReduce中分区partationer对数据进行分区

hive会根据distribute by后面列,将数据分发给对应的reducer,默认是采用hash算法+取余数的方式。

sort by为每个reduce产生一个排序文件,在有些情况下,你需要控制某写特定的行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。

例如上面的sort by 的例子中,我们发现不同年份的数据并不在一个文件中,也就说不在同一个reducer 中,接下来我们看一下如何将相同的年份输出在一起,然后按照温度升序排序

首先我们尝试一下没有distribute by 的SQL的实现

insert overwrite local directory '/Users/workspace/hive/sort' row format delimited fields terminated by '\t' select * from ods_temperature sort by temper ;

发现结果并没有把相同年份的数据分配在一起,接下来我们使用一下distribute by

insert overwrite local directory '/Users/workspace/hive/sort' row format delimited fields terminated by '\t'
select * from ods_temperature distribute by year sort by temper ;

这下我们看到相同年份的都放在了一下,可以看出2013 和 2016 放在了一起,但是没有一定顺序,这个时候我们可以对 distribute by 字段再进行一下排序

insert overwrite local directory '/Users/workspace/hive/sort' row format delimited fields terminated by '\t'
select * from ods_temperature distribute by year sort by year,temper ;

4. cluster by

cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

当分区字段和排序字段相同cluster by可以简化distribute by+sort by 的SQL 写法,也就是说当distribute by和sort by 字段相同时,可以使用cluster by 代替distribute by和sort by

insert overwrite local directory '/Users/workspace/hive/sort' row format delimited fields terminated by '\t'
select * from ods_temperature distribute by year sort by year ;

insert overwrite local directory '/Users/workspace/hive/sort' row format delimited fields terminated by '\t'
select * from ods_temperature cluster by year;

我们看到上面两种SQL写法的输出结果是一样的,这也就证明了我们的说法,当distribute by和sort by 字段相同时,可以使用cluster by 代替distribute by和sort by

当你尝试给cluster by 指定排序方向的时候,你就会得到如下错误。

Error: Error while compiling statement: FAILED: ParseException line 2:46 extraneous input 'desc' expecting EOF near '<EOF>' (state=42000,code=40000)




-  总结 -


  • order by 是全局排序,可能性能会比较差;

  • sort by分区内有序,往往配合distribute by来确定该分区都有那些数据;

  • distribute by 确定了数据分发的规则,满足相同条件的数据被分发到一个reducer;

  • cluster by 当distribute by和sort by 字段相同时,可以使用cluster by 代替distribute by和sort by,但是cluster by默认是升序,不能指定排序方向;

  • sort by limit 相当于每个reduce 的数据limit 之后,进行order by 然后再limit ;




-  开窗函数 -


基本语法
Function (arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>]
[<window_expression>])

Function (arg1,..., argn) 可以是下面的函数:

  • Aggregate Functions: 聚合函数,比如:sum(...)、 max(...)、min(...)、avg(...)等.

  • Sort Functions: 数据排序函数, 比如 :rank(...)、row_number(...)等.

  • Analytics Functions: 统计和比较函数, 比如:lead(...)、lag(...)、 first_value(...)等.

数据准备

样例数据

[职工姓名|部门编号|职工ID|工资|岗位类型|入职时间]

Michael|1000|100|5000|full|2014-01-29
Will|1000|101|4000|full|2013-10-02
Wendy|1000|101|4000|part|2014-10-02
Steven|1000|102|6400|part|2012-11-03
Lucy|1000|103|5500|full|2010-01-03
Lily|1001|104|5000|part|2014-11-29
Jess|1001|105|6000|part|2014-12-02
Mike|1001|106|6400|part|2013-11-03
Wei|1002|107|7000|part|2010-04-03
Yun|1002|108|5500|full|2014-01-29
Richard|1002|109|8000|full|2013-09-01

建表语句

CREATE TABLE IF NOT EXISTS employee (
name string,
dept_num int,
employee_id int,
salary int,
type string,
start_date date
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED as TEXTFILE;

加载数据

load data local inpath '/opt/datas/data/employee_contract.txt' into table employee;

窗口聚合函数

  1. 查询姓名、部门编号、工资以及部门人数

select
name,
dept_num as deptno ,
salary,
count(*) over (partition by dept_num) as cnt
from employee ;

结果输出

name deptno salary cnt
Lucy 1000 5500 5
Steven 1000 6400 5
Wendy 1000 4000 5
Will 1000 4000 5
Michael 1000 5000 5
Mike 1001 6400 3
Jess 1001 6000 3
Lily 1001 5000 3
Richard 1002 8000 3
Yun 1002 5500 3
Wei 1002 7000 3

窗口排序函数

简介:

窗口排序函数提供了数据的排序信息,比如行号和排名。在一个分组的内部将行号或者排名作为数据的一部分进行返回,最常用的排序函数主要包括:

  • row_number

根据具体的分组和排序,为每行数据生成一个起始值等于1的唯一序列数

  • rank

对组中的数据进行排名,如果名次相同,则排名也相同,但是下一个名次的排名序号会出现不连续。比如查找具体条件的topN行

  • dense_rank

dense_rank函数的功能与rank函数类似,dense_rank函数在生成序号时是连续的,而rank函数生成的序号有可能不连续。当出现名次相同时,则排名序号也相同。而下一个排名的序号与上一个排名序号是连续的。

  • percent_rank

排名计算公式为:(current rank - 1)/(total number of rows - 1)

  • ntile

将一个有序的数据集划分为多个桶(bucket),并为每行分配一个适当的桶数。它可用于将数据划分为相等的小切片,为每一行分配该小切片的数字序号。

使用案例

  1. 查询姓名、部门编号、工资、排名编号(按工资的多少排名)

select
name ,
dept_num as dept_no ,
salary,
row_number() over (order by salary desc ) rnum
from employee;

结果输出

name dept_no salary rnum
Richard 1002 8000 1
Wei 1002 7000 2
Mike 1001 6400 3
Steven 1000 6400 4
Jess 1001 6000 5
Yun 1002 5500 6
Lucy 1000 5500 7
Lily 1001 5000 8
Michael 1000 5000 9
Wendy 1000 4000 10
Will 1000 4000 11
  1. 查询每个部门工资最高的两个人的信息(姓名、部门、薪水)

select
name,
dept_num,
salary
from
(
select name ,
dept_num ,
salary,
row_number() over (partition by dept_num order by salary desc ) rnum
from employee) t1
where rnum <= 2;

结果输出

name dept_num salary
Steven 1000 6400
Lucy 1000 5500
Mike 1001 6400
Jess 1001 6000
Richard 1002 8000
Wei 1002 7000
  1. 查询每个部门的员工工资排名信息

select
name ,
dept_num as dept_no ,
salary,row_number() over (partition by dept_num order by salary desc ) rnum
from employee;

结果输出

name dept_no salary rnum
Steven 1000 6400 1
Lucy 1000 5500 2
Michael 1000 5000 3
Wendy 1000 4000 4
Will 1000 4000 5
Mike 1001 6400 1
Jess 1001 6000 2
Lily 1001 5000 3
Richard 1002 8000 1
Wei 1002 7000 2
Yun 1002 5500 3
  1. 使用rank函数进行排名

select
name,
dept_num,
salary,
rank() over (order by salary desc) rank
from employee;

结果输出

name dept_num salary rank
Richard 1002 8000 1
Wei 1002 7000 2
Mike 1001 6400 3
Steven 1000 6400 3
Jess 1001 6000 5
Yun 1002 5500 6
Lucy 1000 5500 6
Lily 1001 5000 8
Michael 1000 5000 8
Wendy 1000 4000 10
Will 1000 4000 10
  1. 使用dense_rank进行排名

select
name,
dept_num,
salary,
dense_rank() over (order by salary desc) rank
from employee;

结果输出

name dept_num salary rank
Richard 1002 8000 1
Wei 1002 7000 2
Mike 1001 6400 3
Steven 1000 6400 3
Jess 1001 6000 4
Yun 1002 5500 5
Lucy 1000 5500 5
Lily 1001 5000 6
Michael 1000 5000 6
Wendy 1000 4000 7
Will 1000 4000 7

6.使用percent_rank()进行排名

select
name,
dept_num,
salary,
percent_rank() over (order by salary desc) rank
from employee;

结果输出

name dept_num salary rank
Richard 1002 8000 0.0
Wei 1002 7000 0.1
Mike 1001 6400 0.2
Steven 1000 6400 0.2
Jess 1001 6000 0.4
Yun 1002 5500 0.5
Lucy 1000 5500 0.5
Lily 1001 5000 0.7
Michael 1000 5000 0.7
Wendy 1000 4000 0.9
Will 1000 4000 0.9
  1. 使用ntile进行数据分片排名

SELECT
name,
dept_num as deptno,
salary,
ntile(4) OVER(ORDER BY salary desc) as ntile
FROM employee;

结果输出

name deptno salary ntile
Richard 1002 8000 1
Wei 1002 7000 1
Mike 1001 6400 1
Steven 1000 6400 2
Jess 1001 6000 2
Yun 1002 5500 2
Lucy 1000 5500 3
Lily 1001 5000 3
Michael 1000 5000 3
Wendy 1000 4000 4
Will 1000 4000 4

尖叫提示:从 Hive v2.1.0开始, 支持在OVER语句里使用聚集函数,比如:

SELECT
dept_num,
row_number() OVER (PARTITION BY dept_num ORDER BY sum(salary)) as rk
FROM employee
GROUP BY dept_num;

结果输出

dept_num rk
1000 1
1001 1
1002 1

窗口分析函数

常用的分析函数主要包括:

  • cume_dist

如果按升序排列,则统计:小于等于当前值的行数/总行数(number of rows ≤ current row)/(total number of rows)。如果是降序排列,则统计:大于等于当前值的行数/总行数。比如,统计小于等于当前工资的人数占总人数的比例 ,用于累计统计.

  • lead(value_expr[,offset[,default]])

用于统计窗口内往下第n行值。第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL.

  • lag(value_expr[,offset[,default]])

与lead相反,用于统计窗口内往上第n行值。第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL.

  • first_value

取分组内排序后,截止到当前行,第一个值

  • last_value

取分组内排序后,截止到当前行,最后一个值




-  使用案例 -


  1. 统计小于等于当前工资的人数占总人数的比例

SELECT
name,
dept_num as deptno,
salary,
cume_dist() OVER (ORDER BY salary) as cume
FROM employee;

结果输出

name deptno salary cume
Wendy 1000 4000 0.18181818181818182
Will 1000 4000 0.18181818181818182
Lily 1001 5000 0.36363636363636365
Michael 1000 5000 0.36363636363636365
Yun 1002 5500 0.5454545454545454
Lucy 1000 5500 0.5454545454545454
Jess 1001 6000 0.6363636363636364
Mike 1001 6400 0.8181818181818182
Steven 1000 6400 0.8181818181818182
Wei 1002 7000 0.9090909090909091
Richard 1002 8000 1.0

2.统计大于等于当前工资的人数占总人数的比例

SELECT
name,
dept_num as deptno,
salary,
cume_dist() OVER (ORDER BY salary desc) as cume
FROM employee;

结果输出

name deptno salary cume
Richard 1002 8000 0.09090909090909091
Wei 1002 7000 0.18181818181818182
Mike 1001 6400 0.36363636363636365
Steven 1000 6400 0.36363636363636365
Jess 1001 6000 0.45454545454545453
Yun 1002 5500 0.6363636363636364
Lucy 1000 5500 0.6363636363636364
Lily 1001 5000 0.8181818181818182
Michael 1000 5000 0.8181818181818182
Wendy 1000 4000 1.0
Will 1000 4000 1.0

3.按照部门统计小于等于当前工资的人数占部门总人数的比例

SELECT
name,
dept_num as deptno,
salary,
cume_dist() OVER (PARTITION BY dept_num ORDER BY salary) as cume
FROM employee;

结果输出

name deptno salary cume
Wendy 1000 4000 0.4
Will 1000 4000 0.4
Michael 1000 5000 0.6
Lucy 1000 5500 0.8
Steven 1000 6400 1.0
Lily 1001 5000 0.3333333333333333
Jess 1001 6000 0.6666666666666666
Mike 1001 6400 1.0
Yun 1002 5500 0.3333333333333333
Wei 1002 7000 0.6666666666666666
Richard 1002 8000 1.0

4.按部门分组,统计每个部门员工的工资以及大于等于该员工工资的下一个员工的工资

SELECT
name,
dept_num as deptno,
salary,
lead(salary,1) OVER (PARTITION BY dept_num ORDER BY salary) as lead
FROM employee;

结果输出

name deptno salary lead
Wendy 1000 4000 4000
Will 1000 4000 5000
Michael 1000 5000 5500
Lucy 1000 5500 6400
Steven 1000 6400 NULL
Lily 1001 5000 6000
Jess 1001 6000 6400
Mike 1001 6400 NULL
Yun 1002 5500 7000
Wei 1002 7000 8000
Richard 1002 8000 NULL

5.按部门分组,统计每个部门员工的工资以及小于等于该员工工资的上一个员工的工资

SELECT
name,
dept_num as deptno,
salary,
lag(salary,1) OVER (PARTITION BY dept_num ORDER BY salary) as lead
FROM employee;

结果输出

name deptno salary lead
Wendy 1000 4000 NULL
Will 1000 4000 4000
Michael 1000 5000 4000
Lucy 1000 5500 5000
Steven 1000 6400 5500
Lily 1001 5000 NULL
Jess 1001 6000 5000
Mike 1001 6400 6000
Yun 1002 5500 NULL
Wei 1002 7000 5500
Richard 1002 8000 7000

6.按部门分组,统计每个部门员工工资以及该部门最低的员工工资

SELECT
name,
dept_num as deptno,
salary,
first_value(salary) OVER (PARTITION BY dept_num ORDER BY salary) as fval
FROM employee;

结果输出

name deptno salary fval
Wendy 1000 4000 4000
Will 1000 4000 4000
Michael 1000 5000 4000
Lucy 1000 5500 4000
Steven 1000 6400 4000
Lily 1001 5000 5000
Jess 1001 6000 5000
Mike 1001 6400 5000
Yun 1002 5500 5500
Wei 1002 7000 5500
Richard 1002 8000 5500

7.按部门分组,统计每个部门员工工资以及该部门最高的员工工资

SELECT
name,
dept_num as deptno,
salary,
last_value(salary) OVER (PARTITION BY dept_num ORDER BY salary RANGE
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as lval
FROM employee;

结果输出

name deptno salary lval
Wendy 1000 4000 6400
Will 1000 4000 6400
Michael 1000 5000 6400
Lucy 1000 5500 6400
Steven 1000 6400 6400
Lily 1001 5000 6400
Jess 1001 6000 6400
Mike 1001 6400 6400
Yun 1002 5500 8000
Wei 1002 7000 8000
Richard 1002 8000 8000

注意: last_value默认的窗口是RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,表示当前行永远是最后一个值,需改成RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING。

  • RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

为默认值,即当指定了ORDER BY从句,而省略了window从句 ,表示从开始到当前行。

  • RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING

表示从当前行到最后一行

  • RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

表示所有行

  • n PRECEDING m FOLLOWING

表示窗口的范围是:[(当前行的行数)- n, (当前行的行数)+m] row 。









大数据工程师,欢迎大家关注呀!



猜你喜欢:
Spark性能调优指北:性能优化和故障处理
一张图解释清楚大数据技术架构
Hive文件存储格式和Hive数据压缩总结
字节跳动,5面,终于拿下!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存