查看原文
其他

海量小文件问题综述和解决攻略

大数据学习与分享 大数据学习与分享 2022-07-29

在当今互联网、物联网、云计算、大数据等高速发展的大背景下,数据呈现出几何式增长。这些数据不仅需要巨量的存储空间,而且数据类型繁多、数据大小变化大、流动快等特点,往往产生数亿级的海量小文件。由于在元数据管理、存储效率、访问的性能等方面面临巨大的挑战,因此海量小文件(LSOF,lots of small files)问题是工业界和学术界公认的难题。

本文汇总之前文章以及参考网上关于海量小文件问题的论述和常见系统的解决方案,阐述在大数据系统中对于LSOF的系统性解决方案,以及针对目前大数据领域常用的技术框架面临小文件问题时的原因探讨和解决方法。


小文件问题概述


衡量存储系统性能主要有两个关键指标,即IOPS和数据吞吐量。

IOPS (Input/Output Per Second) 即每秒的输入输出量 (或读写次数) ,是衡量存储系统性能的主要指标之一。IOPS是指单位时间内系统能处理的I/O请求数量,一般以每秒处理的I/O请求数量为单位,I/O请求通常为读或写数据操作请求。随机读写频繁的应用,如OLTP(OnlineTransaction Processing),IOPS是关键衡量指标。

另一个重要指标是数据吞吐量(Throughput),指单位时间内可以成功传输的数据数量。对于大量顺序读写的应用,如VOD(VideoOn Demand),则更关注吞吐量指标。

我们的存储磁盘最适合顺序的大文件I/O读写模式,非常不适合随机的小文件I/O读写模式,这是磁盘文件系统在海量小文件应用下性能表现不佳的根本原因。磁盘文件系统的设计大多都侧重于大文件,包括元数据管理、数据布局和I/O访问流程,另外VFS系统调用机制也非常不利于海量小文件,这些软件层面的机制和实现加剧了小文件读写的性能问题。

对于LOSF而言,IOPS/OPS是关键性能衡量指标,造成性能和存储效率低下的主要原因包括元数据管理、数据布局和I/O管理、Cache管理、网络开销等方面。从理论分析以及上面LOSF优化实践来看,优化应该从元数据管理、缓存机制、合并小文件等方面展开,而且优化是一个系统工程,结合硬件、软件,从多个层面同时着手,优化效果会更显著。

小文件过多引起的主要问题

(1)元数据管理低效
由于小文件数据内容较少,因此元数据的访问性能对小文件访问性能影响巨大。当前主流的磁盘文件系统基本都是面向大文件高聚合带宽设计的,而不是小文件的低延迟访问。
磁盘文件系统中,目录项(dentry)、索引节点(inode)和数据(data)保存在存储介质的不同位置上。因此,访问一个文件需要经历至少3次独立的访问。这样,并发的小文件访问就转变成了大量的随机访问,而这种访问对于广泛使用的磁盘来说是非常低效的。
同时,文件系统通常采用Hash树、B+树或B*树来组织和索引目录,这种方法不能在数以亿计的大目录中很好的扩展,海量目录下检索效率会明显下降。正是由于单个目录元数据组织能力的低效,文件系统使用者通常被鼓励把文件分散在多层次的目录中以提高性能。然而,这种方法会进一步加大路径查询的开销。
(2)数据布局低效
磁盘文件系统使用块来组织磁盘数据,并在inode中使用多级指针或hash树来索引文件数据块。数据块通常比较小,一般为1KB、2KB或4KB。当文件需要存储数据时,文件系统根据预定的策略分配数据块,分配策略会综合考虑数据局部性、存储空间利用效率等因素,通常会优先考虑大文件I/O带宽。
对于大文件,数据块会尽量进行连续分配,具有比较好的空间局部性。
对于小文件,尤其是大文件和小文件混合存储或者经过大量删除和修改后,数据块分配的随机性会进一步加剧,数据块可能零散分布在磁盘上的不同位置,并且会造成大量的磁盘碎片(包括内部碎片和外部碎片),不仅造成访问性能下降,还导致大量磁盘空间浪费。
对于特别小的小文件,比如小于4KB,inode与数据分开存储,这种数据布局也没有充分利用空间局部性,导致随机I/O访问,目前已经有文件系统实现了data in inode。
(3)I/O访问流程复杂
Linux等操作系统采用VFS或类似机制来抽象文件系统的实现,提供标准统一访问接口和流程,它提供通用的Cache机制,处理文件系统相关的所有系统调用,与具体文件系统和其他内核组件(如内存管理)交互。VFS可以屏蔽底层文件系统实现细节,简化文件系统设计,实现对不同文件系统支持的扩展。
VFS通用模型中有涉及四种数据类型:超级块对象(superblock object)、索引结点对象(inode object)、文件对象(file object)和目录项对象(dentry object),进程在进行I/O访问过程中需要频繁与它们交互(如下图所示)。

