其他
Spark SQL 小文件问题处理
在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。
此外,Spark在处理任务时,一个分区分配一个task进行处理,多个分区并行处理,虽然并行处理能够提高处理效率,但不是意味着task数越多越好。如果数据量不大,过多的task运行反而会影响效率。
1) 对表test_tab进行写入操作
2) t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
3) test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;
2. 分析
1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响,
2)tmp的分区数主要受t1、t2以及union all的影响
3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?
最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。
通过repartition或coalesce算子控制最后的DataSet的分区数 注意repartition和coalesce的区别,具体可以参考文章《重要|Spark分区并行度决定机制》 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL 需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例: INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
小文件定期合并
可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作
推荐文章:
关于HDFS应知应会的几个问题