其他
代达罗斯之殇-大数据领域小文件问题解决攻略
:
点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多惊喜
小文件问题概述
小文件合并
Hadoop小文件合并策略和方式
Hadoop 小文件是怎么来的
现在我们越来越多的将Hadoop用于(准)实时计算,在做数据抽取时处理的频率可能是每小时,每天,每周等,每次可能就只生成一个不到10MB的文件。
数据源有大量小文件,未做处理直接拷贝到Hadoop集群。
MapReduce作业的配置未设置合理的reducer或者未做限制,每个reduce都会生成一个独立的文件。另外如果数据倾斜,导致大量的数据都shuffle到一个reduce,然后其他的reduce都会处理较小的数据量并输出小文件。
解决NameNode的内存问题
Hadoop Archive Files
NameNode联邦
解决MapReduce性能问题
2.批量文件合并
3.Sequence文件
4.HBase
5.S3DistCp (如果使用Amazon EMR)
6.使用CombineFileInputFormat
7.通过Hive合并小文件
8.使用Hadoop的追加特性
修改数据抽取方法/间隔
批量文件合并
Sequence文件
HBase
S3DistCp (如果使用Amazon EMR)
使用CombineFileInputFormat
set mapreduce.input.fileinputformat.split.maxsize=1073741824
set mapreduce.input.fileinputformat.split.minsize=1073741824
通过Hive合并小文件
使用Hadoop的追加特性
Hive优化之小文件问题及其解决方案
增加batch大小
Coalesce和repartition
rdd.repartition(1)
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
分区数之间的比例不太悬殊。比如原有1000个分区,减少到200个分区,这时可以将shuffle设为false,因为子RDD中的一个分区只对应父RDD的5个分区,压力不大。分区数之间的比例悬殊。比如原有500个分区,减少到1个分区,就要将shuffle设为true,保证生成CoalescedRDD之前的操作有足够的并行度,防止Executor出现单点问题。这也就是本节开头的做法了。
coalesce(numPartitions, shuffle = true)
}
SparkStreaming外部来处理
自己调用foreach去append
Spark SQL 小文件问题产生原因分析以及处理方案
现象
2) t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
3) test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;
分析
1)union的RDD分区器已定义并且它们的分区器相同
多个父RDD具有相同的分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的
2)不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和
通过repartition或coalesce算子控制最后的DataSet的分区数
将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...小文件定期合并 可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。
Flink小文件合并
自定义 PartitionCommitPolicy
import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);
@Override
public void commit(Context context) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String partitionPath = context.partitionPath().getPath();
List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
LOGGER.info("{} files in path {}", files.size(), partitionPath);
MessageType schema = getParquetSchema(files, conf);
if (schema == null) {
return;
}
LOGGER.info("Fetched parquet schema: {}", schema.toString());
Path result = merge(partitionPath, schema, files, fs);
LOGGER.info("Files merged into {}", result.toString());
}
private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
List<Path> result = new ArrayList<>();
RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
while (dirIterator.hasNext()) {
LocatedFileStatus fileStatus = dirIterator.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
result.add(filePath);
}
}
return result;
}
private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
if (files.size() == 0) {
return null;
}
HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
ParquetFileReader reader = ParquetFileReader.open(inputFile);
ParquetMetadata metadata = reader.getFooter();
MessageType schema = metadata.getFileMetaData().getSchema();
reader.close();
return schema;
}
private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.CREATE)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
for (Path file : files) {
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withConf(fs.getConf())
.build();
Group data;
while((data = reader.read()) != null) {
writer.write(data);
}
reader.close();
}
writer.close();
for (Path file : files) {
fs.delete(file, false);
}
return mergeDest;
}
}
auto-compaction 是否自动合并
compaction.file-size: compact target file size, default is rolling-file-size 合并后文件大小
super.notifyCheckpointComplete(checkpointId);
output.collect(new StreamRecord<>(new EndCheckpoint(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks())));
}
在 FileSystem/Hive connector 的流式写入中支持小文件合并 (FLINK-19345)
参考目录
版权声明:
本文为《大数据真好玩》整理,原作者独家授权。未经原作者允许转载追究侵权责任。编辑|冷眼丶微信公众号|大数据真好玩文章不错?点个【在看】吧! 👇