数据本地性对 Spark 生产作业容错能力的负面影响
数据本地性是Spark等计算引擎从计算性能方面去考量的一个重要指标,对于某个数据分片的运算,Spark在调度侧会做数据本地性的预测,然后尽可能的将这个运算对应的Task调度到靠近这个数据分片的Executor上。
Spark计算作业依赖于整个物理计算集群的稳定性,抛开软件层,如资源管理层(YARN,Kubernetes),存储层(HDFS)本身的稳定性不说,Spark依赖于物理机器上的CPU、内存、磁盘和网络进行真正的计算作业。单个物理机的硬件故障是一个小概率的事件,但当集群的规模到达成百上千甚至过万台,那以集群为维度,大大小小的硬件故障将成为一个常态。
在 Spark 中数据本地性通过 TaskLocality 来表示,有如下几个级别:
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
从上到下数据本地性依次递减。
spark.locality.wait
默认为3s,控制这个计算的过程。原理这里不细讲,简而言之就是重试。Spark规定了同一个job中同一个stage连续失败重试的上限(
spark.stage.maxConsecutiveAttempts
),默认为4,也规定了一个Stage中同一个Task可以失败重试的次数(spark.task.maxFailures
),默认为4。当其中任何一个阈值达到上限,Spark都会使整个job失败,停止可能的"无意义"的重试。
我们首先来看一个例子,如图所示,图为Spark Stage页面下Task Page的详细视图。
第一列表示该Task进行了4次重试,所以这个Task对应的Job也因此失败了
第三列表示该Task的数据本地性,都是NODE_LOCAL级别,对于一个从HDFS读取数据的任务,显然获得了最优的数据本地性
第四列表示的是Executor ID,我们可以看到我们任务的重试被分配到ID为5和6两个Executor 上
第五列表示我们运行这些重试的Task所在的Executor所在的物理机地址,我们可以看到他们都被调度到了同一个
最后列表示每次重试失败的错误栈
问题一:单个Task重试为什么失败?
结合硬件层面的排查,发现是NodeManager物理节点上挂在的/mnt/dfs/4,出现硬件故障导致盘只读,ShuffleMapTask在即将完成时,将index文件和data文件commit时,获取index的临时文件时候发生FileNotFoundException
。
java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
问题二:为什么该Task的4次重试都在同一个物理节点?
spark.locality.wait
默认为3s的时间约束内成功获得了NODE_LOCAL级别的数据本地性,故而都调度到了同一个NodeManger
物理节点。问题三:为什么总是“本地重试”,不是“异地重试”?
这个过程从逻辑上讲,其实已经不是“本地重试”,而恰恰是“异地重试”了。这我们可以从4次的重试的Executor ID上进行判断,第0、1和3次是在ID 6上进行的,而第2次是在ID 5上发生的。但由于ID 5和6都在同一个NodeManger节点,所以我们看起来像是“本地重试”。另一个原因就是上面所说的数据本地性的成功解析,所以这些Task的每次重试都高概率的来到这个节点。
所有Spark Task级别的重试从逻辑上都应该属于“异地重试”,他们都需要通过Driver重新调度到新的Executor进行重试。我们所观测到的"本地"和"异地"是属于"现象"而非"本质",影响这种现象的条件有比如下面几个(不一定全面):
1. 数据本地性
2. Executor由于NodeLabel限制,只在若干有限的物理机上分配
3. ResourceManager调度时刚好把所有的Executor都分配到某个节点上
问题四:为什么4次失败都操作同一个坏的盘?
该NodeManger实际上有/mnt/dfs/{0-11},一共12块盘,从物理检查上看,整个过程中也只有/mnt/dfs/4有异常告警,那为啥Spark这么傻?这么多好盘不用,专挑一块坏的盘死磕?
1. /mnt/dfs/4/yarn/local/
2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/
3. 0a/
4. shuffle_96_2685_0.index
5. .82594412-1f46-465e-a067-2c5e386a978e
第一行,是Yarn NodeManger所配置的LOCAL_DIR的一部分,完整的应该包括12块盘
第二行,是Spark生成的BlockManger的根目录之一,其他盘符下也有类似的一个目录
第三行,是一个根目录下的一级子目录,数量由
spark.diskStore.subDirectories
默认为64控制第四行,Spark Shuffle过程产生的两个重要的文件之一,一个是数据文件
.data
结尾,另一个就是这个与之对应的.index
文件。96是 ShuffleID 表标识是哪个Shuffle 过程,2685是 MapID 对应的是 一个RDD所以有分区中其中一个的顺序号,而0是一个固定值,原本表示是ReduceID,Spark Sort Based Shuffle的实现不需要依赖这个值,所以被固定为了0。通过Shuffle ID和 MapId,Shufle Write 阶段就可以生成类似shuffle_96_2685_0.index这样的文件,而Shuffle Read 阶段也可以通过两个ID 定位到这个文件。第五行,是Index文件的对应临时文件的UUID标识。
基于这样的逻辑,对于某次Shuffle过程的某个分区(Partition)的最终输出文件名其实是可以预测的也是固定的,比如我们这个case中,第96次shuffle的第2685分区的index文件的文件名即为shuffle_96_2685_0.index。
Spark在写和读这个文件的时候,基于相同的定位逻辑(算法)来保证依赖关系,第一步确定根目录,Spark通过文件名的hash绝对值与盘符数的模,作为索引却确定根目录。
scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12
res0: Int = 6
而根目录的数组对于一个Executor的这个生命周期内而言是确定的,它是一个由简单随机算法将所有路径打散的一个固定数组。所以一旦文件名称确定,Executor不换的话,根目录一定是确定的。所以都固定的去访问/mnt/dfs/4这个坏盘。
问题五:为什么两个Executor上的重试都失败了?
其实这个问题只是概率的问题,Spark用类似下面算法打乱所有LOCAL_DIRS的配置,如下面的的简单测试,这种碰撞的概率还是极高的,我们ID5,6,的Executor下DiskBlockManager包含的 localDirs(6)应该都对应于/mnt/dfs/4这个坏盘。
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {
| for (i <- (arr.length - 1) to 1 by -1) {
| val j = rand.nextInt(i + 1)
| val tmp = arr(j)
| arr(j) = arr(i)
| arr(i) = tmp
| }
| arr
| }
randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]
scala> randomizeInPlace(res11)
res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)
res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)
res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)
res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)
res27: Array[Int] = Array(2, 3, 4, 1)
1. 问题原因
集群某个或某几个物理机上某块或某几块盘出现磁盘问题时,Spark 由于数据本地性原因反复把Task调度到这个节点的某个Executor,或这个节点的其他Executor上,前者必然失败,后者有概率失败。
2. 规避方案
设置
spark.locality.wait=0s
,让Task有更大的概率调度到别的节点,当然可能会影响一定的性能设置 spark.blacklist.enabled=true
,开启黑名单,把问题节点加到黑名单中,暂不参与Task的分配。当然使用黑名单的话,不注意也很容易踩坑
3. 解决方案
说来也巧,在我刚去社区提https://issues.apache.org/jira/browse/SPARK-29257这个JIRA,并沟通初步方案时,发现社区在两天之前刚将https://github.com/apache/spark/pull/25620这个Pull request合入了,虽然这个PR不是专门解决我所提到的这个问题的,但它确产生了一个副作用,刚好解决了这个问题。
本质的想法就是构建shuffle_${shuffleId}_${mapId}_0.index
这类Shuffle文件时,可以让每次重试都可以生成Unique的文件名,这样就可以生成不同的hash值并挑选别的盘作为根目录了,这样就不会一直在一块坏盘上吊死。这个PR中已经将mapId换成了每个task的taskAttemtId,而这个值就是unique的,所以天然就解决了这个问题。
对于2.x的Spark版本,大家可以尝试合入这个PR。
参考文献:
https://github.com/apache/spark/pull/25620
作者:Kent_Yao
来源:https://www.jianshu.com/p/72ffaa102200
推荐文章:
【PySpark源码解析】用Python调用高效Scala接口,搞定大规模数据分析