玩转Spark小文件合并与文件读写提交机制
1.Spark小文件产生的原因
大数据领域的小文件,通常是指文件大小显著小于HDFS Block块(128MB/256MB)的文件,小文件过多会给HDFS带来严重的性能瓶颈(主要表现在NameNode节点元数据的管理以及客户端的元数据访问请求),并且对用户作业的稳定性和集群数据的维护也会带来很大的挑战。
Spark程序产生的文件数量直接取决于RDD中partition分区的数量和表分区的数量,需要注意的是,这里提到的两个分区,概念并不相同,RDD的partition分区与任务并行度高度相关,而表分区则是指的Hive表分区(对于Hive,一个分区对应一个目录,主要用于数据的归类存储),产生的文件数目一般是RDD分区数和表分区数的乘积。因此,当Spark任务并行度过高或表分区数目过大时,非常容易产生大量的小文件。
2.Spark小文件合并带来的收益
Spark小文件合并功能在多个方面给用户带来了较大的收益,主要归纳为如下四点
减少文件数量,便于数据生命周期管理;对于HDFS这种文件系统,任何一个文件、目录或者数据块(Block)都会在NameNode机器节点的内存中保存一份元数据,受限于NameNode机器节点物理内存的大小,HDFS上存放的文件数量也将受到极大的制约,将大量小文件合并成为少数几个大文件,能很大程度地减小NameNode的负载压力,提高执行性能。此外,处理小文件并非HDFS的擅长之处,HDFS的设计目标是流式访问大数据集(TB级别),在HDFS上存放大量小文件,访问这些小文件获取数据时,需要频繁连接访问不同的DataNode节点,严重影响数据的读写效率
提高用户作业的数据读取效率;如果数据源分布着大量小文件,则Spark作业在从数据源读取数据时,也会产生大量的task,而Spark作业可用的Executor数量有限(相对应地,并行执行的task数量有限),在这种情况下,每个Executor会依次执行多个task,执行task的时候,会进行反序列化等一系列操作;如果数据源都是文件大小均衡、接近数据块大小的大文件,则Spark作业在从数据源读取数据时,产生的task数量相对会大幅减少,在这种情况下,每个Executor可能只需要依次执行少量几个task,避免在多个task之间切换,提高执行效率。
降低Spark driver端出现OOM的概率;对于Spark作业,数据源分布的小文件数量越多,则Spark作业在从数据源读取数据时,产生的task数量也会更多,而数据分片信息以及对应产生的task元信息都保存在driver端的内存中,会给driver带来很大的压力,甚至引发OOM异常导致作业执行失败。
加快大型Spark作业的执行速度;在基于FileOutputCommitter V1文件提交机制的应用场景下,Spark作业在所有task执行完commitTask方法提交数据后,driver端会继续执行commitJob方法来依次将数据文件转移到job的最终目标目录下,当文件数量非常大时,会成为一个严重的性能瓶颈,导致Spark作业迟迟不能结束
3.Spark小文件合并基本原理
Spark任务在执行过程中,通常都要经历从数据源获取数据,然后在内存中进行计算,最后将数据进行持久化的过程,其中有两个非常关键的操作:
1、executor端的task任务执行commitTask方法,将数据文件从task临时目录转移到Job临时目录;
2、driver端执行commitJob方法,将各个task任务提交的数据文件,从Job临时目录转移到Job的最终目标目录。
Spark小文件合并的基本原理:
在executor端,各个task任务执行完commitTask方法提交数据后,先获取作业对应的所有小文件,然后按照分区对小文件进行分组合并,最后driver端执行commitJob方法,将合并后的数据文件转移到Job的最终目标目录。在Spark作业中,引入小文件合并功能的执行流程,如下图:
开源的Spark有两种不同的文件提交机制,即FileOutputCommitter V1和FileOutputCommitter V2,它们是Spark小文件合并功能的算法基础,只有充分了解FileOutputCommitter V1和FileOutputCommitter V2文件提交机制,才能更好地理解Spark小文件合并功能的工作原理。首先,我们来了解下,Spark写文件跟其他的程序写文件有什么不一样的地方。
4.Spark写文件不一样的地方
通常情况下,在单机上写文件时,都会生成一个指定文件名的文件,而调用Spark DataFrame的writer接口来写文件时,所得到的结果却不尽相同。如下图右侧所示,在指定路径下写入了3个数据文件。为什么会这样?这就与Spark的执行方式有关了,Spark是分布式计算系统,其RDD中的数据是分散在多个Partition中的,而每个Partition对应一个task来执行,这些task会根据vcores数量来并行执行。在下图示例中,分配了3个Partition,所以生成了part-00000、part-00001、part-00002共3个文件(文件名中间的一长串UUID是在job中统一生成的)。按照这样的执行方式,如果直接把数据写入到指定的路径下,会出现哪些问题呢?
问题1:由于是多个task并行写文件,如何保证所有task写的所有文件要么同时对外可见,要么同时不可见?在上图示例中,3个task的写入速度是不同的,那就会导致不同时刻看到的文件个数是不一样的。此外,如果有一个task执行失败了,会导致有2个文件残留在这个路径下;
问题2:同一个task可能因为Speculation推测执行或其他原因,导致某一时刻有多个task attempt并行执行,即同一个task有多个实例同时写相同的数据到相同的文件中,势必会造成冲突。如何来保证最终只有一个task是成功的并且数据是正确的呢?
FileOutputCommitter V1 和 FileOutputCommitter V2文件提交机制,很好地解决了上述问题,虽然它们在执行逻辑上非常相似,但在保证原子性和数据一致性等方面,差异很大。下面,我们就来详细了解这两种文件提交机制。
4.1FileOutputCommitter V1文件提交机制
FileOutputCommitter V1文件提交机制的基本工作原理,需要经历两次rename过程,每个task先将数据写入到如下临时目录:
finalTargetDir/_temporary/appAttemptDir/_temporary/taskAttemptDir/dataFile
等到task完成数据写入后,执行commitTask方法做第一次rename,将数据文件从task临时目录转移到如下临时目录:
finalTargetDir/_temporary/appAttemptDir/taskDir/dataFile
最后,当所有task都执行完commitTask方法后,由Driver负责执行commitJob方法做第二次rename,依次将数据文件从job临时目录下的各个task目录中,转移到如下最终目标目录中,并生成_SUCCESS标识文件:
finalTargetDir/dataFile
FileOutputCommitter V1文件提交机制的执行流程,如下图:
4.2 FileOutputCommitter V2文件提交机制
FileOutputCommitter V1文件提交机制较好地解决了数据一致性的问题,因为只有在rename的过程中才可能出现数据一致性问题,而通常情况下,这种问题出现的概率非常低。但是,两次rename带来了性能上的问题,主要表现在:如果有大量task写入数据,即使所有task都执行完成了,仍需等待较长一段时间作业才结束,这些时间主要耗费在driver端做第二次rename,这个问题在对象存储中尤为突出。
FileOutputCommitter V2文件提交机制的出现,解决了两次rename存在的性能问题,其执行流程如下图。相比于FileOutputCommitter V1文件提交机制,主要去掉了在commitJob阶段做第二次rename来提高性能,但是牺牲了一部分的数据一致性。在FileOutputCommitter V2文件提交机制中,如果部分task已执行成功,而此时job执行失败,就会出现一部分数据对外可见,也就是出现了脏数据,需要数据消费者根据是否新生成了_SUCCESS标识文件来判断数据的完整性。
涤生大数据往期精彩推荐
8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!
10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践
12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)
13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)
14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?
15.企业级Apache Kafka集群策略:Kakfa最佳实践总结