查看原文
其他

干货 | 详解 FlinkSQL 实现原理

The following article is from 大数据左右手 Author 王了个博

主要内容

本篇主要从FlinkSQL实现的内核与原理,工作流等的视角带大家构建一幅FlinkSQL全景图(以Blink为主介绍),探知背后支撑的“男人们”(组件)。建议收藏,仅此一份。


主要内容:

1. Table API 与 SQL

2. Apache Calcite

3. 元数据

4. SQL 函数

5. Flink Planner 与 Blink Planner

6. Blink SQL执行过程

7. SQL优化器

8. 总结

Table API 与 Table SQL

Table API 和 Table SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。

Apache Flink 具有两个关系型 API - Table API 和 Table SQL - 用于统一的流和批处理。Table API 是 Scala 和 Java 的语言集成查询 API,它允许用非常直观的方式从关系运算符(如选择、过滤和连接)组成查询。Flink 的 SQL 支持是基于 Apache Calcite,它实现了 SQL 标准。无论输入是批处理输入(DataSet)还是流输入(DataStream),在任一接口中指定的查询都具有相同的语义,并指定相同的结果。

Table API 和 SQL 接口与 Flink 的 DataStream 和 DataSet API 紧密集成。你可以很容易地在所有 API 和建立在 API 基础上的库之间切换。

Apache Calcite

Calcite 是什么

Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。

Calcite采用的是业界大数据查询框架的一种通用思路,它的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。

Calcite作为一个强大的SQL计算引擎,在Flink内部的SQL引擎模块就是基于Calcite。

Calcite 的特点

  • 支持标准SQL语言;
  • 独立于编程语言和数据源,可以支持不同的前端和后端;
  • 支持关系代数、可定制的逻辑规则和基于成本模型优化的查询引擎;
  • 支持物化视图(materialized view)的管理(创建、丢弃、持久化和自动识别);
  • 基于物化视图的Lattice和Tile机制,以应用于OLAP分析;
  • 支持对流数据的查询。

Calcite 的功能

1. SQL 解析

Calcite 的SQL解析是通过JavaCC实现的,使用JavaCC编写SQL语法描述文件,将SQL解析成未经校验的AST语法树。

2. SQL 校验

无状态的校验:验证SQL语句是否符合规范。有状态的校验:通过与元数据结合验证SQL的Schema,Field,Function是否存在,输入和输出是否符合。

3. 查询优化

对RelNode和逻辑计划树进行优化,得到优化后的生成物理执行计划。

4. SQL 生成器

将物理执行计划生成特定平台的可执行程序,比如Flink,Hive,不同规则的SQL查询语句。

5. 执行

通过各个执行平台在内存中编译,然后执行查询。

FlinkSQL 结合 Calcite

一条SQL从提交到Calcite解析,优化,到最后的Flink执行,一般分以下过程:

1. Sql Parser: 将sql语句通过java cc解析成AST(语法树),在calcite中用SqlNode表示AST;

2. Sql Validator: 结合数字字典(catalog)去验证sql语法;

3. 生成Logical Plan: 将sqlNode表示的AST转换成LogicalPlan, 用relNode表示;

4. 生成 optimized LogicalPlan: 先基于calcite rules 去优化logical Plan,基于flink定制的一些优化rules去优化logical Plan;

5. 生成Flink PhysicalPlan: 这里也是基于flink里头的rules将,将optimized LogicalPlan转成成Flink的物理执行计划;

6. 将物理执行计划转成Flink ExecutionPlan: 就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。

Table API 来提交任务的话,基本流程和运行SQL类似,稍微不同的是:table api parser: flink会把table api表达的计算逻辑也表示成一颗树,用treeNode去表式; 在这棵树上的每个节点的计算逻辑用Expression来表示。

简单说一下SQL优化:RBO(基于规则)

RBO主要是开发人员在使用SQL的过程中,有些发现有些通用的规则,可以显著提高SQL执行的效率,比如最经典的filter下推:

将Filter下推到Join之前执行,这样做的好处是减少了Join的数量,同时降低了CPU,内存,网络等方面的开销,提高效率。

SQL优化的发展,则可以分为两个阶段,即RBO(基于规则),和CBO(基于代价)

RBO和CBO的区别大概在于: RBO只为应用提供的rule,而CBO会根据给出的Cost信息,智能应用rule,求出一个Cost最低的执行计划。需要纠正很多人误区的一点是,CBO其实也是基于rule的,接触到RBO和CBO这两个概念的时候,很容易将他们对立起来。但实际上CBO,可以理解为就是加上Cost的RBO。

