实操:关于Flink中文件读取,从实验中看网上的那些博客对吗?
在Flink中关于文件读取,在Flink 1.12之前或者现在用于测试情况下,一般使用老的Flink自带的读取文件的方式,整体操作比较简单,直接调用readTextFile方法即可。如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 该方法可以读取具体的文件,或者是指定路径下的所有文件,还可以从HDFS目录下读取,使用路径hdfs://...
env.readTextFile("c:/abc.txt")
.print();
env.execute;
而通过源码分析我们知道,readTextFile(filePath: String) 底层调用的是 readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo),而这个readFile最终还是要调用addSource方法。另外readTextFile方法默认是使用批处理的方式,如果要想使用流处理需要采用底层他的readFile方法。
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
public class FileReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "c:/abc/";
TextInputFormat format = new TextInputFormat(new Path(filePath));
// inputFormat表示文件读取的格式,一般使用flink里的TextInputFormat,即一行数据是流中的一个元素
// filePath表示文件/文件夹的路径
// watchType表示文件处理的模式,有两个可选项:PROCESS_ONCE表示一次性读,即批;PROCESS_CONTINUOUSLY表示持续性读,即流
// interval表示流每次读取的时间间隔,默认是-1,表示批。如果设置了watchType,那么interval也要做相应的改动,单位是毫秒,最小是1
env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, BasicTypeInfo.STRING_TYPE_INFO)
.print();
env.execute();
}
}
通过现象我们可以发现的是,对于文件,PROCESS_CONTINUOUSLY并不是增量的读取,而是每次文本变化之后全量的读取变化的文本,此时就不是一个extractly-once的操作。而对于目录,PROCESS_CONTINUOUSLY又是增量读取。因此在实践中,我们可以将完成的待处理文件直接拷贝到指定的目录中,那么就可以获得extractly-once的行为。
总结:对于单个文件内是全量变化,对于新增文件是增量。
关于文件读取的几个小问题:
1
关于递归读取
默认情况下是不支持递归处理的,需要设置如下配置方可。
Configuration configuration = new Configuration();
// 设置递归获取文件
configuration.setBoolean("recursive.file.enumeration", true);
其实递归读取涉及到多路径读取,通过对FileInputFormat源码分析,它不支持多路径读取,源码如下:
public void setFilePaths(Path... filePaths) {
if (!supportsMultiPaths() && filePaths.length > 1) {
throw new UnsupportedOperationException(
"Multiple paths are not supported by this FileInputFormat.");
}
if (filePaths.length < 1) {
throw new IllegalArgumentException("At least one file path must be specified.");
}
if (filePaths.length == 1) {
// set for backwards compatibility
this.filePath = filePaths[0];
} else {
// clear file path in case it had been set before
this.filePath = null;
}
this.filePaths = filePaths;
}
@Deprecated
public boolean supportsMultiPaths() {
return false;
}
但是真的是这样的吗?我们继续看FileInputFormat的各个子类,每个子类都覆写了supportsMultiPaths方法:
@Override
public boolean supportsMultiPaths() {
return true;
}
所以,在flink中式支持多路径读取的,但是需要注意的是这里的多路径指的是同一个路径下的多路径,暂时还支持多个独立的顶级目录。这个在很多博客中说的也很有问题。
2
关于文件过滤
比如在readTextFile方法中,我们可以看到关于文件的过滤:
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
我们可以看到关于文件过滤使用的是接口:FilePathFilter,源码如下:
public abstract class FilePathFilter implements Serializable {
private static final long serialVersionUID = 1L;
public static final String HADOOP_COPYING = "_COPYING_";
// 只需要覆写此方法,就可以对指定条件的文件进行过滤,注意的是该方法返回true是过滤掉,不读取的意思
public abstract boolean filterPath(Path filePath);
public static FilePathFilter createDefaultFilter() {
return DefaultFilter.INSTANCE;
}
// 默认的过滤方案,所有以点(.)、下划线(_)开头的文件以及包含_COPYING_的文件都不会被读取到
public static class DefaultFilter extends FilePathFilter {
private static final long serialVersionUID = 1L;
static final DefaultFilter INSTANCE = new DefaultFilter();
DefaultFilter() {}
@Override
public boolean filterPath(Path filePath) {
return filePath == null
|| filePath.getName().startsWith(".")
|| filePath.getName().startsWith("_")
|| filePath.getName().contains(HADOOP_COPYING);
}
}
}
对于接口FilePathFilter还有一个子类GlobFilePathFilter,其支持指定正则的方式过滤文件,源码如下:
// *-匹配任意数量的字符,包括无字符
// **-匹配所有子目录中的任何文件
// ?-匹配任何单个字符
// [abc]-匹配括号中列出的字符之一
// [a-z]-匹配括号中给定范围内的一个字符
public class GlobFilePathFilter extends FilePathFilter {
// 比较核心的两个成员变量
// 表示要包含的文件的列表
private final List<String> includePatterns;
// 表示要排除的文件的列表,优先级高于includePatterns
private final List<String> excludePatterns;
// 构造方法,前面是包含的,后面是排除的
public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
this.includePatterns = Preconditions.checkNotNull(includePatterns);
this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
}
...
}
关于以上两点的完整案例如下:
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.GlobFilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import java.util.Collections;
public class FileReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "C:/def/4/";
TextInputFormat format = new TextInputFormat(new Path(filePath));
// 这里导包不要导错了,一定是flink中的配置类
Configuration configuration = new Configuration();
// 设置递归获取文件
configuration.setBoolean("recursive.file.enumeration", true);
// 修改配置
format.configure(configuration);
// 设置文件过滤器
GlobFilePathFilter filesFilter = new GlobFilePathFilter(
Collections.singletonList("**"),
// 这里要写绝对路径,优先级高于上述
Collections.singletonList("C:/def/4/5/5.txt")
);
// 设置过滤器
//format.setFilesFilter(FilePathFilter.createDefaultFilter());
format.setFilesFilter(filesFilter);
env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, BasicTypeInfo.STRING_TYPE_INFO)
.print();
env.execute();
}
}
3
关于 FileInputFormat
FileInputFormat是Flink中的FileInputFormat,并不是hadoop中的,包名是:org.apache.flink.api.common.io,它是flink中处理文件的顶级接口,接口继承如下:
可以看出,大体可以分为两类:读二进制文件(BinaryInputFormat)、读指定分隔符的文件(DelimitedInputFormat)。在实际的工作中后者使用的最多,后者默认按照行分隔符("\n")进行分割,但是也是可以指定行分隔符。
// 可以指定字符串作为行分隔符
public void setDelimiter(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
this.delimiter = delimiter.getBytes(getCharset());
this.delimiterString = delimiter;
}
// 可以指定字符作为行分隔符
public void setDelimiter(char delimiter) {
setDelimiter(String.valueOf(delimiter));
}
// 最底层的调用
public void setDelimiter(byte[] delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
this.delimiter = delimiter;
this.delimiterString = null;
}
而DelimitedInputFormat又有两个重要的子类:TextInputFormat,单纯的读一行文本,不做任何的改变,即原样输出;GenericCsvInputFormat,用来读取csv文件,但是我们知道所谓的csv文件就是以逗号(,)作为一行的字段分隔符的文本文件,因此该方法支持修改字段分隔符。
public void setFieldDelimiter(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
this.fieldDelim = delimiter.getBytes(getCharset());
this.fieldDelimString = delimiter;
}
GenericCsvInputFormat又有三个重要的子类,分别是:
RowCsvInputFormat:将csv文件转化为flink内部的Row类型
PojoCsvInputFormat:将csv文件转化为java中的POJO类型
TupleCsvInputFormat:将csv文件转化为指定元组数的Tuple类型
我们这里看一下Pojo的使用方法,其他的类型使用类似,这里不再一一赘述。
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
public class FileReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "C:/ghi/1.txt";
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
String[] fieldOrder = new String[]{"userId", "behavior", "timestamp"};
// 创建 PojoCsvInputFormat
PojoCsvInputFormat format = new PojoCsvInputFormat<>(new Path(filePath), pojoType, fieldOrder);
//指定了换行符,那么在文本中就不能换行,
format.setDelimiter("@_@");
format.setFieldDelimiter("||");
// 如果文本首行是字段名称,还可以跳过首行
// format.setSkipFirstLineAsHeader(true)
env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, pojoType)
.print();
env.execute();
}
// POJO类必须是public修饰,并且不能是内部类,如果是内部类需要使用static修饰
public static class UserBehavior {
// 字段必须是public修饰或者有对应的public的set/get方法
public long userId; // 用户ID
public String behavior; // 用户行为, 包括("pv", "buy", "cart", "fav")
public long timestamp; // 行为发生的时间戳,单位秒
// POJO的要求,必须要有public的无参构造
public UserBehavior() {
}
public UserBehavior(long userId, String behavior, long timestamp) {
this.userId = userId;
this.behavior = behavior;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", behavior='" + behavior + '\'' +
", timestamp=" + timestamp +
'}';
}
}
}
如果将上述的readFile算子的并行度设置为3,可以看到一个很奇怪的现象,如下,会有两个Custom File Source。这是为什么呢?
通过源码分析我们知道,Flink 的readFile是将文件读取过程拆分为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独的实体实现。监控由单个非并行(并行性 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度等于作业并行度。单个监控任务的作用是扫描目录(定期或仅一次,取决于watchType),找到要处理的文件,将它们拆分,并将这些拆分分配给下游reader。reader将是读取数据的最终。每个 split 只能由一个 reader 读取,而一个 reader 可以读取多个 split。
private <OUT> DataStreamSource<OUT> createFileInput(
FileInputFormat<OUT> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
long interval) {
// 输入分片构建的函数类
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
inputFormat, monitoringMode, getParallelism(), interval);
// 读取输入分片的具体实现类
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory = new ContinuousFileReaderOperatorFactory<>(inputFormat);
final Boundedness boundedness = monitoringMode == FileProcessingMode.PROCESS_ONCE
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
// 和我们使用env.addSource一样,但是后面进跟着调用了一个transform。这里就是整个解析中要重点说明的一点,monitoringFunction中只是负责构建数据切片的,到这一步,其实这个source的并行度还是1。调用transform方法之后,将数据切片中的内容读取出来,这里的并行度才是配置文件中的并行度,我们设置的并行度也只能在这里生效
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName, null, boundedness)
.transform("Split Reader: " + sourceName, typeInfo, factory);
return new DataStreamSource<>(source);
}
所以通过以上分析,关于网上很多博客说readTextFile底层源码并行度是1,通过源码观察我们发现里面这种表达并不完整。以下是网上博客截图:
涤生大数据往期精彩推荐
8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!
10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践
12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)
13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)
14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?
15.企业级Apache Kafka集群策略:Kakfa最佳实践总结
20.大数据实战:基于Flink+ODPS进行最近N天实时标签构建
25.玩转大厂金融风控体系建设
26.Doris凭什么这么强?