查看原文
其他

通俗说Spark

刘超 刘超的通俗云计算 2019-03-29

前面有一篇文章形象解析了Yarn的工作原理,这一篇文章通俗解析一下当前最火的大数据框架Spark。


通俗说基于Yarn的Map-Reduce过程


听说过Spark 的人常听到他强于Hadoop 的原因是他是基于内存的计算,因而比Hadoop快,可是数据量如此之大,怎么可能都放在内存里面呢?


当然不是所有的都在内存里面,Spark比hadoop快而是由Spark全新的运行机制决定的。


一提Spark 的大数据处理能力,有一个抽象的概念叫RDD,其实用户可以逻辑地认为数据全在内存中,仅仅关注数据处理的逻辑即可,这有点像客户提的需求,往往是抽象的,需要在实现的过程中慢慢的落地。(这里接着延续通俗说Yarn里面接项目的模式)


客户开始口若悬河的描述他们想怎么处理这些数据,例如每个都加一(map),过滤掉一些(filter),合并一下(union),按照key做个汇总(reduceByKey),最后处理完了,写入HDFS。


好了,你作为需求分析师开始记录客户想要做的改进,一一记下来,最后形成了一张如下的有向无环图。



好了,接到需求了,开始干吧!等等,咱们统筹规划一下,先别着急动手。

这是Spark和hadoop编程模型不同的地方,上述的所有的操作,map-reduce看到一条就做一条,例如每个都加一,构成一个map-reduce,读一次磁盘,写一次磁盘,同理过滤,构成另一个map-reduce,以此类推,所以整个处理过程比较慢。


Spark当看到一个需求的时候,判断这是一个中间状态的转换(Transformation),还是客户要的最后结果,如果是中间状态,则等等看,如果是客户要结果了,才开始真正的行动(Action)。


当要行动的时候,客户的需求已经完全清楚了,可以统筹规划。



这个做统筹规划的人叫DAGScheduler,他不真正的执行任务,也不调度任务的执行,既不是程序员,也不是项目经理,而是需求分析师,给出来的常称为High Level Design。


DAGScheduler中的DAG是Directed Acyclic Graph,有向无环图,他讲客户的需求画成一个图,这样看起来清晰多了,然后从最后客户想要的结果入手进行分析。


客户想要的结果肯定是最后一步finalStage,然后看要得到这个结果的上一步的数据是什么,因为数据量很大,就像上一次讲Yarn一样,需要分成多个团队并行处理,但是有时候并行不起来,还需要汇总一下结果,在Spark里面也是这样的,如果上一步的数据到这一步子团队内部就能搞定,就不需要开会汇总,互相交换资源,例如上一步的数据是每个省的高考分数,这一步要求得到每个省的最高分数,这样自己省里面自己就能搞定,这叫做窄依赖,可以在一个Stage里面搞定,这就是Spark比较好的一个模式,只要是窄依赖,不需要启动另外的map-reducce,在一个子团队一直做,如果每个子团队处理数据量不大,就不用落盘,直到需要汇总的那个时候。


什么时候需要汇总呢?例如上一步是每个省的高考分数,这一步求全国的文科和理科状元,这样一个省内就搞不定了,需要大家开会汇总,每个省将自己的文科状元和理科状元报上来,分别在理科处理子团队和文科处理子团队进行排序处理,这个过程各个省之间的数据需要交换,因而这个过程往往成为数据处理的瓶颈,称为宽依赖,宽依赖会将处理过程分成两个Stage,因为一个子团队内部搞不定了,需要人统筹开会,交换资源,这个过程叫做shuffle,专门组织会议,统筹这件事情的叫做shuffleManager


就是按照这种思路,DAGScheduler从最后的输出结果往前推,凡是窄依赖的算作一个Stage,凡是宽依赖的算作另外的Stage,于是将整个处理的图划分为多个Stage,规划阶段就算可以了。


接下来是要执行了。DAGScheduler会创建一个Job,对于每一个Stage,因为Stage内部都是窄依赖,因而可以分成多个团队并行处理的,当一个Stage处理完毕,进入下一个Stage的时候,会进行一次Shuffle,也即集中开会交换数据,然后下一个Stage同样多个团队并行处理,直到输出结果。


对于DAGScheduler来讲,作为需求分析师,将任务划分为Stage以后,接下来要将每个Stage划分为并行的Task,以便不同的团队处理不同的Task。除了最后输出阶段的Task叫做ResultTask以外,前面的Stage的Task都称为MapShuffleTask。