元数据

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

1. 目前支持的类型

(1) GenericInMemoryCatalog

是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

(2) JdbcCatalog

JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

(3) HiveCatalog

HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

(4) 用户自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

2. 元数据分类

catalog定义主要有三种数据类型接口,也就是常用到的数据库,表&视图,函数。当然还有最上层的Catalog容器。

(1) 数据库

等同于数据库中库的实例,接口定义为CatalogDatabase,定义数据库实例的元数据,一个数据库实例中包含表,视图,函数等多种对象。

(2) 表&视图

CatalogTable对应数据库中的表,CatalogView队形数据库中的视图。

是一种存储的实体,包换了字段信息,表的分区,属性,描述信息。其实说白了字段定义和之前印象的数据库很是类似。你可以对比过来。不同的是,拿flink来说,所有的表都是外部数据源,除了上面所说的,还需要访问信息,比如IP端口,mater地址,connector连接类等等。

视图是一个虚拟概念,本质上是一条SQL查询语句,底层对应一张表或者多张表。包含SQL查询语句,视图的字段信息,视图的属性等等的信息。

(3) 函数

CatalogFunction是函数元数据的接口。函数元数据包含了所在的类信息和编程语言。

3. 数据访问

Flink的Table API和SQL程序可以连接到其他外部系统,用于读和写批处理表和流表。source table提供对存储在外部系统(如数据库、消息队列或文件系统)中的数据的访问。sink table 向外部存储系统发送表。根据source和sink器的类型,它们支持不同的格式,如CSV、Avro、Parquet或ORC。

(1) TableSchema

Table Source 和 Sink需要具备对外数据源的描述能力,所以Flink定义了TableSchema对象来定义字段名称和字段类型,存储格式等等信息

(2) 时间属性

支持处理时间和时间时间

(3) Watermark

用来处理乱序的数据。

4. Table Source & Table Sink

Flink本地支持各种连接器,可以查看往期总结
  • Filesystem
  • Elasticsearch
  • Apache Kafka
  • Amazon Kinesis Data Streams
  • JDBC
  • Apache HBase
  • Apache Hive
几个主要Table Source与Sink体系

(1) StreamTableSource

流数据抽象,区分了无界数据与有界数据。

(2) LookupableTableSource

按照Join条件中的字段进行关联。

(3) FilterableTableSource

过滤不符合条件的记录。

(4) LimitableTableSource

限制记录条数。

(5) ProjectableTableSource

过滤不会被使用的字段。

(6) AppendStreamTableSink

追加模式的TableSink 支持追加,不支持更新。

(7) RetractStreamTableSink

支持召回模式的TableSink,召回模式其实就是流上的update。

(8) UpsertStreamTableSink

有则更新,无则插入

SQL 函数

临时函数和持久化函数。临时函数始终由用户创建,它容易改变并且仅在会话的生命周期内有效。持久化函数不是由系统提供,就是存储在 Catalog 中,它在会话的整个生命周期内都有效。

内置函数

Table API和SQL为用户提供了一组用于数据转换的内置函数。如果您需要的函数还不受支持,您可以实现用户定义的函数

(1) Comparison Functions(比较型函数)

  eg:value1 = value2

(2) Logical Functions(逻辑函数)

  eg: boolean1 OR boolean2

(3) Arithmetic Functions(算术函数)

 eg: numeric1 + numeric2

(4) String Functions(字符串函数)

UPPER(string)

(5) Temporal Functions(时间函数)

YEAR(date)

(6) Conditional Functions(有条件的函数)

IF(condition, true_value, false_value)

(7) Type Conversion Functions(类型转换函数)

CAST(value AS type)

(8) Collection Functions(集合函数)

array '[' INT ']'

(9) Value Construction Functions  , Value Access Functions,Grouping Functions,Hash Functions,Auxiliary Functions,Aggregate Functions,Column Functions (不一一列举)

自定义函数

(1) 标量函数(UDF)

标量函数 将标量值转换成一个新标量值,也就是对一行数据中的一个或者多个字段返回一个单值。

(2) 聚合函数(UDAGG)

自定义聚合函数(UDAGG)是把一个表(一行或者多行,每行可以有一列或者多列)聚合成一个标量值。

(3) 表值函数(UDTF)

表值函数 将标量值转换成新的行数据。可以接收一个或者多个字段作为参数,输出多行列数据。