对于小文件的I/O访问过程,读写数据量比较小,这些流程太过复杂,系统调用开销太大,尤其是其中的open()操作占用了大部分的操作时间。当面对海量小文件并发访问,读写之前的准备工作占用了绝大部分系统时间,有效磁盘服务时间非常低,从而导致小I/O性能极度低下。
对于大多数分布式文件系统而言,通常将元数据与数据两者独立开来,即控制流与数据流进行分离,从而获得更高的系统扩展性和I/O并发性。数据和I/O访问负载被分散到多个物理独立的存储节点,从而实现系统的高扩展性和高性能,每个节点使用磁盘文件系统管理数据,比如XFS、EXT4、XFS等。
因此,相对于磁盘文件系统而言,每个节点的小文件问题是相同的。由于分布式的架构,分布式文件系统中的网络通信、元数据服务MDC、Cache管理、数据布局和I/O访问模式等都会对IOPS/OPS性能产生影响,进一步加剧小文件问题。

小文件合并

小文件合并存储是目前优化LOSF问题最为成功的策略,已经被包括Facebook Haystack和淘宝TFS在内多个分布式存储系统采用。它通过多个逻辑文件共享同一个物理文件,将多个小文件合并存储到一个大文件中,实现高效的小文件存储。为什么这种策略对LOSF效果显著呢?
首先减少了大量元数据,提高了元数据的检索和查询效率,降低了文件读写的 I/O 操作延时。其次将可能连续访问的小文件一同合并存储,增加了文件之间的局部性,将原本小文件间的随机访问变为了顺序访问,大大提高了性能。同时,合并存储能够有效的减少小文件存储时所产生的磁盘碎片问题,提高了磁盘的利用率。最后,合并之后小文件的访问流程也有了很大的变化,由原来许多的open操作转变为了seek操作,定位到大文件具体的位置即可。
大文件加上索引文件,小文件合并存储实际上相当于一个微型文件系统。这种机制对于WORM(Write Once Read Many)模式的分布式存储系统非常适合,而不适合允许改写和删除的存储系统。因为文件改写和删除操作,会造成大文件内部的碎片空洞,如果进行空间管理并在合适时候执行碎片整理,实现比较复杂而且产生额外开销。如果不对碎片进行处理,采用追加写的方式,一方面会浪费存储容量,另一方面又会破坏数据局部性,增加数据分布的随机性,导致读性能下降。此外,如果支持随机读写,大小文件如何统一处理,小文件增长成大文件,大文件退化为小文件,这些问题都是在实际处理时面临的挑战。

Hadoop小文件合并策略和方式

Hadoop中的小文件一般是指明显小于HDFS的block size(默认128M,一般整数倍配置如256M)的文件。但需要注意,HDFS上的有些小文件是不可避免的,比如jar、临时缓存文件等。但当小文件数量变的"海量",以至于Hadoop集群中存储了大量的小文件,就需要对小文件进行处理,而处理的目标是让文件大小尽可能接近HDFS的block size大小或者整数倍。

    

Hadoop小文件带来的问题

1)众所周知,在HDFS中数据和元数据分别由DataNode和NameNode负责,这些元数据每个对象一般占用大约150个字节。大量的小文件相对于大文件会占用大量的NameNode内存。对NameNode内存管理产生巨大挑战,此外对JVM稳定性也有影响如GC。
2)当NameNode重启时,它需要将文件系统元数据从本地磁盘加载到内存中。如果NameNode的元数据很大,重启速度会非常慢。
3)一般来说,NameNode会不断跟踪并检查集群中每个block块的存储位置。这是通过DataNode的定时心跳上报其数据块来实现的。数据节点需要上报的block越多,则也会消耗越多的网络带宽/时延。
4)更多的文件意味着更多的读取请求需要请求NameNode,这可能最终会堵塞NameNode的容量,增加RPC队列和处理延迟,进而导致性能和响应能力下降。
5)对计算引擎如Spark、MapReduce性能造成负面影响。以MapReduce(以下简称MR)为例,大量小文件意味着大量的磁盘IO,磁盘IO通常是MR性能的最大瓶颈之一,在HDFS中对于相同数量的数据,一次大的顺序读取往往优于几次随机读取的性能。如果可以将数据存储在较少,而更大的一些block中,可以降低磁盘IO的性能影响。除了磁盘IO,还有内部任务的划分、资源分配等,建议阅读:《详解MapReduce》

Hadoop小文件是怎么来的

一个Hadoop集群中存在小文件的可能原因如下:

1.流式任务(如spark streaming/flink等实时计算框架)
在做数据处理时,无论是纯实时还是基于batch的准实时,在小的时间窗口内都可能产生大量的小文件。此外对于Spark任务如果过度并行化,每个分区一个文件,产生的文件也可能会增多
    
2.Hive分区表的过度分区
这里的过度分区是指Hive分区表的每个分区数据量很小(比如小于HDFS block size)的Hive表。那么Hive Metastore Server调用开销会随着表拥有的分区数量而增加,影响性能。此时,要衡量数据量重新进行表结构设计(如减少分区粒度)。
3.数据源有大量小文件,未做处理直接迁移到Hadoop集群。
4.对于计算引擎处理任务,以MR为例。
大量的map和reduce task存在。在HDFS上生成的文件基本上与map数量(对于Map-Only作业)或reduce数量(对于MR作业)成正比。此外,MR任务如果未设置合理的reduce数或者未做限制,每个reduce都会生成一个独立的文件。对于数据倾斜,导致大部分的数据都shuffle到一个或几个reduce,然后其他的reduce都会处理较小的数据量并输出小文件。
对于Spark任务,过度并行化也是导致小文件过多的原因之一。
在Spark作业中,根据写任务中提到的分区数量,每个分区会写一个新文件。这类似于MapReduce框架中的每个reduce任务都会创建一个新文件。Spark分区越多,写入的文件就越多。控制分区的数量来减少小文件的生成。

Hadoop小文件的发现

NameNode存储了所有与文件相关的元数据,所以它将整个命名空间保存在内存中,而fsimage是NameNode的本地本机文件系统中的持久化记录。因此,我们可以通过分析fsimage来找出文件的元信息。fsimage中可用的字段有:
Path, Replication, ModificationTime, AccessTime, PreferredBlockSize, BlocksCount, FileSize, NSQUOTA, DSQUOTA, Permission, UserName, GroupName
通常可以采用以下方法来解析fsimage,拷贝Namenode数据目录下的fsimage文件到其他目录,然后执行:
hdfs oiv -p Delimited -delimiter "|" -t /tmp/tmpdir/ -i fsimage_copy_file -o fsimage_deal.out

关于hdfs oiv命令的使用,可以查看useage。
Usage: bin/hdfs oiv [OPTIONS] -i INPUTFILE -o OUTPUTFILEOffline Image ViewerView a Hadoop fsimage INPUTFILE using the specified PROCESSOR,saving the results in OUTPUTFILE.
The oiv utility will attempt to parse correctly formed image filesand will abort fail with mal-formed image files.
The tool works offline and does not require a running cluster inorder to process an image file.
The following image processors are available: * XML: This processor creates an XML document with all elements of the fsimage enumerated, suitable for further analysis by XML tools. * FileDistribution: This processor analyzes the file size distribution in the image. -maxSize specifies the range [0, maxSize] of file sizes to be analyzed (128GB by default). -step defines the granularity of the distribution. (2MB by default) * Web: Run a viewer to expose read-only WebHDFS API. -addr specifies the address to listen. (localhost:5978 by default) * Delimited (experimental): Generate a text file with all of the elements commonto both inodes and inodes-under-construction, separated by a delimiter. The default delimiter is \t, though this may bechanged via the -delimiter argument.
Required command line arguments:-i,--inputFile <arg> FSImage file to process.
Optional command line arguments:-o,--outputFile <arg> Name of output file. If the specifiedfile exists, it will be overwritten. (output to stdout by default)-p,--processor <arg> Select which type of processor to apply against image file. (XML|FileDistribution|Web|Delimited) (Web by default)-delimiter <arg> Delimiting string to use with Delimited processor.-t,--temp <arg> Use temporary dir to cache intermediate result to generateDelimited outputs. If not set, Delimited processor constructs the namespace in memory before outputting text.-h,--help Display usage information and exit

另一种方法是使用fsck命令扫描当前的HDFS目录并保存扫描后的信息。但是不建议在生产环境使用fsck命令,因为它会带来额外的开销,可能影响集群的稳定性。

 解决NameNode的内存问题  


上面的内容提到过每个block的元数据都需要加载到NameNode的内存中,这导致一个Hadoop集群在NameNode中存储的对象是有上限的,并且对象太多会带来启动时间较长以及网络延迟的问题。常见的有两种解决方案,减少集群的NameNode中的对象数量,或者以某种方式让NameNode使用更多的"内存"但不会导致较长的启动时间,这就是Hadoop Archive(HAR)文件和NameNode联邦。
关于NameNode,建议阅读:《必须掌握的分布式文件存储系统—HDFS》《关于HDFS应知应会的N个问题》

 Hadoop Archive Files 