分好了Task,具体任务交给哪个团队,需求分析师就不管了,是项目经理的事情,由另外的TaskScheduler负责分配。


TaskScheduler相当于项目经理,开始为这个job的task分配资源,但是往往项目经理接的项目不止一个,整个团队的资源就这么多,因而需要一个调度算法,资源先给谁,后给谁。这里面有两种算法,fifo是先来先得,哪个job先来,先给谁资源,fire是公平算法,对于已经接手的项目,资源平均分配。


当TaskScheduler决定要并行的运行一组Task的时候,就让自己的助理SchedulerBackend去真的申请资源,去哪里申请呢?当然是Mesos,Yarn,或者Spark自己实现的standalone。Mesos和Yarn如何分配资源的,请参考另外的文章。


反正资源管理系统会在资源足够的节点上为任务分配空间,然后会通知节点上启动一个Executer来真正的做事情,是最终真正干活的人。


Executer创建好后会向SchedulerBackend报告,一个执行这个任务的虚拟小组就算成立了。


Executer做什么呢?当然听领导安排了,领导会将任务要处理哪些数据,如何处理等配置序列化后发给它,它读取解序列化后就知道要做啥了,开始干货吧。


前面说过了,任务有两种,一种是ShuffleMapTask,一种是ResultTask,处理方式各不相同。除了最后输出结果是ResultTask,其他的处理任务都是ShuffleMapTask。


我们先来说ShuffleMapTask,为什么叫做ShuffleMapTask呢?因为作为Spark的中间过程,首先会从上一个Stage获取结果,而从上一个Stage到这一个Stage,一定是一个宽依赖,所以一定是经历了一个Shuffle的过程,这个Stage获取到的结果,其实是上一Stage Shuffle的结果,得到结果后,在本Stage中运行Map任务,并行的处理这个Stage的所有处理,由于同一个Stage里面都是窄依赖,所以不需要汇总交换信息,当本Stage的Map处理完毕之后,接着进行下一个Stage,由于又要跨Stage了,又要进行一次Shuffle了,只不过这个Stage是Shuffle的前一半,下一半会在下一个Stage完成。


所以说一个shuffle的过程是跨两个Stage的,跨两个ShuffleMapTask的。Shuffle的过程由于比较复杂,就像需要开大会,汇总,交换信息来搞定,所以需要一些人来总管这件事情,称为ShuffleManager,在Master端和Executor端都有进程,会统一管理上一个Stage会输出多少个部分,下一个Stage会输入多少个部分,称为partition,谁的数据应该拷贝给谁,ShuffleManager有很多的实现方式,当前主流的是SortShuffleManager。


例如上面的高考分数例子中,上一个Stage的partition的数目是省份的数量,下一个Stage的partition的数量是2,一个文科,一个理科,如何汇总数据呢,当然应该将每个省里面的文科的分数和理科的分数单独保存,然后每个省的文科的分数都会拷贝给下一个Stage文科这个partition,每个省的理科的分数都拷贝给下一个Stage理科这个partition。


在上一个Stage的Map阶段,写入结果的是SortShuffleWriter,他的工作机制如下:

  • 对于每一个Partition,创建一个Array,将属于这个Partition的key/value放入数组

  • 如果Array超过阈值,写写入外部存储,外部存储会记录这个Partition的ID和保存了多少条目

  • 将所有写入到外部存储的文件进行归并

  • 生成最后的数据文件是,需要生成Index文件,记录partition的位置


当结果写入文件后,会将结果的状态MapStatus汇报给SortShuffleManager,将这个结果保存在MapOutputTracker


这一个Stage中,会从SortShuffleReader中读取上面的结果,会调用BlockStoreShuffleFetcher的fetch函数,先从MapOutputTracker中得到元数据。然后根据元数据获取数据,如果在本地,则通过BlockManager读取,如果在其他Executor上,则调用BlockTransferService进程传输。


当本Stage的Shuffle结束之后,接下来就是好好的做自己的事情了,每个Partition可以在自己的窄依赖里面转换,不需要和其他的Partition沟通,例如map,filter等等。当本Stage的装换做完,到了一个宽依赖的时候,就需要开启新的Shuffle,为下一个Stage做准备了。


如此一个Stage一个Stage的下去,知道最后要输出结果的Stage了,在这个Stage里面的分给每个Executor的任务称为ResultTask。如果是ResultTask,则是最后一个Stage,获取RDD和作用于RDD的函数func,对RDD这个partition的每一项,执行func函数,例如写入HDFS。


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存