万字长文+图解,带你轻松学习 Spark
The following article is from Data跳动 Author 大D
今天给大家分享一篇 Spark 核心知识点的梳理,对知识点的讲解秉承着能用图解的就不照本宣科地陈述,力求精简、通俗易懂。希望能为新手的入门学习扫清障碍,从基础概念入手、再到原理深入,由浅入深地轻松掌握 Spark。
1、初识 Spark
Spark不仅能够在内存中进行高效运算,还是一个大一统的软件栈,可以适用于各种各样原本需要多种不同的分布式平台的场景。
1)背景
Spark作为一个用来快速实现大规模数据计算的通用分布式大数据计算引擎,是大数据开发工程师必备的一项技术栈。Spark相对Hadoop具有较大优势,但Spark并不能完全替代Hadoop。
实际上,Spark已经很好地融入了Hadoop家族,作为其中一员,主要用于替代Hadoop中的MapReduce计算模型。
2)Spark 的优势
Spark拥有Hadoop MapReduce所具备的优点,但不同的是,Hadoop每次经过job执行的中间结果都会存储在HDFS上,而Spark执行job的中间过程数据可以直接保存在内存中,无需读写到HDFS磁盘上。因为内存的读写速度与磁盘的读写速度不在一个数量级上,所以Spark利用内存中的数据可以更快地完成数据的计算处理。
此外,由于Spark在内部使用了弹性分布式数据集(Resilient Distributed Dataset,RDD),经过了数据模型的优化,即便在磁盘上进行分布式计算,其计算性能也是高于Hadoop MapReduce的。
3)Spark的特点
计算速度快
Spark将处理的每个任务都构造出一个有向无环图(Directed Acyclic Graph,DAG)来执行,实现原理是基于RDD在内存中对数据进行迭代计算的,因此计算速度很快。官方数据表明,如果计算数据是从磁盘中读取,Spark计算速度是Hadoop的10倍以上;如果计算数据是从内存中读取,Spark计算速度则是Hadoop的100倍以上。
易于使用
Spark提供了80多个高级运算操作,支持丰富的算子。开发人员只需调用Spark封装好的API来实现即可,无需关注Spark的底层架构。
通用大数据框架
大数据处理的传统方案需要维护多个平台,比如,离线任务是放在Hadoop MapRedue上运行,实时流计算任务是放在Storm上运行。而Spark则提供了一站式整体解决方案,可以将即时查询、离线计算、实时流计算等多种开发库无缝组合使用。
支持多种资源管理器
Spark支持多种运行模式,比如Local、Standalone、YARN、Mesos、AWS等部署模式。用户可以根据现有的大数据平台灵活地选择运行模式。
Spark生态圈丰富
Spark不仅支持多种资源管理器调度job,也支持HDFS、HBase等多种持久化层读取数据,来完成基于不同组件实现的应用程序计算。目前,Spark生态圈已经从大数据计算和数据挖掘扩展到机器学习、NLP、语音识别等领域。
2、Spark 的模块组成
Spark 是包含多个紧密集成的组件,这些组件结合密切并且可以相互调用,这样我们可以像在平常软件项目中使用程序库一样,组合使用这些的组件。
1)Spark 的模块组成
Spark 基于 Spark Core 建立了 Spark SQL、Spark Streaming、MLlib、GraphX、SparkR等核心组件;
基于这些不同组件又可以实现不同的计算任务;
这些计算任务的运行模式有:本地模式、独立模式、YARN、Mesos等;
Spark任务的计算可以从HDFS、HBase、Cassandra等多种数据源中存取数据。
3、Spark 的运行原理
Local 模式 :学习测试使用,分为 Local 单线程和 Local-Cluster 多线程两种方式;
Standalone 模式 :学习测试使用,在 Spark 自己的资源调度管理框架上运行;
ON YARN :生产环境使用,在 YARN 资源管理器框架上运行,由 YARN 负责资源管理,Spark 负责任务调度和计算;
ON Mesos :生产环境使用,在 Mesos 资源管理器框架上运行,由 Mesos 负责资源管理,Spark 负责任务调度和计算;
On Cloud :运行在 AWS、阿里云、华为云等环境。
Cluster Manager :Spark 集群管理器,主要用于整个集群资源的管理和分配,有多种部署和运行模式;
Worker :Spark 的工作节点,用于执行提交的任务;
Executor :真正执行计算任务的一个进程,负责 Task 的运行并且将运行的结果数据保存到内存或磁盘上;
Driver :Application 的驱动程序,可以理解为驱动程序运行中的 main() 函数,Driver 在运行过程中会创建 Spark Context;
Application :基于 Spark API 编写的应用程序,包括实现 Driver 功能的代码和在集群中多个节点上运行的 Executor 代码。
通过注册机制向 Cluster Manager汇报自身的 CPU 和内存等资源使用信息;
在 Master 的指示下,创建并启动 Executor(真正的计算单元);
将资源和任务进一步分配给 Executor 并运行;
同步资源信息和 Executor 状态信息给 Cluster Manager。
运行 Application 的 main() 函数;
创建 SparkContext;
划分 RDD 并生成 DAG;
构建 Job 并将每个 Job 都拆分为多个 Stage,每个 Stage 由多个 Task 构成,也被称为 Task Set;
与 Spark 中的其他组件进行资源协调;
生成并发送 Task 到 Executor。
4、RDD 概念及核心结构
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
一组分区(Partition)的列表,其中分区也就是RDD的基本组成单位;
一个函数会被作用到每个分区上,RDD 的计算是以分区为单位的;
一个 RDD 会依赖其他多个 RDD,它们之间具有依赖关系;
可选,对于K-V型的RDD会有一个分区函数,控制key分到哪个reduce;
可选,一个存储每个分区优先位置的列表。
Stage:当 Spark 执行作业时,会根据 RDD 之间的依赖关系,按照宽窄依赖生成一个最优的执行计划。如果 RDD 之间为窄依赖,则会被划到一个 Stage 中;如果 RDD 之间为宽依赖,则会被划分到不同的 Stage 中,这样做的原因就是每个 Stage 内的 RDD 都尽可能在各个节点上并行地被执行,以提高运行效率。
优先列表(PreferredLocation):用于存储每个分区优先位置的列表,对于每个 HDFS 文件来说,就是保存下每个分区所在 block 的位置。按照“移动数据不如移动计算”的理念,Spark 在执行任务调度时会优先选择有存储数据的 Worker 节点进行任务运算。
CheckPoint:是 Spark 提供的一种基于快照的缓存机制,如果在任务运算中,多次使用同一个 RDD,可以将这个 RDD 进行缓存处理。这样,该 RDD 只有在第一次计算时会根据依赖关系得到分区数据,在后续使用到该 RDD 时,直接从缓存处取而不是重新进行计算。
如下图,对 RDD-1 做快照缓存处理,那么当RDD-n 在用到 RDD-1 数据时,无需重新计算 RDD-1,而是直接从缓存处取数重算。
此外,Spark 还提供了另一种缓存机制 Cache,其中的数据是由 Executor 管理的,当 Executor 消失时,Cache 缓存的数据也将会消失。而 CheckPoint 是将数据保存到磁盘或者 HDFS 中的,当任务运行错误时,Job 会从CheckPoint 缓存位置取数继续计算。
5、Spark RDD 的宽窄依赖关系
宽依赖 :父 RDD 中每个分区的数据都可以被子 RDD 的多个分区使用(涉及到了shuffle);
窄依赖 :父 RDD 中每个分区的数据最多只能被子 RDD 的一个分区使用。
说白了,就是看两个 RDD 的分区之间,是不是一对一的关系,若是则为窄依赖,反之则为宽依赖。
宽依赖的算子 :join(非hash-partitioned)、groupByKey、partitionBy;
窄依赖的算子 :map、filter、union、join(hash-partitioned)、mapPartitions;
6、Spark RDD 的转换操作与行动操作
val lines = sc.parallelize(Arrays.asList(1,2,3,4,5),n)
val lines = sc.textFile("../temp.txt")
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
val countRDD = errorsRDD.count()
如果分不清楚给定的一个 RDD 操作方法是属于转换操作还是行动操作,去看下它的返回类型,转换操作返回的是 RDD 类型,而行动操作则返回的是其他的数据类型。
7、Spark RDD 中常用的操作算子
val input = sc.parallelize(List(-2,-1,0,1,2))
val rdd1 = input.map(x => x * x)
val rdd2 = rdd1.filter(x => x != 0 )
如果需要对 RDD 中每一个元素处理后生成多个输出,也有相应的算子,比如 flatMap()。它和 map() 类似,也是将输入函数应用到 RDD 中的每个元素,不过返回的不是一个元素了,而是一个返回值序列的迭代器。
最终得到的输出是一个包含各个迭代器可访问的所有元素的 RDD,flatMap() 最经典的一个用法就是把输入的一行字符串切分为一个个的单词。
举个栗子,将行数据切分成单词,对比下 map() 与 flat() 的不同。
val lines = sc.parallelize(List("hello spark","hi,flink"))
val rdd1 = lines.map(line => line.split(","))
val rdd2 = lines.flatMap(line => line.split(","))
可以看到,把 lines 中的每一个 line,使用所提供的函数执行一遍,map() 输出的 rdd1 中仍然只有两个元素;而 flatMap() 输出的 rdd2 则是将原 RDD 中的数据“拍扁”了,这样就得到了一个由各列表中元素组成的 RDD,而不是一个由列表组成的 RDD。
2) RDD 的转换算子
Spark 中的转换算子主要用于 RDD 之间的转化和数据处理,常见的转换算子具体如下:(发送推文预览时发现表格内容展示不全,就干脆截图了
3)RDD 的行动算子
Spark 中行动算子主要用于对分布式环境中 RDD 的转化操作结果进行统一地执行处理,比如结果收集、数据保存等,常用的行动算子具体如下:
8、Spark 的共享变量之累加器和广播变量
本节将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量。
简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的。
1) 闭包的概念
在讲共享变量之前,我们先了解下啥是闭包,代码如下。
var n = 1
val func = (i:Int) => i + n
函数 func 中有两个变量 n 和 i ,其中 i 为该函数的形式参数,也就是入参,在 func 函数被调用时, i 会被赋予一个新的值,我们称之为绑定变量(bound variable)。而 n 则是定义在了函数 func 外面的,该函数并没有赋予其任何值,我们称之为自由变量(free variable)。
像 func 函数这样,返回结果依赖于声明在函数外部的一个或多个变量,将这些自由变量捕获并构成的封闭函数,我们称之为“闭包”。
先看一个累加求和的栗子,如果在集群模式下运行以下代码,会发现结果并非我们所期待的累计求和。
var sum = 0
val arr = Array(1,2,3,4,5)
sc.parallelize(arr).foreach(x => sum + x)
println(sum)
sum 的结果为0,导致这个结果的原因就是存在闭包。
在集群中 Spark 会将对 RDD 的操作处理分解为 Tasks ,每个 Task 由 Executor 执行。而在执行之前,Spark会计算 task 的闭包(也就是 foreach() )。闭包会被序列化并发送给每个 Executor,但是发送给 Executor 的是副本,所以在 Driver 上输出的依然是 sum 本身。
如果想对 sum 变量进行更新,则就要用到接下来我们要讲的累加器。
2)累加器的原理
累加器是对信息进行聚合的,通常在向 Spark 传递函数时,比如使用 map() 或者 filter() 传条件时,可以使用 Driver 中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,然而,正如前面所述,更新这些副本的值,并不会影响到 Driver 中对应的变量。
累加器则突破了这个限制,可以将工作节点中的值聚合到 Driver 中。它的一个典型用途就是对作业执行过程中的特定事件进行计数。
举个栗子,给了一个日志记录,需要统计这个文件中有多少空行。
val sc = new SparkContext(...)
val logs = sc.textFile(...)
val blanklines = sc.accumulator(0)
val callSigns = logs.flatMap(line => {
if(line == ""){
blanklines += 1
}
line.split("")
})
callSigns.count()
println("日志中的空行数为:" + blanklines.value)
总结下累加器的使用,首先 Driver 调用了 SparkContext.accumulator(initialValue) 方法,创建一个名为 blanklines 且初始值为0的累加器。然后在遇到空行时,Spark 闭包里的执行器代码就会对其 +1 。执行完成之后,Driver 可以调用累加器的 value 属性来访问累加器的值。
需要说明的是,只有在行动算子 count() 运行之后,才可以 println 出正确的值,因为我们之前讲过 flatMap() 是惰性计算的,只有遇到行动操作之后才会出发强制执行运算进行求值。
另外,工作节点上的任务是不可以访问累加器的值,在这些任务看来,累加器是一个只写的变量。
对于累加器的使用,不仅可以进行数据的 sum 加法,也可以跟踪数据的最大值 max、最小值 min等。
3)广播变量的原理
前面说了,Spark 会自动把闭包中所有引用到的自由变量发送到工作节点上,那么每个 Task 的闭包都会持有自由变量的副本。如果自由变量的内容很大且 Task 很多的情况下,为每个 Task 分发这样的自由变量的代价将会巨大,必然会对网络 IO 造成压力。
广播变量则突破了这个限制,不是把变量副本发给所有的 Task ,而是将其分发给所有的工作节点一次,这样节点上的 Task 可以共享一个变量副本。
Spark 使用的是一种高效的类似 BitTorrent 的通信机制,可以降低通信成本。广播的数据只会被发动各个节点一次,除了 Driver 可以修改,其他节点都是只读,并且广播数据是以序列化形式缓存在系统中的,当 Task 需要数据时对其反序列化操作即可。
在使用中,Spark 可以通过调用 SparkContext.broadcast(v) 创建广播变量,并通过调用 value 来访问其值,举例代码如下:
val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
2、Presto on Spark:通过 Spark 来扩展 Presto
3、Spark 单 value,key-value 类型 21 个算子(图解与源码)
关注「大数据与机器学习文摘」
看精选技术文章和最新行业资讯
点赞和在看就是最大的支持❤️