查看原文
其他

Spark数据倾斜之骚操作解决方案

初学者 进击吧大数据 2022-07-01

简单说几句

数据倾斜可以说是大数据开发中见怪不怪的事情了,那么什么情况下会出现数据倾斜?以及出现的时候会有什么症状呢?请接下往下看

症状以及原因

比如一个spark任务中,绝大多数task任务运行速度很快,但是就是有那么几个task任务运行极其缓慢,慢慢的可能就接着报内存溢出的问题了,那么这个时候我们就可以认定是数据倾斜了.

为什么会这种情况呢?其实这个一般发生在shuffle类的算子中,在进行shuffle的时候,必须将各个节点的相同的key拉到某个节点上的一个task来进行处理,比如按照key进行聚合和join操作等,这个时候其中某一个key数量特别大,于是就发生了数据倾斜

分组聚合逻辑中,需要把相同key的数据发往下游同一个task,如果某个或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大

join计算中,A表和B表中相同key的数据,需要发往下游同一个task,如果A表中或B表中,某个key或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大

如何定位数据倾斜?

上面简单说明了一下数据倾斜的症状以及原理,那么如果定位到具体哪块的代码出现了数据倾斜呢?

  1. 首先我们知道了数据倾斜是发生在shuffle阶段,那么肯定就要先找shuffle类的算子,比如distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup等

  2. 我们知道一个application会拆分为多个job,一个job又会划分多个stage,stage的划分就是根据shuffle类的算子,也可以说是宽依赖来划分的

  3. 这个时候就可以在spark ui界面上查看stage,如下图:

可以看到94这一行和91这一行,执行时间明显比其他的执行时间要长太多了,我们就可以肯定一定是这里发生了数据倾斜,然后我们就找到了发生数据倾斜的stage了,然后根据stage划分原理,我们就可以推算出来发生倾斜的那个stage对应的代码中的哪一部分了。这个时候我们找到了数据倾斜发生的地方了,但是我们还需要知道到底是哪个key数据量特别大导致的数据倾斜,于是接下来来聊一聊这个问题。

找到这个key的算法,我们可以使用采样的方式

代码如下:

  1. val sampledPairs = pairs.sample(false, 0.1)

  2. val sampledWordCounts = sampledPairs.countByKey()

  3. sampledWordCounts.foreach(println(_))


  4. //简单说一下原理,从所有key中,把其中每一个key随机取出来一部分,然后进行一个百分比的推算,学过采样算法的都知道,这是用局部取推算整体,虽然有点不准确,但是在整体概率上来说,我们只需要大概就可以定位那个最多的key了


  5. //或者使用如下代码

  6. df.select("key").sample(false, 0.1) // 数据采样

  7. .(k => (k, 1)).reduceBykey(_ + _) // 统计 key 出现的次数

  8. .map(k => (k._2, k._1)).sortByKey(false) // 根据 key 出现次数进行排序

  9. .take(10)

如何解决数据倾斜?

这里给出比较常用的几种解决方案,应该能解决90%的数据倾斜问题

1.过滤异常数据

  1. 空值或者异常值之类的,大多是这个原因引起

  2. 无效数据,大量重复的测试数据或是对结果影响不大的有效数据

2.提高shuffle 的并行度

我们知道在reduceBykey中有一个shuffle read task的值默认为200,也就是说用两百个task来处理任务

对于我们一个很大的集群来说,每个task的任务中需要处理的key也是比较多的,这个时候我们把这个数量给提高,比如设置reduceBykey(1000),这个时候task的数量就多了,然后分配到每个task中的key就少了,于是说并行度就提高了。但是总体来说,这种解决办法对于某一个数量特别大的key来说效果不明显,只能说key多的时候,我们可以有一定的程度上环境数据倾斜的问题,所以这种方法也不是我们要找到的最好的办法,有一定的局限性

3.两阶段聚合(加盐局部聚合+去盐全局聚合)

比如有一个rdd,某一个key数据量比较大,那么在进行shuffle的时候肯定会慢(除非贵司是土豪,不在乎这点成本).

那么先将这个key打上10以内的随记前缀。如

  1. 0_hello,1

  2. 1_hello,1

  3. 2_hello,1

  4. 3_hello,1

  5. 0_hello,1

  6. 2_hello,1

  7. 3_hello,1

  8. .....

然后这个时候进行局部的预聚合,比如reduceBykey,于是经过局部的聚合后,我们得到了下面这种

  1. 0_hello,3000

  2. 1_hello,2000

  3. 2_hello,2500

  4. 3_hello,2500

然后在去掉之前加的随机前缀,在进行聚合,reduceBykey

  1. hello,10000

于是通过这种把key进行拆分的方式,我们把key分配给了一些task去执行任务,经过实验数据表明,这种方法可以提高数倍效率

这种方法也是有局限性的,他适用于聚合类的shuffle操作,如果对于join操作,还是不行的

4.将reduce join转成map join

这种方法适用于一个大数据量和小数据量的关联。

具体操作是就是选择两个rdd中那个比较数据量小的,然后我们把它拉到driver端,再然后通过广播变量(broadcast)的方式给他广播出去,这个时候再进行join 的话,因为数据都是在同一Executor中,所以shuffle 中不会有数据的传输,也就避免了shuffle带来的数据倾斜。

  1. object MapJoinTest {


  2. def main(args: Array[String]): Unit = {

  3. val conf = new SparkConf().setMaster("local").setAppName("WordCount")

  4. val sc = new SparkContext(conf)

  5. val lista=Array(

  6. Tuple2("001","小红"),

  7. Tuple2("002","小明")

  8. )

  9. //数据量小一点

  10. val listb=Array(

  11. Tuple2("001","江西"),

  12. Tuple2("002","山东")

  13. )

  14. val listaRDD = sc.parallelize(lista)

  15. val listbRDD = sc.parallelize(listb)

  16. //val result: RDD[(String, (String, String))] = listaRDD.join(listbRDD)

  17. //设置广播变量

  18. val listbBoradcast = sc.broadcast(listbRDD.collect())

  19. listaRDD.map( tuple =>{

  20. val key = tuple._1

  21. val name = tuple._2

  22. val map = listbBoradcast.value.toMap

  23. val className = map.get(key)

  24. (key,(name,className))

  25. }).foreach( tuple =>{

  26. println("省编码"+tuple._1 + " 姓名:"+tuple._2._1 + " 省份名称:"+tuple._2._2.get)

  27. })

  28. }

  29. }

5.拆分 join 再 union

join因为热点值导致长尾,也可以将热点数据和非热点数据分开处理,最后合并

6.大表 key 加盐,小表扩大 N 倍 join

这种方法适用于倾斜的key比较多,没办法把这些key单独拿出来进行计算。

即将存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

最后提几句

这里只是简单做了下数据倾斜的原因以及解决方案,大家在实际工作中肯定会遇到这种问题,如果以上方案均不能解决你的问题,可私信或者google进行解决。办法总比困难多!

如对更多的spark或者其他大数据技术感兴趣,请扫描二维码关注大佬一波~


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存