查看原文
其他

实操:关于Flink中文件读取,从实验中看网上的那些博客对吗?

涤生-健哥 涤生大数据
2024-12-05

在Flink中关于文件读取,在Flink 1.12之前或者现在用于测试情况下,一般使用老的Flink自带的读取文件的方式,整体操作比较简单,直接调用readTextFile方法即可。如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 该方法可以读取具体的文件,或者是指定路径下的所有文件,还可以从HDFS目录下读取,使用路径hdfs://...env.readTextFile("c:/abc.txt")   .print();env.execute;

而通过源码分析我们知道,readTextFile(filePath: String) 底层调用的是 readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo),而这个readFile最终还是要调用addSource方法。另外readTextFile方法默认是使用批处理的方式,如果要想使用流处理需要采用底层他的readFile方法。

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.java.io.TextInputFormat;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.FileProcessingMode; public class FileReader {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        String filePath = "c:/abc/";        TextInputFormat format = new TextInputFormat(new Path(filePath));        // inputFormat表示文件读取的格式,一般使用flink里的TextInputFormat,即一行数据是流中的一个元素        // filePath表示文件/文件夹的路径        // watchType表示文件处理的模式,有两个可选项:PROCESS_ONCE表示一次性读,即批;PROCESS_CONTINUOUSLY表示持续性读,即流        // interval表示流每次读取的时间间隔,默认是-1,表示批。如果设置了watchType,那么interval也要做相应的改动,单位是毫秒,最小是1        env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, BasicTypeInfo.STRING_TYPE_INFO)            .print();        env.execute();    }}

通过现象我们可以发现的是,对于文件,PROCESS_CONTINUOUSLY并不是增量的读取,而是每次文本变化之后全量的读取变化的文本,此时就不是一个extractly-once的操作。而对于目录,PROCESS_CONTINUOUSLY又是增量读取。因此在实践中,我们可以将完成的待处理文件直接拷贝到指定的目录中,那么就可以获得extractly-once的行为

总结:对于单个文件内是全量变化,对于新增文件是增量。

关于文件读取的几个小问题: 

1

关于递归读取


默认情况下是不支持递归处理的,需要设置如下配置方可。

Configuration configuration = new Configuration();// 设置递归获取文件configuration.setBoolean("recursive.file.enumeration", true);

其实递归读取涉及到多路径读取,通过对FileInputFormat源码分析,它不支持多路径读取,源码如下:

public void setFilePaths(Path... filePaths) {    if (!supportsMultiPaths() && filePaths.length > 1) {        throw new UnsupportedOperationException(                "Multiple paths are not supported by this FileInputFormat.");    }    if (filePaths.length < 1) {        throw new IllegalArgumentException("At least one file path must be specified.");    }    if (filePaths.length == 1) {        // set for backwards compatibility        this.filePath = filePaths[0];    } else {        // clear file path in case it had been set before        this.filePath = null;    }     this.filePaths = filePaths;} @Deprecatedpublic boolean supportsMultiPaths() {    return false;}

但是真的是这样的吗?我们继续看FileInputFormat的各个子类,每个子类都覆写了supportsMultiPaths方法:

@Overridepublic boolean supportsMultiPaths() {    return true;}

所以,在flink中式支持多路径读取的,但是需要注意的是这里的多路径指的是同一个路径下的多路径,暂时还支持多个独立的顶级目录。这个在很多博客中说的也很有问题

 

2

关于文件过滤


比如在readTextFile方法中,我们可以看到关于文件的过滤:

TextInputFormat format = new TextInputFormat(new Path(filePath));format.setFilesFilter(FilePathFilter.createDefaultFilter());

我们可以看到关于文件过滤使用的是接口:FilePathFilter,源码如下:

public abstract class FilePathFilter implements Serializable {     private static final long serialVersionUID = 1L;     public static final String HADOOP_COPYING = "_COPYING_";     // 只需要覆写此方法,就可以对指定条件的文件进行过滤,注意的是该方法返回true是过滤掉,不读取的意思    public abstract boolean filterPath(Path filePath);     public static FilePathFilter createDefaultFilter() {        return DefaultFilter.INSTANCE;    }     // 默认的过滤方案,所有以点(.)、下划线(_)开头的文件以及包含_COPYING_的文件都不会被读取到    public static class DefaultFilter extends FilePathFilter {         private static final long serialVersionUID = 1L;         static final DefaultFilter INSTANCE = new DefaultFilter();         DefaultFilter() {}         @Override        public boolean filterPath(Path filePath) {            return filePath == null                    || filePath.getName().startsWith(".")                    || filePath.getName().startsWith("_")                    || filePath.getName().contains(HADOOP_COPYING);        }    }}

对于接口FilePathFilter还有一个子类GlobFilePathFilter,其支持指定正则的方式过滤文件,源码如下:

// *-匹配任意数量的字符,包括无字符// **-匹配所有子目录中的任何文件// ?-匹配任何单个字符// [abc]-匹配括号中列出的字符之一// [a-z]-匹配括号中给定范围内的一个字符public class GlobFilePathFilter extends FilePathFilter {    // 比较核心的两个成员变量    // 表示要包含的文件的列表    private final List<String> includePatterns;     // 表示要排除的文件的列表,优先级高于includePatterns    private final List<String> excludePatterns;     // 构造方法,前面是包含的,后面是排除的    public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {        this.includePatterns = Preconditions.checkNotNull(includePatterns);        this.excludePatterns = Preconditions.checkNotNull(excludePatterns);    }    ...}

关于以上两点的完整案例如下:

import org.apache.flink.api.common.io.FilePathFilter;import org.apache.flink.api.common.io.GlobFilePathFilter;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.java.io.TextInputFormat;import org.apache.flink.configuration.Configuration;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import java.util.Collections; public class FileReader {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         String filePath = "C:/def/4/";        TextInputFormat format = new TextInputFormat(new Path(filePath));         // 这里导包不要导错了,一定是flink中的配置类        Configuration configuration = new Configuration();        // 设置递归获取文件        configuration.setBoolean("recursive.file.enumeration", true);        // 修改配置        format.configure(configuration);                 // 设置文件过滤器        GlobFilePathFilter filesFilter = new GlobFilePathFilter(                Collections.singletonList("**"),                // 这里要写绝对路径,优先级高于上述                Collections.singletonList("C:/def/4/5/5.txt")        );        // 设置过滤器        //format.setFilesFilter(FilePathFilter.createDefaultFilter());        format.setFilesFilter(filesFilter);         env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, BasicTypeInfo.STRING_TYPE_INFO)            .print();        env.execute();    }}

3

关于 FileInputFormat


FileInputFormat是Flink中的FileInputFormat,并不是hadoop中的,包名是:org.apache.flink.api.common.io,它是flink中处理文件的顶级接口,接口继承如下:

可以看出,大体可以分为两类:读二进制文件(BinaryInputFormat)、读指定分隔符的文件(DelimitedInputFormat)。在实际的工作中后者使用的最多,后者默认按照行分隔符("\n")进行分割,但是也是可以指定行分隔符。

// 可以指定字符串作为行分隔符public void setDelimiter(String delimiter) {    if (delimiter == null) {        throw new IllegalArgumentException("Delimiter must not be null");    }    this.delimiter = delimiter.getBytes(getCharset());    this.delimiterString = delimiter;} // 可以指定字符作为行分隔符public void setDelimiter(char delimiter) {    setDelimiter(String.valueOf(delimiter));} // 最底层的调用public void setDelimiter(byte[] delimiter) {    if (delimiter == null) {        throw new IllegalArgumentException("Delimiter must not be null");    }    this.delimiter = delimiter;    this.delimiterString = null;}

而DelimitedInputFormat又有两个重要的子类:TextInputFormat,单纯的读一行文本,不做任何的改变,即原样输出;GenericCsvInputFormat,用来读取csv文件,但是我们知道所谓的csv文件就是以逗号(,)作为一行的字段分隔符的文本文件,因此该方法支持修改字段分隔符。

public void setFieldDelimiter(String delimiter) {    if (delimiter == null) {        throw new IllegalArgumentException("Delimiter must not be null");    }     this.fieldDelim = delimiter.getBytes(getCharset());    this.fieldDelimString = delimiter;}

GenericCsvInputFormat又有三个重要的子类,分别是:

  • RowCsvInputFormat:将csv文件转化为flink内部的Row类型

  • PojoCsvInputFormat:将csv文件转化为java中的POJO类型

  • TupleCsvInputFormat:将csv文件转化为指定元组数的Tuple类型

我们这里看一下Pojo的使用方法,其他的类型使用类似,这里不再一一赘述。

import org.apache.flink.api.java.io.PojoCsvInputFormat;import org.apache.flink.api.java.typeutils.PojoTypeInfo;import org.apache.flink.api.java.typeutils.TypeExtractor;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.FileProcessingMode;  public class FileReader {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         String filePath = "C:/ghi/1.txt";         // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo        PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);         // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序        String[] fieldOrder = new String[]{"userId", "behavior", "timestamp"};         // 创建 PojoCsvInputFormat        PojoCsvInputFormat format = new PojoCsvInputFormat<>(new Path(filePath), pojoType, fieldOrder);         //指定了换行符,那么在文本中就不能换行,        format.setDelimiter("@_@");        format.setFieldDelimiter("||");                 // 如果文本首行是字段名称,还可以跳过首行        // format.setSkipFirstLineAsHeader(true)        env.readFile(format, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, pojoType)           .print();        env.execute();    }     // POJO类必须是public修饰,并且不能是内部类,如果是内部类需要使用static修饰    public static class UserBehavior {        // 字段必须是public修饰或者有对应的public的set/get方法        public long userId;         // 用户ID        public String behavior;     // 用户行为, 包括("pv", "buy", "cart", "fav")        public long timestamp;      // 行为发生的时间戳,单位秒          // POJO的要求,必须要有public的无参构造        public UserBehavior() {        }         public UserBehavior(long userId, String behavior, long timestamp) {            this.userId = userId;            this.behavior = behavior;            this.timestamp = timestamp;        }         @Override        public String toString() {            return "UserBehavior{" +                    "userId=" + userId +                    ", behavior='" + behavior + '\'' +                    ", timestamp=" + timestamp +                    '}';        }    }}

如果将上述的readFile算子的并行度设置为3,可以看到一个很奇怪的现象,如下,会有两个Custom File Source。这是为什么呢?

通过源码分析我们知道,Flink 的readFile是将文件读取过程拆分为两个子任务,即目录监控数据读取这些子任务中的每一个都由单独的实体实现。监控由单个非并行(并行性 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度等于作业并行度。单个监控任务的作用是扫描目录(定期或仅一次,取决于watchType),找到要处理的文件,将它们拆分,并将这些拆分分配给下游reader。reader将是读取数据的最终。每个 split 只能由一个 reader 读取,而一个 reader 可以读取多个 split

private <OUT> DataStreamSource<OUT> createFileInput(        FileInputFormat<OUT> inputFormat,        TypeInformation<OUT> typeInfo,        String sourceName,        FileProcessingMode monitoringMode,        long interval) {     // 输入分片构建的函数类    ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(                    inputFormat, monitoringMode, getParallelism(), interval);     // 读取输入分片的具体实现类    ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory = new ContinuousFileReaderOperatorFactory<>(inputFormat);     final Boundedness boundedness = monitoringMode == FileProcessingMode.PROCESS_ONCE                    ? Boundedness.BOUNDED                    : Boundedness.CONTINUOUS_UNBOUNDED;     // 和我们使用env.addSource一样,但是后面进跟着调用了一个transform。这里就是整个解析中要重点说明的一点,monitoringFunction中只是负责构建数据切片的,到这一步,其实这个source的并行度还是1。调用transform方法之后,将数据切片中的内容读取出来,这里的并行度才是配置文件中的并行度,我们设置的并行度也只能在这里生效    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName, null, boundedness)                    .transform("Split Reader: " + sourceName, typeInfo, factory);     return new DataStreamSource<>(source);}

所以通过以上分析,关于网上很多博客说readTextFile底层源码并行度是1,通过源码观察我们发现里面这种表达并不完整。以下是网上博客截图:

涤生大数据往期精彩推荐

1.企业数仓DQC数据质量管理实践篇

2.企业数据治理实战总结--数仓面试必备

3.OneData理论案例实战—企业级数仓业务过程

4.中大厂数仓模型规范与度量指标有哪些?

5.手把手教你搭建用户画像系统(入门篇上)

6.手把手教你搭建用户画像系统(入门篇下)

7.SQL优化之诊断篇:快速定位生产性能问题实践

8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!

9.新能源趋势下一个简单的数仓项目,助力理解数仓模型

10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践

11.开发实战角度:distinct实现原理及具体优化总结

12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)

13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)

14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?

15.企业级Apache Kafka集群策略:Kakfa最佳实践总结

16.玩转Spark小文件合并与文件读写提交机制

17.一文详解Spark内存模型原理,面试轻松搞定

18.大厂8年老司机漫谈数仓架构

19.一文带你深入吃透Spark的窗口函数

20.大数据实战:基于Flink+ODPS进行最近N天实时标签构建

21.数仓面试高频-如何在Hive中实现拉链表

22.数仓面试还不懂什么是基线管理?

23.传说中的热点值打散之代码怎么写? 

24.列转行经典实现,细谈hive中的爆炸函数

25.玩转大厂金融风控体系建设

26.Doris凭什么这么强?

27.数仓疑惑:深度解析数据域和主题域的区别?

28.数仓实战:主题大宽表的详解及hive中的实现


继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存