Hadoop archive files通过将许多小文件打包到更大的HAR文件中来缓解NameNode内存问题,类似于Linux上的TAR文件。这样可以让NameNode只处理单个HAR文件,而不是数十个或数百个小文件。可以使用har://前缀而不是hdfs://来访问HAR文件中的文件。HAR文件是基于HDFS中已有的文件创建的。因此,HAR文件不仅可以合并从数据源抽取到HDFS中的数据,也可以合并通过正常的MR处理创建的数据。HAR文件可以独立的用于解决小文件问题,除了HDFS没有其他的依赖。
虽然HAR文件减少了NameNode中小文件对内存的占用,但访问HAR文件内容性能可能会更低。HAR文件仍然随机存储在磁盘上,并且读取HAR内的文件需要访问两个索引 - 一个用于NameNode找到HAR文件本身,一个用于在HAR文件内找到小文件的位置。在HAR中读取文件实际上可能比读取存储在HDFS上的相同文件慢。MapReduce作业的性能同样会受到影响,因为它仍旧会为每个HAR文件中的每个文件启动一个map任务。
所以这里我们需要有一个权衡,HAR文件可以解决NameNode内存问题,但同时会降低读取性能。如果你的小文件主要用于存档,并且不经常访问,那么HAR文件是一个很好的解决方案。如果小文件经常要被读取或者处理,那么可能需要重新考虑解决方案。

 NameNode联邦 



NameNode联邦允许你在一个集群中拥有多个NameNode,每个NameNode都存储元数据对象的子集。这样可以让所有的元数据对象都不止存储在单个机器上,也消除了单个节点的内存限制,因为你可以扩容。这听上去是一个很美丽的方案,但其实它也有局限性。
NameNode联邦隔离了元数据对象 - 仅仅只有某一个NameNode知道某一个特定的元数据对象在哪里,意思就是说如果你想找到某个文件,你必须知道它是保存在哪个NameNode上的。如果你的集群中有多个租户和/或隔离的应用程序,那使用NameNode联邦是挺不错的,你可以通过租户或者应用程序来隔离元数据对象。但是,如果要在所有的应用程序之间共享数据,则该方法其实也并不是完美的。
由于NameNode联邦并不会改变集群中对象或者块的数量,所以它并没有解决MapReduce的性能问题。相反,联邦会增加Hadoop集群安装和维护的复杂度。所以我们说联邦可以解决小文件问题,倒不如说它提供了一种办法让你“隐藏”小文件。

 解决MapReduce性能问题 


根据之前讨论的内容,MR性能问题主要是由随机磁盘IO和启动/管理太多的map任务组合引起的。解决方案似乎很明显 - 合并小文件,然而这个事往往说起来容易做起来难。以下讨论一下几种解决方案:
注:虽然为解决MR的性能问题,但其实同样也是为了解决NameNode的压力,以及解决其他计算引擎比如Impala/Spark的性能问题。
  • 修改数据抽取方法/间隔
解决小文件问题的最简单方法就是在生成阶段就避免小文件的产生。如果是由数据源产生大量小文件并直接拷贝到Hadoop,可以调研了解数据源是否能生成一些大文件,或者从数据源到HDFS的数据抽取过程中进行数据处理合并小文件。如果每小时只抽取10MB的数据,考虑是否改为每天一次,这样创建1个240MB的文件而不是24个10MB的文件。但是,你可能无法控制数据源的改动配合或业务对数据抽取间隔的需求,这样小文件问题无法避免,这时可能需要考虑其他的解决方案。
  • 批量文件合并
当产生小文件是不可避免时,文件合并是常见的解决方案。使用这种方法,你可以定期运行一个MR任务,读取某一个文件夹中的所有小文件,并将它们重写为较少数量的大文件。比如一个文件夹中有1000个文件,你可以在一个MR任务中指定reduce的数量为5,这样1000个输入文件会被合并为5个文件。随后进行一些简单的HDFS文件/文件夹操作(将新文件覆盖回原目录),则可以将NameNode的内存使用减少到200分之1,并且可以提高以后MR或其他计算引擎对同一数据处理的性能。
举例如果使用Pig,只需要2行包括load和store语句即可以实现。比如合并文本文件:
A = load '/data/inputDir' using PigStroage();store A into '/data/inputDir' using PigStroage();

