其他
转载一个看不懂的文章:F1 Query
上图的论文是我在搜索Presto相关的文章时找到的,其实这篇文章已经历时五年了,并且最近Google又发表了它的续。看的云山雾罩,然后又在网站上上找到了一篇读后感,也就是本文的正文。欢迎看懂的同学来交流群里指导一下大家。
老规矩给出论文连接:http://www.vldb.org/pvldb/vol11/p1835-samwel.pdf
原文来自:https://www.jianshu.com/p/13466b59e632
作者:xumingmingv
概述
F1 Query
的大数据处理系统的设计。F1 Query
是Google内部进行异构查询的引擎,它支持对各种不同的文件格式、各种不同的存储系统( Bigtable
, Spanner
, Google Spreadsheets
) 的数据进行联合查询。听起来跟 Presto
很像对吧,这确实也是我看到这篇论文介绍的第一反应,但是随着你看得更深入一点你就会发现这篇论文的着重点完全不在于对多数据源的支持,它甚至完全没有描述是怎么做到支持多种不同异构数据源的。F1 Query 更引以为傲的是:We present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis. (F1 Query 能够覆盖企业级大数据处理和分析领域所有数据处理需求。)
支持对小规模的 OLTP 式的数据进行高效查询。
支持低延迟地对大批量的(异构)数据进行快速即席查询。
支持对超大规模数据进行可靠的 ETL 处理。
设计初衷
这里其实主要就是在说我们也经常说的计算与存储分离啦。
整体架构
F1 Master
, 它负责对所有的查询进行监控并且管理所有 F1 Worker
。然后是 F1 Server
,F1 Server
在角色上有点像我们 Data Lake Analytics 的 FrontNode 的角色, 在请求真正执行之前做一些执行计划编译、优化的工作,是整个系统的“前端”,而真正的数据处理是由 F1 Worker
来完成的。Catalog Service
扮演的元数据中心的角色,各种异构数据的元信息都保存在这个服务里面,也就形成了一个全局的统一视图 -- 不管你数据是保存在什么介质里面。(我们 Data Lake Analytics 和 AWS的 Athena Glue都有类似的服务)。Batch Metadata
保存的是 Batch Execution
模式下任务的一些元信息,比如执行计划之类的。UDF Server
是 Google 比较创新的一个概念,它是一个 UDF 的仓库,而且是在执行引擎之外的,执行引擎通过 RPC 与 UDF Server 进行交互。F1 Server
和 F1 Worker
都是无状态的,当需要水平扩展的时候,只需要向集群里面加入新的机器就好了,数据层面不需要做任何重新分布的工作。查询的执行
F1 Server
离这些数据更近,然后返回一个 F1 Server
的列表给客户端,客户端接到之后,把这个请求重新发给这些新的 F1 Server
进行查询。F1 Query 强调虽然把计算和存储分离了,并且借助高效的网络设置,已经解决了很多数据本地化的问题,但是数据还是离计算越近,性能越好。F1 Server
上进行编译和优化,然后把这个优化好的执行计划推到执行层,而执行的时候根据客户端指定的模式偏好来选择到底用何种模式来执行。数据源
F1 Server
和 F1 Worker
不止可以访问本数据中心的数据,还可以跨数据中心访问数据。F1 Query
同时也像 Presto
一样,可以支持对各种异构数据源的查询。而且跟 Presto 一样,F1 Query 把所有的数据源都抽象成一个关系型的表(因为最终使用的查询语言是SQL嘛),因此隐藏掉了数据源本身的实现细节。不同的数据源之间可以进行关联的JOIN查询,同时借助前面提到的 Catalog Service
来统一管理这些异构数据源的元数据。整个就是一个企业级的大数据库啊,可以看到整个企业里面的所有数据。Catalog Service
管理的数据, F1 Query 还能查询不在这个元数据中心里面的数据,通过一个叫做 DEFINE TABLE
(而不是 CREATE TABLE
)的语句来对这个要查询的数据源进行描述,描述之后就可以进行查询了:DEFINE TABLE People(
format = 'csv',
path = '/path/to/peoplefile',
columns = 'name:STRING, DateOfBirth:DATE'
);
SELECT Name, DateOfBirth FROM People WHERE Name = 'John Doe';
其实本质上就是创建一个临时表,只在当前的 session 有效,为什么不用 CREATE TEMP TABLE
这种更容易理解的语法呢?这是我始终不大明白的地方。我们 Data Lake Analytics 也有类似的直接查询裸数据的语法,可以说英雄所见略同啊: SELECT count(*) FROM TABLE temp_1
(
col1 int,
col2 string
)
LOCATION 'oss://test-oss-bucket/tbl1_part/kv1.txt';
Connector
, 而在 F1 Query 里面是实现一个 Table-Valued Function(TVF)。Data Sink
Catalog Service
管理的表,也可以不是。如果是被管理的表,那么是通过 CREATE TABLE
语法创建出来的。而这个 Data Sink 的表默认是的实现是保存到 Google 的 Colossus 分布式文件系统上面去了。而用户也可以像 DEFINE TABLE 语法一样,可以用 EXPORT DATA
语法指定输出到自定义的表里面去。在这一点上 F1 Query 貌似没有 Presto 来的灵活,Presto 里面的 Data Sink 可以是任何类型的存储,并且不需要什么特殊的 EXPORT DATA
的语法。
查询语言
SQL 2011
, 他们在这上面做了一些扩展以进行嵌套结构的数据查询。比较值得一提的是,F1 Query
的SQL方言跟 Big Query
、Dremel
以及 Spanner SQL
是一样的,这样用户可以在这些系统之间很容易进行迁移 -- 统一是主旋律啊。三大执行模式
Centralized Execution
, Distributed Execution
以及 Batch Execution
。其中 Centralized Execution
和 Distributed Execution
都属于交互式(Interative)的执行模式。交互式执行模式
Centralized Execution
F1 Server
直接就执行掉了, 因为这种请求处理的数据量不大,对于资源的要求不高,因此 F1 Server
内部其实是以单线程的 pull-based
模式来执行的:pull-based
模型,是因为当这个计划开始执行的时候,上层的算子递归地调用底层算子的 GetNext()
方法来获取它自己的输入。总体来说数据都是被从下向上“拉”出来的,因此叫 pull-based
。Distributed Execution
Distributed Execution
,第一个接到这个查询请求的 F1 Server
只是充当一个调度者的角色,真正的执行是由一组 F1 Worker
共同执行。这种模式的架构就跟 Presto 很像了,这两个角色在 Presto 里面分别叫做 Coordinator
和Worker
。
Centralized
模式,什么时候用 Distributed
模式呢? 优化器对 SQL 进行解析,如果发现这个查询最好要用大并发进行分区读的话,那么它会走 Distributed
的模式,否则走的就是 Centralized
模式。Fragments
), 每个片段由一组 F1 Worker
来执行,这些片段是同时并发执行的,并且内部可能会应用流水线技术。Fragments
的呢? 优化器使用的是自底向上的策略来拆分的,每个单独的算子对于输入数据的分布(Data Distribution)都会可以有一定的要求的。一般来说这种要求是指数据是否按照某个字段进行分片。典型的例子是 Hash Join , Hash Join 需要数据按照 Group Key 或者 Join Key 进行 hash 分片 -- 这就是 HashJoin 算子的数据分布需求。如果当前的数据分布策略能够满足这个算子的要求,那么这个算子保留在当前的 Fragment 里面,否则我们就要在执行计划当中插入一个 Exchange 节点来进行数据的重新分布,同时也划分了Fragment 之间的边界。Fragment
边界之后下面一件事件就是决定这些 Fragment 的并行度, 并行度的计算也是自底向上的过程,首先最底层的 TableScan 决定了最初的并行度,然后这种并行度的信息会被一层一层地上推给一个叫做 Width Calculator
的模块来逐步计算每个 Fragment 的并行度。比如一个 HashJoin 在一个 50 并行度和一个100 并行度的两个输入 Fragment 之间进行的话,那么这个 HashJoin 算子会选用 100 并行度以照顾比较大的那个输入算子。感觉这就是在描述Presto的实现啊。在读这篇论文之前我一直搞不清楚的就是这个神奇的 Exchange 算子是怎么来的,看了这篇论文总算搞清楚了。
数据重分布(Reparitition)策略
Fragment
是并行执行的,整个执行的数据流可以看作一个DAG,数据在流经 Fragment 边界的时候会被一个 Exchange
算子进行重新分布(repartition), 对于每条数据, 数据的发送者利用一个分区函数来计算它的目的地(一个分区值: partition number
),而每个 partition number
对应到目标 Fragment 里面的一个具体的 Worker
。Exchange
的算子是通过 RPC 来实现的(Presto里面也是这样的), 而且数据的发送和接收之间还有流控的机制,这种基于 RPC 的通信机制的并发性还是挺好的,可以做到每个 Fragment 几千个分区,如果要求更高的并发度,那么就要使用 Batch Execution
模式来执行了。count(*)
), 那么所有的数据会被发送一个分区。这种情况是可以优化的,通常会在目标 Aggregation
算子之前生成另外一个 PartialAggregation
算子,这样做的好处一是提高了总体的并行度,因为多个Worker参与了聚合操作;另外因为做了部分聚合之后,要往下游发的数据变少了,Worker 间传送的总数据也就少了。F1 Query
的执行是一个可能会有多个根节点的DAG, 一个上游节点的数据可能会流向多个下游的 Fragment , 比如对同一份输入进行多种聚合,F1 Query在实现这种执行计划的时候上游的 Fragment 只会执行一次,只是把数据发往多个下游而已。这种方式对于下游数据消费速度非常敏感,因为多个不同分支可能以不同的速度消费数据,任何一个有问题就可能造成上游 Fragment 数据的堆积。F1 Query 规避这个问题的方法是把数据在内存里面进行缓冲,让下游 Fragment 慢慢消费;如果所有的下游都 Block 住的话,那么它会把数据吐到文件系统上面去避免上游 Fragment 内存爆掉。这貌似在描述我们要做的多路输出的技术方案啊。
性能考虑
HashJoin Worker
的内存里面,Broadcast HashJoin
对于数据倾斜天生免疫,因为数据是可以随机发的,但是对于 Build Input
的大小比较敏感。LookupJoin
,比较初级的做法是来一条数据我们查询一下 BuildInput
,这样显而易见性能会很差,时间可能都花在查询 BuildInput 上面了。F1 Query 当然不会这么做,F1 Query会做批量、异步处理,它会 batch 一堆数据,一次性的发给 BuildInput 去一次性查询,因为是批量查询,中间如果有重复的key也可以自动去重,节省总体的执行时间。而查询 BuildInput 的时候它会继续消费上游过来的数据,而不会堵住,保证整个过程的流水线式的执行。动态KeyRange
的数据分布算法,上游的数据发送者根据它看到的数据的分布动态地对数据的KeyRange进行分配,这个做法的依据是它本地看到的数据分布情况应该跟总体数据的分布情况类似,因此可以得到比较好的数据分布效果,避免数据倾斜。F1 Query没有透露关于这个算法更详细的信息。Batch Execution
更好。Batch Execution
FlumeJava
或者 MapReduce
写出来, 而不是SQL,MapReduce
(以及 FlumeJava
) 代码主要的问题主要在于开发和维护的成本太高,而且SQL优化器层面可以做很多优化的事情比如属性裁剪、条件下推等等,手写的 MapReduce
都是享受不到的。这一点貌似阿里巴巴倒是更现代化,阿里巴巴内部绝大多数的这种ETL工作都是用SQL来写,通过UDF来支持特定的业务逻辑,实在复杂的才用 MapReduce 任务来做。
Batch Execution
模式下接受的查询语言还是SQL,但是它后台会把SQL任务翻译成 MapReduce 的任务来执行。我们知道跟流式执行不一样,MapReduce 的不同 Stage
不是同时执行的,后一个Stage 必须等前一个 Stage 完全成功之后才能开始,因此中间结果全部落盘(Colossus分布式文件系统),这使得 MapReduce 的不同的 Stage 可以异步交互,而不需要同时在线。同时这种机制又提供了一定的容错性,如果一个 Stage 出错了,我们不需要重跑整个任务,因为 Stage 的输入保存在文件系统上,我们重跑这个失败的 Stage 就好了。Batch Execution
不止可以自动处理服务端的异常,它还能自动规避客户端的异常,客户端可以提交异步的查询,然后断开连接,而在服务器端查询会继续执行不会终止。而交互式查询都是同步执行的,客户端一旦断开整个查询也就失败了。Interative Execution
跟 Batch Execution
执行方式迥异,但是他们的查询解析、查询优化等等前端模块是完全一样的,区别只在于最后的执行阶段,如下图所示·:可扩展性
支持自定义的数据源
支持新的UDF, UDAF, TVF
UDF Server
的服务,UDF Server 是一个远程的 RPC 服务,它里面承载着 UDF,而这些 UDF 可以用任何语言比如 C++, Java, Go 来写。UDF Server 的概念第一次听说是在 Apache Beam 里面,Apache Beam 也是出自Google 之手,可见 UDF Server 在 Google 内部已经是个很成熟的概念了。这个概念还是很创新的,以前总感觉UDF这种东西性能一定要高,不能有远程调用,否则性能会很差,没想到 Google 干脆把真个 UDF 的实现都放到远端了。而性能问题则通过之前解决 HashJoin 里面 解决 BuildInput 性能类似的手段,通过 批量化 + 异步化 + 流水线化
,使得远端 UDF Server 的延迟完全被掩盖掉了。跟UDF类似,UDAF也是采用类似的策略,只不过调用UDAF 远程服务的时候除了要传递当前要聚合的输入数据,还要传当前已经聚合的结果,远程的UDAF服务则会返回新的聚合结果。因为UDF Server都是无状态的,使得F1 Query可以很好地把整体的流量分布到整个 UDF Server 集群里面,提高整体的性能。把 UDF 的概念从具体的执行引擎里面拿出来了,使得各种不同的数据执行引擎可以共用同一个 UDF Server,而不需要重复开发。用户在编写 UDF 的时候也只需要编写一份,因为业务处理的逻辑都是一样的,没必要为了每种引擎单独适配。
因为引擎与 UDF Server 通过 RPC 进行交互,这就不限定 UDF 到底用什么编程语言进行编写了,给了 UDF 编写者更大的自由度。
Table Valued Function
, 这种函数比较有意思,它的输入是一张表(当然还可以有其它普通的参数),输出是另外一张表,这种给了用户更大的自由度,对于一些新兴的场景比如机器学习就特别适合: 机器学习在模型训练的时候就是把一张表作为输入,然后输出一张新的表。比如下面的例子:SELECT * FROM EventsFromPastDays( 3, TABLE Clicks);
Clicks
表查数据,通过一个TVF( EventsFromPastDays
) 产生新的一张临时表( EventsFromPastDays( 3, TABLE Clicks)
),最后再用 SELECT
查询出来进行展现。CREATE TABLE FUNCTION EventsFromPastDays(
num_days INT64,
events ANY TABLE
) AS
SELECT * FROM
events
WHERE date >= DATE_SUB( CURRENT_DATE(), INTERVAL num_days DAY);
总结
批量化 + 异步化 + 流水线
的优化策略, 另外动态 KeyRange
的点子也是蛮有意思的。Distribution Execution
的部分几乎就是在描述 Presto 的实现,想了解 Presto 大体原理的同学可以从这篇论文开始。——END——
文章不错?点个【在看】吧! 👇