海量小文件问题综述和解决攻略
在当今互联网、物联网、云计算、大数据等高速发展的大背景下,数据呈现出几何式增长。这些数据不仅需要巨量的存储空间,而且数据类型繁多、数据大小变化大、流动快等特点,往往产生数亿级的海量小文件。由于在元数据管理、存储效率、访问的性能等方面面临巨大的挑战,因此海量小文件(LSOF,lots of small files)问题是工业界和学术界公认的难题。
本文汇总之前文章以及参考网上关于海量小文件问题的论述和常见系统的解决方案,阐述在大数据系统中对于LSOF的系统性解决方案,以及针对目前大数据领域常用的技术框架面临小文件问题时的原因探讨和解决方法。
小文件问题概述
衡量存储系统性能主要有两个关键指标,即IOPS和数据吞吐量。
IOPS (Input/Output Per Second) 即每秒的输入输出量 (或读写次数) ,是衡量存储系统性能的主要指标之一。IOPS是指单位时间内系统能处理的I/O请求数量,一般以每秒处理的I/O请求数量为单位,I/O请求通常为读或写数据操作请求。随机读写频繁的应用,如OLTP(OnlineTransaction Processing),IOPS是关键衡量指标。
另一个重要指标是数据吞吐量(Throughput),指单位时间内可以成功传输的数据数量。对于大量顺序读写的应用,如VOD(VideoOn Demand),则更关注吞吐量指标。
我们的存储磁盘最适合顺序的大文件I/O读写模式,非常不适合随机的小文件I/O读写模式,这是磁盘文件系统在海量小文件应用下性能表现不佳的根本原因。磁盘文件系统的设计大多都侧重于大文件,包括元数据管理、数据布局和I/O访问流程,另外VFS系统调用机制也非常不利于海量小文件,这些软件层面的机制和实现加剧了小文件读写的性能问题。
对于小文件的I/O访问过程,读写数据量比较小,这些流程太过复杂,系统调用开销太大,尤其是其中的open()操作占用了大部分的操作时间。当面对海量小文件并发访问,读写之前的准备工作占用了绝大部分系统时间,有效磁盘服务时间非常低,从而导致小I/O性能极度低下。
小文件合并存储是目前优化LOSF问题最为成功的策略,已经被包括Facebook Haystack和淘宝TFS在内多个分布式存储系统采用。它通过多个逻辑文件共享同一个物理文件,将多个小文件合并存储到一个大文件中,实现高效的小文件存储。为什么这种策略对LOSF效果显著呢?
Hadoop中的小文件一般是指明显小于HDFS的block size(默认128M,一般整数倍配置如256M)的文件。但需要注意,HDFS上的有些小文件是不可避免的,比如jar、临时缓存文件等。但当小文件数量变的"海量",以至于Hadoop集群中存储了大量的小文件,就需要对小文件进行处理,而处理的目标是让文件大小尽可能接近HDFS的block size大小或者整数倍。
1)众所周知,在HDFS中数据和元数据分别由DataNode和NameNode负责,这些元数据每个对象一般占用大约150个字节。大量的小文件相对于大文件会占用大量的NameNode内存。对NameNode内存管理产生巨大挑战,此外对JVM稳定性也有影响如GC。
一个Hadoop集群中存在小文件的可能原因如下:
1.流式任务(如spark streaming/flink等实时计算框架)
2.Hive分区表的过度分区
NameNode存储了所有与文件相关的元数据,所以它将整个命名空间保存在内存中,而fsimage是NameNode的本地本机文件系统中的持久化记录。因此,我们可以通过分析fsimage来找出文件的元信息。fsimage中可用的字段有:
Path, Replication, ModificationTime, AccessTime, PreferredBlockSize, BlocksCount, FileSize, NSQUOTA, DSQUOTA, Permission, UserName, GroupName
hdfs oiv -p Delimited -delimiter "|" -t /tmp/tmpdir/ -i fsimage_copy_file -o fsimage_deal.out
关于hdfs oiv命令的使用,可以查看useage。
Usage: bin/hdfs oiv [OPTIONS] -i INPUTFILE -o OUTPUTFILE
Offline Image Viewer
View a Hadoop fsimage INPUTFILE using the specified PROCESSOR,
saving the results in OUTPUTFILE.
The oiv utility will attempt to parse correctly formed image files
and will abort fail with mal-formed image files.
The tool works offline and does not require a running cluster in
order to process an image file.
The following image processors are available:
* XML: This processor creates an XML document with all elements of
the fsimage enumerated, suitable for further analysis by XML
tools.
* FileDistribution: This processor analyzes the file size
distribution in the image.
-maxSize specifies the range [0, maxSize] of file sizes to be
analyzed (128GB by default).
-step defines the granularity of the distribution. (2MB by default)
* Web: Run a viewer to expose read-only WebHDFS API.
-addr specifies the address to listen. (localhost:5978 by default)
* Delimited (experimental): Generate a text file with all of the elements common
to both inodes and inodes-under-construction, separated by a
delimiter. The default delimiter is \t, though this may be
changed via the -delimiter argument.
Required command line arguments:
-i,--inputFile <arg> FSImage file to process.
Optional command line arguments:
-o,--outputFile <arg> Name of output file. If the specified
file exists, it will be overwritten.
(output to stdout by default)
-p,--processor <arg> Select which type of processor to apply
against image file. (XML|FileDistribution|Web|Delimited)
(Web by default)
-delimiter <arg> Delimiting string to use with Delimited processor.
-t,--temp <arg> Use temporary dir to cache intermediate result to generate
Delimited outputs. If not set, Delimited processor constructs
the namespace in memory before outputting text.
-h,--help Display usage information and exit
另一种方法是使用fsck命令扫描当前的HDFS目录并保存扫描后的信息。但是不建议在生产环境使用fsck命令,因为它会带来额外的开销,可能影响集群的稳定性。
上面的内容提到过每个block的元数据都需要加载到NameNode的内存中,这导致一个Hadoop集群在NameNode中存储的对象是有上限的,并且对象太多会带来启动时间较长以及网络延迟的问题。常见的有两种解决方案,减少集群的NameNode中的对象数量,或者以某种方式让NameNode使用更多的"内存"但不会导致较长的启动时间,这就是Hadoop Archive(HAR)文件和NameNode联邦。
Hadoop archive files通过将许多小文件打包到更大的HAR文件中来缓解NameNode内存问题,类似于Linux上的TAR文件。这样可以让NameNode只处理单个HAR文件,而不是数十个或数百个小文件。可以使用har://前缀而不是hdfs://来访问HAR文件中的文件。HAR文件是基于HDFS中已有的文件创建的。因此,HAR文件不仅可以合并从数据源抽取到HDFS中的数据,也可以合并通过正常的MR处理创建的数据。HAR文件可以独立的用于解决小文件问题,除了HDFS没有其他的依赖。
NameNode联邦允许你在一个集群中拥有多个NameNode,每个NameNode都存储元数据对象的子集。这样可以让所有的元数据对象都不止存储在单个机器上,也消除了单个节点的内存限制,因为你可以扩容。这听上去是一个很美丽的方案,但其实它也有局限性。
根据之前讨论的内容,MR性能问题主要是由随机磁盘IO和启动/管理太多的map任务组合引起的。解决方案似乎很明显 - 合并小文件,然而这个事往往说起来容易做起来难。以下讨论一下几种解决方案:
修改数据抽取方法/间隔
批量文件合并
A = load '/data/inputDir' using PigStroage();
store A into '/data/inputDir' using PigStroage();
在Hive或MR中实现同样比较容易。这些MR任务运行同样需要集群资源,所以建议调度在生产系统非繁忙时间段执行。但是,应该定期执行这种合并的MR作业,因为小文件随时或者几乎每天都可能产生。但这个合并程序需要有额外的逻辑来判断存在大量小文件的目录,或者你自己是知道哪些目录是存在大量小文件的。因为假如某个目录只有3个文件,运行合并作业远不如合并一个500个文件的文件夹的性能优势提升明显。
https://github.com/edwardcapriolo/filecrush/
Sequence文件
当需要维护原始文件名时,常见的方法是使用sequence文件。在此解决方案中,文件名作为key保存在sequence文件中,然后文件内容会作为value保存。下图给出将一些小文件存储为sequence文件的示例:
------------------------------------------------------------------------------------------------------------
| Key | Value | Key | Value | Key | Value
------------------------------------------------------------------------------------------------------------
| file1.txt | file1 contents| file2.txt |file2 contents | fileN.txt | fileN contents
------------------------------------------------------------------------------------------------------------
HBase
解决小文件问题,除了HDFS存储外,当然还可以考虑HBase列式存储。使用HBase可以将数据抽取过程从生成大量小HDFS文件更改为以逐条记录写入到HBase表。如果你对数据访问的需求主要是随机查找或者叫点查,则HBase是最好的选择。HBase在架构上就是为快速插入,存储大量数据,单个记录的快速查找以及流式数据处理而设计的。但如果你对数据访问的需求主要是全表扫描,则HBase不是最适合的。
使用CombineFileInputFormat
http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
set mapreduce.input.fileinputformat.split.maxsize=1073741824
set mapreduce.input.fileinputformat.split.minsize=1073741824
通过Hive合并小文件
-------------------------------------------------------------------------------------------------
Property | Description | Default Value
-------------------------------------------------------------------------------------------------
hive.merge.mapfiles | Merge small files that are produced from | true
| map-only jobs |
-------------------------------------------------------------------------------------------------
hive.merge.mapredfiles | Merge small files that are produced from | false
| map-reduce jobs |
-------------------------------------------------------------------------------------------------
hive.merge.size.per.task | When merging small files the target size for | 256000000
| the merge files at the end of the job | (in bytes)
-------------------------------------------------------------------------------------------------
hive.merge.smallfiles.avgsize | When the average size of the output files | 16000000
| is less than this number,Hive will execute an | (in bytes)
| additional MR job to merge the files based on |
| hive.merge.mapfiles and hive.merge.mapredfiles|
-------------------------------------------------------------------------------------------------
使用Hadoop的追加特性
小文件的问题其实以前也一直困扰着我,对于传统数仓,导致小文件多的原因非常多:
前面,我们谈到了小文件的根源。那么文件多就多了,为什么是个问题呢?核心原因在于HDFS的设计问题,他需要把文件meta信息缓存在内存里,这个内存只能是单机的,所以变成了一个很大的瓶颈。虽然后面HDFS一直尝试解决这个问题,比如引入联邦制等,但是也变相的引入了复杂性。
我们知道,其实大部分存储的问题都有小文件的多的问题,比如HBase等,他们的解决方案是做compaction,本质上就是讲小文件合并成大文件。HBase还有minor compaction和 major compaction之分。截止到目前(0.4.0版本),Delta还没有提供类似的compaction功能,但是基于Delta已经提供的扩展接口,我们也可以很轻易的实现compaction的功能。Compaction的核心点是,在做compaction的过程不能影响读写,而Delta的版本设计可以很简单的做到这一点。
Delta的compaction因为有了上面的约束,会变得异常简单。
输入合并和输出合并
-- 每个Map最大输入大小,决定合并后的文件数
set mapred.max.split.size=256000000;
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 在Map-only的任务结束时合并小文件
set hive.merge.mapfiles = true
-- 在Map-Reduce的任务结束时合并小文件
set hive.merge.mapredfiles = true
-- 合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
-- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000
Hive使用HAR归档文件
set hive.archive.enabled=true;
set hive.archive.har.parentdir.settable=true;
set har.partfile.size=1099511627776;
ALTER TABLE srcpart ARCHIVE PARTITION(ds= '2021-02-01', hr= '12' );
ALTER TABLE srcpart UNARCHIVE PARTITION(ds= '2021-02-01', hr= '12' );
数据仓库Hive表分区优化
对于统计数据表、数据量不大的基础表、业务上无累计快照和周期性快照要求的数据表,尽可能的不创建分区,而采用数据合并回写的方式解决。 对于一些数据量大的表,如果需要创建分区,提高插叙过程中数据的加载速度,尽可能的只做天级分区。而对于埋点数据,这种特大的数据量的,可以采用小时分区。 对于一些周期快照和累计快照的表,我们尽可能只创建日分区。
对Hive数据进行压缩
set hive.exec.compress.output=true;
set parquet.compression=snappy;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.mapredfiles=true
set hive.optiming.sort.dynamic.partition = true;
--256M
set parquet.blocksize= 268435456;
--256M
set dfs.block.size=268435456;
--128M
set hive.merge.smallfiles.avgsize=134217728;
--256M
set hive.merge.size.per.task = 268435456;
使用sparkstreaming时,如果实时计算结果要写入到HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由sparkstreaming的微批处理模式和DStream(RDD)的分布式(partition)特性导致的,sparkstreaming为每个partition启动一个独立的线程来处理数据,一旦文件输出到HDFS,那么这个文件流就关闭了,再来一个batch的parttition任务,就再使用一个新的文件流。
增加batch大小
coalesce和repartition
SparkStreaming外部来处理
自己调用foreach去append
关于Spark SQL小文件问题产生原因分析以及处理方案,建议阅读:《Spark SQL 小文件问题处理》。
https://blog.csdn.net/weixin_43228814/article/details/88883310
https://blog.csdn.net/liuaigui/article/details/9981135
https://blog.csdn.net/xuehuagongzi000/article/details/105978128/
https://blog.csdn.net/LINBE_blazers/article/details/82861981https://zhuanlan.zhihu.com/p/87925958
https://www.lmlphp.com/user/1210/article/item/17926/
推荐文章:
【大数据学习与分享】技术干货合集