在Hive或MR中实现同样比较容易。这些MR任务运行同样需要集群资源,所以建议调度在生产系统非繁忙时间段执行。但是,应该定期执行这种合并的MR作业,因为小文件随时或者几乎每天都可能产生。但这个合并程序需要有额外的逻辑来判断存在大量小文件的目录,或者你自己是知道哪些目录是存在大量小文件的。因为假如某个目录只有3个文件,运行合并作业远不如合并一个500个文件的文件夹的性能优势提升明显。
检查所有文件夹并确认哪些文件夹中的小文件需要合并,目前主要是通过自定义的脚本或程序,当然一些商业工具也能做,比如Pentaho可以迭代HDFS中的一组文件夹,找到最小合并要求的文件夹。这里还推荐另外一个开源工具File Crush。
https://github.com/edwardcapriolo/filecrush/
File Crush没有专业支持,所以无法保证它可以与Hadoop的后续版本一直保持兼容。
批量合并文件的方法无法保留原始文件名,如果原始文件名对于你了解数据来源非常重要,则批量合并文件的方法也不适用。但一般来说,我们一般只会设计HDFS的各级目录的文件名,而不会细化到每个文件的名字,所以理论来说这种方法问题也不大。
  • Sequence文件
SequenceFile是Hadoop API提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中,这种二进制文件内部使用Hadoop的标准Writable接口实现序列化和反序列化。有如下特点:
1.基于行存储。它与Hadoop API中的MapFile是互相兼容的。Hive中的SequenceFile继承自Hadoop API的SequenceFile,不过它的key为空,使用value存放实际的值,这样是为了避免MR在运行map阶段的排序过程
2.支持三种压缩类型:None、Record、Block。默认采用Record,但是Record压缩率低;一般建议使用Block压缩
3.优势是文件和Hadoop API的MapFile是相互兼容的

当需要维护原始文件名时,常见的方法是使用sequence文件。在此解决方案中,文件名作为key保存在sequence文件中,然后文件内容会作为value保存。下图给出将一些小文件存储为sequence文件的示例:
------------------------------------------------------------------------------------------------------------| Key | Value | Key | Value | Key | Value------------------------------------------------------------------------------------------------------------| file1.txt | file1 contents| file2.txt |file2 contents | fileN.txt | fileN contents------------------------------------------------------------------------------------------------------------
如果一个sequence文件包含10000个小文件,则同时会包含10000个key在一个文件中。sequence文件支持块压缩,并且是可被拆分的。这样MR作业在处理这个sequence文件时,只需要为每个128MB的block启动一个map任务,而不是每个小文件启动一个map任务。当你在同时抽取数百个或者数千个小文件,并且需要保留原始文件名时,这是非常不错的方案。
但是,如果你一次仅抽取少量的小文件到HDFS,则sequence文件的方法也不太可行,因为sequence文件是不可变的,无法追加。比如3个10MB文件将产生1个30MB的Sequence文件,根据本文前面的定义,这仍然是一个小文件。另外一个问题是如果需要检索sequence文件中的文件名列表则需要遍历整个文件。
另外一个问题是Hive并不能较好的处理由该方法合并出来的sequence文件。Hive将value中的所有数据视为单行。这样会导致Hive查看这些数据不方便,因为以前小文件中的一行的所有数据也是Hive中的单行,即相当于只有一个字段。同时,Hive没办法访问这种sequence的key,即文件名(可以自定义Hive serde来解决)
  • HBase

解决小文件问题,除了HDFS存储外,当然还可以考虑HBase列式存储。使用HBase可以将数据抽取过程从生成大量小HDFS文件更改为以逐条记录写入到HBase表。如果你对数据访问的需求主要是随机查找或者叫点查,则HBase是最好的选择。HBase在架构上就是为快速插入,存储大量数据,单个记录的快速查找以及流式数据处理而设计的。但如果你对数据访问的需求主要是全表扫描,则HBase不是最适合的。
可以基于HBase的表的数据创建Hive表,但是查询这种Hive表对于不同的查询类型性能会不一样。当查询单行或者范围查找时,Hive on HBase会表现不错,但是如果是全表扫描则效率比较低下,大多数分析查询比如带group by的语句都是全表扫描。
使用HBase,可以较好的应对实时数据写入以及实时查询的场景。但是如何分配和平衡HBase与集群上其他的组件的资源使用,以及HBase本身运维都会带来额外的运维管理成本。另外,HBase的性能主要取决于你的数据访问方式,所以在选择HBase解决小文件问题之前,应该进行仔细调研和设计。
关于HBase,建议阅读:《深入探讨HBASE》《HBase高级特性、rowkey设计以及热点问题处理》
  • 使用CombineFileInputFormat
