How to avoid producing large numbers of small files when importing data with Spark SQL
The following article is from 网易有数 (author: 大数据平台).
What are small files?
The impact of the small-file problem
How Spark produces small files
2. Inserting data with dynamic partitioning
Without a shuffle, each logical input split produces one HadoopPartition in the HadoopRDD, and each partition corresponds to one task of the Spark job; call the number of tasks M and the number of dynamic partitions N. In the best case, M = N and the data in the M tasks has already been clustered by the N partition values, so exactly N files are written. In the worst case, every task holds records for every partition, and the final file count reaches M * N. The experiments below show store_sales ending up with more than 1,800 ss_sold_date_sk partitions, so a single map task that sees every date already emits over 1,800 files on its own. Dynamic partition inserts are therefore extremely prone to producing small files.
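The inserts in this article write to a Hive table with ss_sold_date_sk as a dynamic partition column. As a minimal sketch (an assumption about the session setup, not something stated in the original run), a fully dynamic insert like the one below typically needs:

set hive.exec.dynamic.partition=true;
-- nonstrict mode allows every partition column to be dynamic (no static partition value required)
set hive.exec.dynamic.partition.mode=nonstrict;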
use tpcds_1t_parquet;
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales;
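Whether a load like this will shuffle can be checked from its physical plan before running it: an Exchange node means a shuffle, while its absence means each map task writes its own files directly into every dynamic partition it touches. A minimal sketch, simply prefixing the same insert with EXPLAIN:

EXPLAIN
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales;

For this statement there is no Exchange between the scan and the write: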
== Physical Plan ==
InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false
+- HiveTableScan [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L], MetastoreRelation tpcds_1t_ext, et_store_sales
By contrast, the same insert executed with a shuffle on a rand()-derived key (and adaptive execution enabled) produces a plan with an Exchange before the write:

InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false
+- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L]
   +- Exchange(coordinator id: 1080882047) hashpartitioning(_nondeterministic#49, 2048), coordinator[target post-shuffle partition size: 67108864]
      +- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L, rand(4184439864130379921) AS _nondeterministic#49]
         +- HiveTableScan [ss_sold_date_sk#3L, ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25], MetastoreRelation tpcds_1t_ext, et_store_sales
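The coordinator[target post-shuffle partition size: 67108864] annotation on that Exchange is Spark 2.x adaptive execution at work. A minimal sketch of session settings that would produce such a coordinator, assuming the Spark 2.x property names (Spark 3.x renames these, see the summary):

set spark.sql.adaptive.enabled=true;
-- 67108864 bytes = 64 MB, the target post-shuffle partition size shown in the plan above
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864;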
How to solve the small-file problem in Spark SQL

One way is to shuffle the data by the dynamic partition column before the write, adding a bounded random value so that no single partition's task becomes skewed:
use tpcds_1t_parquet;
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales
distribute by ss_sold_date_sk, cast(rand() * 5 as int);
A hadoop fs -count on the table directory afterwards reports 1,825 directories and 9,105 files:
1825 9105 247111074494 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
When we know which partition key value is skewed, we can also split the load into several SQL statements. store_sales, for example, is skewed by null values, so we can split the source data into two parts with where ss_sold_date_sk is not null and where ss_sold_date_sk is null. The non-null part can then be shuffled by the partition column, i.e. distribute by ss_sold_date_sk, while the null part can be shuffled by a random value, distribute by cast(rand() * 5 as int), which statically splits the null portion into five files:
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales
where ss_sold_date_sk is not null
distribute by ss_sold_date_sk;
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales
where ss_sold_date_sk is null
distribute by cast(rand() * 5 as int);
Alternatively, with Spark SQL adaptive execution enabled (as sketched earlier), the null part can simply be distributed by rand() and the post-shuffle partitions are coalesced to the target size automatically:

INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT * FROM tpcds_1t_ext.et_store_sales
where ss_sold_date_sk is null
distribute by rand();
Spark SQL small-file experiments
Four experiments were run:
- Create the table; insert the non-null subset distributed by the partition column, then insert the null subset distributed by rand() with Spark SQL adaptive execution enabled
- Create the table; insert directly from the original input splits, without a shuffle
- Create the table; insert the full data set distributed by the partition column
- Create the table; insert the full data set distributed by the partition column plus cast(rand() * 5 as int)
bin/hadoop fs -count /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/
1825 1863 192985051585 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00000
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00001
183.3 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00002
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00003
183.1 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00004
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00005
183.2 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00006
......
bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642
194.5 M /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642/part-00369
Summary
- Shuffling the source data by the partition column avoids the small-file problem, but it may introduce data skew.
- distribute by ss_sold_date_sk, cast(rand() * N as int) lets N trade off between the number of files and the degree of skew.
- When the skewed key is known, the source data can be split into several parts: shuffle the non-skewed part by the partition key, and shuffle the skewed part by rand().
- On Spark 2.4 and later, a SQL hint can be used instead; see https://issues.apache.org/jira/browse/SPARK-24940 for details (a sketch follows after this list).
- On Spark 3.0 and later, Adaptive Query Execution (AQE) makes this easier: set spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled to true, and Spark will coalesce small shuffle partitions automatically during execution.
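For reference, a minimal sketch of the last two options. The hint comes from SPARK-24940 and the AQE properties are the Spark 3.0 names mentioned above; the partition count in the hint is illustrative only:

-- Spark 2.4+: COALESCE / REPARTITION hints in SQL (SPARK-24940); 5 is an illustrative value,
-- playing a role similar to cast(rand() * 5 as int) in the distribute by approach
INSERT overwrite table store_sales partition
( ss_sold_date_sk )
SELECT /*+ REPARTITION(5) */ * FROM tpcds_1t_ext.et_store_sales;

-- Spark 3.0+: let AQE coalesce small shuffle partitions automatically
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.coalescePartitions.enabled=true;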