查看原文
其他

Spark streaming 设计与实现剖析

最近写spark streaming 比较多,遇到问题也不断的翻 streaming的代码, 是时候总结一下, 主要是记录一下 spark streaming 的设计和实现细节,

主线



可以看到spark streaming 的总体设计和各大组件, spark streaming 跟 storm 和flink都是不同的, spark streaming 核心设计是微批处理, 而 storm 和flink核心设计是原生流设计,
spark streaming 是准实时的架构,延迟要大一些,但是吞吐量要好一些, 把一个 unbound data steam 用刀切成一段一段,然后把每一段包装成 spark core中的 job, 生成rdd task, 放在 spark 集群中运行,   而flink storm 是让数据在处理组件上流动,批量处理只是流处理的一个特例,所以说两种流处理框架本质上是不同的,


上面是spark streaming中的一些核心组件,  jobScheduler 是启动入口, 会启动 jobGenerator 和 ReceiverTracker。 jobGenerator 负责根据静态模板生成动态 job, 然后把job 交给 jobScheduler 调度执行, ReceiverTracker 负责划分每个时间单位的数据,以及提供 数据的meta信息,

spark streaming context  的主类里面包含一个 spark  context 实例,   jobScheduler 运行job的时候,把 DStreamGraph 转变成一个  RDD DAG 然后 在 spark context 的运行

运转 流程

     StreamingExamples.setStreamingLogLevels()    val sparkConf = new SparkConf().setAppName("HdfsWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(2))    val lines = ssc.textFileStream(args(0))    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)    wordCounts.print()    ssc.start()    ssc.awaitTermination()

上面这段官方给的demo, 很简单就是从 hdfs上面找到一个文件,然后进行 WordCount,  我们通过这个demo来跟踪 spark steaming 核心流程是怎么运转起来的,

数据源

ssc.textFileStream  -> textFileStream -> FileInputDStream 这个调用最终把数据源包装成了一个  FileInputDStream 实例,  spark streaming 还有其他一些 DStream, 他们都继承于 DStream , 这个 DStream 类似于 spark core 中的RDD,   支持 map, filter 之类的转换操作, 下面还有一些其他的 DStream


数据转换

flatMap  -> FlatMappedDStream
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))

flatMap 调用包装出来了一个 FlatMappedDStream, 而且这个 FlatMappedDStream 会把上面提到的 FileInputDStream 当 做parent, 而且记录指针,  这个是为了后面回溯的时候使用,

数据输出

wordCounts.print()
这里就比较关键了, 这是一个输出操作, 你可以和 spark core中的 action 类比,

print -> foreachRDD -> ForEachDStream -> register

ssc.graph.addOutputStream(this)

这里会把最后一个  ForEachDStream 注册在 全局变量 graph 上面, 相当于注册了全局模板, 到时候动态生成 job的时候就能找到这个模板,然后生成 RDD DAG,

有同学有点疑问, 上面这些运行完了, 还是没有发现在哪里转换为了spark core 中的job运行在集群里面了

组件启动

在 spark streaming context 启动的时候

  • 全局的 spark context 实例

  • graph 实例, 这是一个模板类, 记录 DStreamGraph , 其实就是记录了 ForEachDStream,

  • JobScheduler  这个是核心调度类, 里面会生成  jobGenerator 和 ReceiverTracker

生成动态 RDD DAG

jobGenerator这个实例生成的时候会 启动一个  RecurringTimer 定时器, 定时器会根据你启动 spark streaming 的时候设定的微批间隔时间, 定时发送  GenerateJobs 这个case class 到 eventLoop中, 然后 jobGenerator 无限循环获取执行,  偏函数匹配 jobGenerator 消息, 最终调用 GenerateJobs

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block

根据注释,我们也能猜到这里做了什么, 一个是分配这个时间片内的需要处理数据的meta信息,
然后调用 graph.generateJobs, 里面就调用了,我们上文提到的  outputStream.generateJob
也就是 ForEachDStream.generateJob -> parent.getOrCompute

然后就是不断回溯  FlatMappedDStream 和 FileInputDStream,

这里分为两类,DStream,

第一类是数据源, 他们的 compute 执行就是, 得到一个 获取数据的 RDD, 比如上文我们提到的从文件获取得到的是一个 NewHadoopRDD, 这个我们都比较清楚, 这个RDD执行的时候就是去hdfs 上面读取文件, 如果忘了, 可以看看 spark core 里面的实现,

还有一类是 是继承  ReceiverInputDStream的 input,  比如 KafkaInputDStream  , 他们 compute 就是通过 receiverTracker, 获取fetch 数据的meta信息, 然后包装成一个  BlockRDD,
这个RDD运行的时候根据数据meta信息可以获取到数据,

第二类,是转换 DStream,  这类的实现 其实就是往上调用, 类似于递归调用, 实际效果 完全相当于用 RDD API 写了这样的代码:return parentRDD.map(filterFunc), 这样就容易理解多了。

上文提到最终会调用 ForEachDStream.generateJob , 函数如下:

override def generateJob(time: Time): Option[Job] = {    parent.getOrCompute(time) match {      case Some(rdd) =>        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {          foreachFunc(rdd, time)        }        Some(new Job(time, jobFunc))      case None => None    }  }

我们来看一下这个函数, 比较关键,  这个ForEachDStream new的时候会传入一个 闭包函数,

比如 print 的时候, 传入的闭包就是

def foreachFunc: (RDD[T], Time) => Unit = {      (rdd: RDD[T], time: Time) => {        val firstNum = rdd.take(num + 1)        // scalastyle:off println        println("-------------------------------------------")        println(s"Time: $time")        println("-------------------------------------------")        firstNum.take(num).foreach(println)        if (firstNum.length > num) println("...")        println()        // scalastyle:on println      }    }

打印 一些信息, 然后在 rdd上面调用spark core 中的action 操作, 大家都知道这意外着什么, 其实就是 触发了 spark context 的一系列操作, DAG 分解, 然后调度运行,

spark streaming 这边把这个调用操作包装为了自己定义的一个 job , 这个 job的run函数, 就是调用这个闭包, 大家记住这一点, 因为后面流程中会调用到这里,

其实总结一句话, 整个过程, 就是把 DStreamGraph 模板,生成了RDD  DAG实例,

真正运行

Try {      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch      graph.generateJobs(time) // generate jobs using allocated block    } match {      case Success(jobs) =>        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

上文主要分析了, generateJobs 这里面做了什么, 下面看如果 match success, 也就是动态job生成之后,返回一个 Seq[Job], 这里是一个 job的集合, 一般每一个 spark streaming的output操作都会产生一个 job, 比如你在 一个 Dstream上连续调用 100次打印, 这里就会有100个job,  这里 会把 Seq[Job] 包装为一个 JobSet, 然后提交运行, 就是交给    jobScheduler 去调度运行,

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

下面就到了这里,   很简单, jobScheduler 维护了一个线程池, 主要是怕大量产生销毁线程会有性能问题, 这里又把刚才提到的job包装了一下, 包装为了一个 JobHandler, 这个 JobHandler 的run方法,

run ->其实就是 调用了传来的job 的run方法, 大家还记得上文提到的,

其实就是调用了上文提到的闭包, 就是在 spark context上面调用了action操作, 后面就是 spark core 的流程了, 跟spark streaming 就没有什么关系了,

总结

其中整个过程挺清楚的, 代码量也比较小, 多亏了了 Scala的优雅, 但是某些地方会有一点trick,

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存