CombineFileInputFormat是Hadoop提供的抽象类,它在MR读取时合并小文件。合并的文件不会持久化到磁盘,它是在一个map任务中合并读取到的这些小文件。好处是MR可以不用为每个小文件启动一个map任务,而且因为是自带的实现类,你不用额外将小文件先提前合并。这解决了MR作业启动太多map任务的问题,但是因为作业仍然在读取多个小文件,随机磁盘IO依旧是一个问题。另外,CombineFileInputFormat大多数情况下都不会考虑data locality,往往会通过网络从其他节点拉取数据。
为了实现这个,需要为不同的文件类型编写Java代码扩展CombineFileInputFormat类。这样实现一个自定义的类后,就可以配置最大的split大小,然后单个map任务会读取小文件并进行合并直到满足这个大小。以下有一个示例参考:
http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
当然如果是Hive作业有简单的方式,直接配置以下参数即可:
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormatset mapreduce.input.fileinputformat.split.maxsize=1073741824set mapreduce.input.fileinputformat.split.minsize=1073741824
以上是以Hive的单个map作业合并小文件到1GB为示例。
注意以上无论是MR代码实现方式还是Hive,因为合并的文件并不会持久化保存到磁盘,因此CombineFileInputFormat方式并不会缓解NameNode内存管理问题。只是提高MR或者Hive作业的性能。
  • 通过Hive合并小文件
如果你在使用Hive时因为"create table as"或"insert overwrite"语句输出了小文件,你可以通过设置一些参数来缓解。通过设置这些参数。Hive会在本身的SQL作业执行完毕后会单独起一个MR任务来合并输出的小文件。
注意这个设置仅对Hive创建的文件生效,比如你使用Sqoop导数到Hive表,或者直接抽数到HDFS等,该方法都不会起作用。涉及的配置参数如下:
------------------------------------------------------------------------------------------------- Property | Description | Default Value-------------------------------------------------------------------------------------------------hive.merge.mapfiles | Merge small files that are produced from | true | map-only jobs | -------------------------------------------------------------------------------------------------hive.merge.mapredfiles | Merge small files that are produced from | false | map-reduce jobs | -------------------------------------------------------------------------------------------------hive.merge.size.per.task | When merging small files the target size for | 256000000 | the merge files at the end of the job | (in bytes)-------------------------------------------------------------------------------------------------hive.merge.smallfiles.avgsize | When the average size of the output files | 16000000 | is less than this number,Hive will execute an | (in bytes) | additional MR job to merge the files based on | | hive.merge.mapfiles and hive.merge.mapredfiles| -------------------------------------------------------------------------------------------------

  • 使用Hadoop的追加特性
有些人可能会问,为什么不使用Hadoop自带的Append特性来解决小文件问题,即当第一次输出是小文件时,后面的输出可以继续追加这些小文件,让小文件变成大文件,这听上去是个不错的建议,但其实做起来挺难的,因为Hadoop生态系统里的工具都不支持包括Flume,Sqoop,Pig,Hive,Spark,Impala和MR。比如MR任务有一个规定,输出结果目录必须是在之前不存在的。所以MR作业肯定无法使用Append特性,由于Sqoop,Pig和Hive都使用了MR,所以这些工具也不支持Append。Flume不支持Append主要是因为它假设经过一段时间比如几秒,多少字节,多少事件数或者不活动的秒数,Flume就会关闭文件而不再打开它。
如果你想使用Append来解决小文件问题,则你需要自己编写特定的程序来追加到现有的文件。另外,当集群中其他应用程序如果正在读取或处理这些需要追加的文件,你就不能使用自定义的MR或者Spark程序来追加这些文件了。所以如果要使用这种方法,你最好还是谨慎考虑。
选择何种办法来解决小文件问题取决于各个方面,主要来自数据访问方式以及存储要求,具体包括:
1.小文件是在整个数据pipeline的哪个部分生成的?我们是要在抽数之前处理还是抽取到集群后处理?
2.是什么工具生成的小文件?可以通过调整工具的配置来减少小文件的数量吗?
3.企业的大数据团队的技能水平怎么样?他们有能力编写一些自定义程序来处理小文件或者抽数逻辑吗?他们未来有能力维护吗?
4.小文件生成的频率是多少?为了生成大文件,需要多久合并一次小文件?
5.什么工具会访问这些小文件?比如Hive,Impala,Spark或者其他程序?
6.对于一个生产集群来说的话,存在哪些时间窗口,集群有空余的资源来运行合并小文件的程序?
7.计算引擎访问数据时能接受怎样的延迟?这涉及我们考虑如何合并小文件,包括大小,压缩格式等。
此外,我们还可以从文件的压缩方面入手。关于Hadoop支持的压缩算法等,建议阅读:《Hadoop支持的压缩格式对比和应用场景以及Hadoop native库》

海量小文件及Delta的处理

 海量小文件根源 