(4) 表值聚合函数(UDTAGG)

自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。

(5) 异步表值函数

异步表值函数 是异步查询外部数据系统的特殊函数。

Planner 与 Blink Planner

Flink Table/SQL体系中的Planner(即查询处理器)是沟通Flink与Calcite的桥梁,为Table/SQL API提供完整的解析、优化和执行环境。

Flink Table 的新架构实现了查询处理器的插件化,项目完整保留原有 Flink Planner (Old Planner),同时又引入了新的 Blink Planner,用户可以自行选择使用 Old Planner 还是 Blink Planner。

主要区别:

  • Blink做到了真正的流批统一,即将批看做是特殊的流,把处理批的API和处理流的API做成了一样的。也就是说不管是批数据还是流数据,底层统统都是DataStream。所以使用Blink作为table planner的程序,Table和DataSet是不能相互转换的。

  • Blink planner是不支持BatchTableSource的,它只支持StreamTableSource。

  • Blink Planner和Old Planner的FilterableTableSource是不兼容的。Old - Planner会下推PlannerExpression到FilterableTableSource。而Blink planner下推的是Expression。

  • 基于String的键值对配置项只能用于Blink Planner

  • Blink Planner会优化多个sink到同一个TableEnvironment和StreamTableEnvironment。而Old Planner会为不同的sink优化到自己的DAG中,也就是说有几个sink就有几个DAG。

  • Old Planner 不支持 catalog统计,Blink支持。

  • Old Planner 不支持版本表(versioned Table)。版本表类似HBASE中版本表的意思,每个key可以记住过去的几个值。

Blink SQL执行过程

SQL执行过程分三个阶段

(1) 从SQL到 Operation

(2) 从Operation 到 Transformation

(3) 环境的执行阶段

从SQL到 Operation

(1) 解析SQL转换为QueryOperation;

(2) SQL解析为SqlNode;

(3) 校验SqlNode;

(4) 调用Calcite SQLToRelConvertrt将SqlNode转化为RelNode逻辑树;

(5) RelNode转化为Operation。

Operation 到 Transformation

(1) DQL(数据查询语言)转换,在flink中作为中间运算;

(2) DML(数据操作语言),DQL转换。

整个转换从Operation开始,先转换为Calcite的逻辑计划树,再转化为Flink的逻辑计划树,然后进行优化。优化后的逻辑树转换为Flink的物理执行,物理执行生成一系列的算子,udf等等,包装到Transformation中。

环境的执行阶段

有了Transformation后正式进入到StreamGraph的过程中,最终交给Flink集群去运行。

SQL优化器

查询优化器

再次提到两个优化器:RBO(基于规则的优化器) 和 CBO(基于代价的优化器)

(1) RBO(基于规则的优化器)会将原有表达式裁剪掉,遍历一系列规则(Rule),只要满足条件就转换,生成最终的执行计划。一些常见的规则包括分区裁剪(Partition Prune)、列裁剪、谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折叠(Constant Folding)、子查询内联转join等。

(2) CBO(基于代价的优化器)会将原有表达式保留,基于统计信息和代价模型,尝试探索生成等价关系表达式,最终取代价最小的执行计划。CBO的实现有两种模型,Volcano模型,Cascades模型。这两种模型思想很是相似,不同点在于Cascades模型一边遍历SQL逻辑树,一边优化,从而进一步裁剪掉一些执行计划。

目前各大数据库和计算引擎倾向于CBO。

总结

在目前情况下,在阿里对Flink社区的贡献下,Flink包含了Flink SQL 和 Blink SQL体系,Flink Planner称之为 Old Planner,Blink Planner称之为 New Planner。从中可以发现 Blink Planner是未来,Flink Planner将会被淘汰。

FlinkSQL依靠 Calcite提供了一套SQL验证,解析,优化等等操作。同时FlinkSQL提供元数据管理,SQL函数,数据源的建设。也自由化地提供了自定义函数,自定义connector连接,丰富了场景的使用。

FlinkSQL你值得拥有!!!

- EOF -

推荐阅读  点击标题可跳转

1、OPPO 数据湖统一存储技术实践

2、一文理解 Raft 分布式一致性算法

3、2 万字 | Kafka 知识体系保姆级教程,附详细解析


看完本文有收获?请转发分享给更多人

关注「大数据与机器学习文摘」,成为Top 1%

点赞和在看就是最大的支持❤️

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存