小文件的问题其实以前也一直困扰着我,对于传统数仓,导致小文件多的原因非常多:
1.分区粒度,如果你分区非常多,就会导致更多的文件数产生
2.很多流式程序是只增操作,每个周期都会产生N个文件,常年累月,积石成山。
3.以前为了解决更新问题,经常一份数据会有中间好几个存储状态,也会导致文件数很多。
为了解决小文件问题,我们也是八仙过海各显神通,一般而言可能都是写个MR/Spark程序读取特定目录的数据,然后将数据重新生成N个文件。但是在以前,这种模式会有比较致命的问题:
因为在生成的新文件要替换原来的文件,而替换的过程不是原子过程,所以这个时候如果正好发生读,是会影响的。
其次,很多读的程序,都会缓存文件路径,因为我们重新生成了文件,文件名称也变化了,导致读的程序的缓存失效,会发生比如文件找不到等异常。对于在一个进程比较好说,做下刷新就行,但是读往往是在不同的进程实例里,这个时候通知他们也是很难的事情。再极端一点,读取这个表的程序可能是另外一个团队维护的。所以其实小文件并没有想象的那么好解决,或者说能够优雅的解决。

 为什么海量小文件是问题 


前面,我们谈到了小文件的根源。那么文件多就多了,为什么是个问题呢?核心原因在于HDFS的设计问题,他需要把文件meta信息缓存在内存里,这个内存只能是单机的,所以变成了一个很大的瓶颈。虽然后面HDFS一直尝试解决这个问题,比如引入联邦制等,但是也变相的引入了复杂性。

 Delta如何解决小文件 


我们知道,其实大部分存储的问题都有小文件的多的问题,比如HBase等,他们的解决方案是做compaction,本质上就是讲小文件合并成大文件。HBase还有minor compaction和 major compaction之分。截止到目前(0.4.0版本),Delta还没有提供类似的compaction功能,但是基于Delta已经提供的扩展接口,我们也可以很轻易的实现compaction的功能。Compaction的核心点是,在做compaction的过程不能影响读写,而Delta的版本设计可以很简单的做到这一点。
我在Delta Plus里实现了一个compaction的版本。但是目前这个版本也有点限制,就是能够被compact的delta表不能包含update/delete操作。那为什么不能包含upsert操作呢?原因是compaction也是一个非常重的操作,持续的时间可能非常长,并且他是依赖于他开始那一瞬间读到的数据的。如果发生了upsert操作,意味着他读到的数据可能已经失效了,这个时候它会失败需要吃重新读,重新合并,重新写,而这个过程很长,可能它再次重试的时候,又有数据进行了upsert,那么可怜的它似乎永远都不能完成自己的工作了。而假设我们只允许新增数据,那么因为以前的文件不会发生变更,所以我们可以对以前的数据做合并然后产生新的文件,标记删除以前的文件,整个过程不会阻止数据的新增和读取。
似乎是不完美,但是在前面的章节中,我们说到,upsert在发生upsert的时候会动态调整控制文件的数目,所以他相当于自动具备了自己的compaction机制。而只有append操作的表,他的文件是一个一直增长的过程,所以需要我们手动进行compaction操作。

 Delta compaction过程 


Delta的compaction因为有了上面的约束,会变得异常简单。
1.读取某个版本之前的数据
2.将涉及到标记删除的文件真实物理删除
3.将标记为add的文件按分区(如果有分区)进行合并操作产生新的文件,然后标记删除这些文件。物理删除这些文件。
4.获取事务并且尝试提交。
compaction有个特殊的设计是,他并不会在开始工作前就尝试获取事务,而是直到所有的实际工作都做完了,才最后获取事物并且进行提交。这得益于前面我们说的对应表的数据只增特性。

Hive小文件处理方案

  • 输入合并和输出合并
1.配置map输入合并
-- 每个Map最大输入大小,决定合并后的文件数set mapred.max.split.size=256000000;-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并set mapred.min.split.size.per.node=100000000;-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并set mapred.min.split.size.per.rack=100000000;-- 执行Map前进行小文件合并set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2.配置hive结果端合并
通过设置hive的配置项在执行结束后对结果文件进行合并:
-- 在Map-only的任务结束时合并小文件set hive.merge.mapfiles = true-- 在Map-Reduce的任务结束时合并小文件set hive.merge.mapredfiles = true-- 合并文件的大小set hive.merge.size.per.task = 256*1000*1000 -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件mergeset hive.merge.smallfiles.avgsize=16000000
hive在对结果文件进行合并时会执行一个额外的map-only脚本,mapper的数量是文件总大小除以size.per.task参数所得的值,触发合并的条件是:根据查询类型不同,相应的mapfiles/mapredfiles参数需要打开;结果文件的平均大小需要大于avgsize参数的值。
  • Hive使用HAR归档文件
Hadoop的归档文件格式也是解决小文件问题的方式之一。而且hive提供了原生支持:
set hive.archive.enabled=true;set hive.archive.har.parentdir.settable=true;set har.partfile.size=1099511627776;ALTER TABLE srcpart ARCHIVE PARTITION(ds= '2021-02-01', hr= '12' );ALTER TABLE srcpart UNARCHIVE PARTITION(ds= '2021-02-01', hr= '12' );
如果使用的不是分区表,则可以创建成外部表,并使用har://协议来指定路径。
  • 数据仓库Hive表分区优化
数据仓库创建数仓表时,ETL开发人员基于使用习惯和处理的方便性,经常创建多级分区存储数据。但是过多的分区会消耗NameNode大量的资源,而且也会引入小文件的问题。所以对于创建数仓表的分区,要求如下:
  1. 对于统计数据表、数据量不大的基础表、业务上无累计快照和周期性快照要求的数据表,尽可能的不创建分区,而采用数据合并回写的方式解决。
  2. 对于一些数据量大的表,如果需要创建分区,提高插叙过程中数据的加载速度,尽可能的只做天级分区。而对于埋点数据,这种特大的数据量的,可以采用小时分区。
  3. 对于一些周期快照和累计快照的表,我们尽可能只创建日分区。

  • 对Hive数据进行压缩
出于对小文件数据治理的目的,建议使用非TexFile的序列化存储方式存储数据。并且如果一张Hive表存在大量的小文件,建议通过以下参数设置压缩:
set hive.exec.compress.output=true;set parquet.compression=snappy;set hive.merge.mapfiles=true;set hive.merge.mapredfiles=true; set hive.merge.mapredfiles=trueset hive.optiming.sort.dynamic.partition = true;--256Mset parquet.blocksize= 268435456;--256Mset dfs.block.size=268435456; --128Mset hive.merge.smallfiles.avgsize=134217728; --256Mset hive.merge.size.per.task = 268435456; 

Spark小文件问题产生原因分析及处理方案

 Spark流,如SparkStreaming


使用sparkstreaming时,如果实时计算结果要写入到HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由sparkstreaming的微批处理模式和DStream(RDD)的分布式(partition)特性导致的,sparkstreaming为每个partition启动一个独立的线程来处理数据,一旦文件输出到HDFS,那么这个文件流就关闭了,再来一个batch的parttition任务,就再使用一个新的文件流。
那么假设,一个batch为10s,每个输出的DStream有32个partition,那么一个小时产生的文件数将会达到(3600/10) * 32=11520个之多。众多小文件带来的结果是有大量的文件元信息,比如文件的location、文件大小、block number等需要NameNode来维护,NameNode压力会非常大。
不管是什么格式的文件,parquet、text、JSON或者Avro,都会遇到这种小文件问题,这里讨论几种处理sparkstreaming小文件的典型方法。
  • 增加batch大小
这种方法很容易理解,batch越大,从外部接收的event就越多,内存积累的数据也就越多,那么输出的文件数也就回变少,比如将上面例子中的batch时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但是此时延迟会比较大,不适合实时性要求高的场景。
  • coalesce和repartition
小文件的基数是:batch_number * partition_number,而第一种方法是减少batch_number,那么这种方法就是减少partition_number了,这个api不细说,就是减少初始的分区个数。看过spark源码的童鞋都知道,对于窄依赖,一个子rdd的partition规则继承父rdd,对于宽依赖(就是那些个叉叉叉ByKey操作),如果没有特殊指定分区个数,也继承自父rdd。那么初始的SourceDstream是几个partiion,最终的输出就是几个partition。所以coalesce大法的好处就是,可以在最终要输出的时候,来减少一把partition个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个batch是不算做结束的。那么一个batch的处理时延必定增长,batch挤压会逐渐增大。
关于coalesce和repartition的区别以及使用场景,建议阅读:《重要 | Spark分区并行度决定机制》
  • SparkStreaming外部来处理
在sparkstreaming外再启动定时的批处理任务来合并sparkstreaming产生的小文件。需要注意合并任务的时间划分,避免合并正在写入的sparkstreaming文件。
  • 自己调用foreach去append
sparkstreaming提供的foreach这个outout类api,可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个batch在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS上的文件不支持修改,但是很多都支持追加,那么每个batch的每个partition就对应一个输出文件,每次都去追加这个partition对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。

 Spark SQL 


关于Spark SQL小文件问题产生原因分析以及处理方案,建议阅读
:《Spark SQL 小文件问题处理》。
参考目录:
https://blog.csdn.net/weixin_43228814/article/details/88883310
https://blog.csdn.net/liuaigui/article/details/9981135
https://blog.csdn.net/xuehuagongzi000/article/details/105978128/
https://blog.csdn.net/LINBE_blazers/article/details/82861981
https://zhuanlan.zhihu.com/p/87925958
https://www.lmlphp.com/user/1210/article/item/17926/

推荐文章:
【大数据学习与分享】技术干货合集

关注大数据学习与分享,获取更多技术干货

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存