Spark streaming 设计与实现剖析
最近写spark streaming 比较多,遇到问题也不断的翻 streaming的代码, 是时候总结一下, 主要是记录一下 spark streaming 的设计和实现细节,
主线
可以看到spark streaming 的总体设计和各大组件, spark streaming 跟 storm 和flink都是不同的, spark streaming 核心设计是微批处理, 而 storm 和flink核心设计是原生流设计,
spark streaming 是准实时的架构,延迟要大一些,但是吞吐量要好一些, 把一个 unbound data steam 用刀切成一段一段,然后把每一段包装成 spark core中的 job, 生成rdd task, 放在 spark 集群中运行, 而flink storm 是让数据在处理组件上流动,批量处理只是流处理的一个特例,所以说两种流处理框架本质上是不同的,
上面是spark streaming中的一些核心组件, jobScheduler 是启动入口, 会启动 jobGenerator 和 ReceiverTracker。 jobGenerator 负责根据静态模板生成动态 job, 然后把job 交给 jobScheduler 调度执行, ReceiverTracker 负责划分每个时间单位的数据,以及提供 数据的meta信息,
spark streaming context 的主类里面包含一个 spark context 实例, jobScheduler 运行job的时候,把 DStreamGraph 转变成一个 RDD DAG 然后 在 spark context 的运行
运转 流程
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
上面这段官方给的demo, 很简单就是从 hdfs上面找到一个文件,然后进行 WordCount, 我们通过这个demo来跟踪 spark steaming 核心流程是怎么运转起来的,
数据源
ssc.textFileStream -> textFileStream -> FileInputDStream
这个调用最终把数据源包装成了一个 FileInputDStream
实例, spark streaming 还有其他一些 DStream, 他们都继承于 DStream , 这个 DStream 类似于 spark core 中的RDD, 支持 map, filter 之类的转换操作, 下面还有一些其他的 DStream
数据转换
flatMap -> FlatMappedDStream
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
flatMap 调用包装出来了一个 FlatMappedDStream, 而且这个 FlatMappedDStream 会把上面提到的 FileInputDStream 当 做parent, 而且记录指针, 这个是为了后面回溯的时候使用,
数据输出
wordCounts.print()
这里就比较关键了, 这是一个输出操作, 你可以和 spark core中的 action 类比,
print -> foreachRDD -> ForEachDStream -> register
ssc.graph.addOutputStream(this)
这里会把最后一个 ForEachDStream 注册在 全局变量 graph 上面, 相当于注册了全局模板, 到时候动态生成 job的时候就能找到这个模板,然后生成 RDD DAG,
有同学有点疑问, 上面这些运行完了, 还是没有发现在哪里转换为了spark core 中的job运行在集群里面了
组件启动
在 spark streaming context 启动的时候
全局的 spark context 实例
graph 实例, 这是一个模板类, 记录 DStreamGraph , 其实就是记录了 ForEachDStream,
JobScheduler 这个是核心调度类, 里面会生成 jobGenerator 和 ReceiverTracker
生成动态 RDD DAG
jobGenerator
这个实例生成的时候会 启动一个 RecurringTimer 定时器, 定时器会根据你启动 spark streaming 的时候设定的微批间隔时间, 定时发送 GenerateJobs
这个case class 到 eventLoop中, 然后 jobGenerator 无限循环获取执行, 偏函数匹配 jobGenerator 消息, 最终调用 GenerateJobs
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
根据注释,我们也能猜到这里做了什么, 一个是分配这个时间片内的需要处理数据的meta信息,
然后调用 graph.generateJobs, 里面就调用了,我们上文提到的 outputStream.generateJob
也就是 ForEachDStream.generateJob -> parent.getOrCompute
然后就是不断回溯 FlatMappedDStream 和 FileInputDStream,
这里分为两类,DStream,
第一类是数据源, 他们的 compute 执行就是, 得到一个 获取数据的 RDD, 比如上文我们提到的从文件获取得到的是一个 NewHadoopRDD, 这个我们都比较清楚, 这个RDD执行的时候就是去hdfs 上面读取文件, 如果忘了, 可以看看 spark core 里面的实现,
还有一类是 是继承 ReceiverInputDStream的 input, 比如 KafkaInputDStream , 他们 compute 就是通过 receiverTracker, 获取fetch 数据的meta信息, 然后包装成一个 BlockRDD,
这个RDD运行的时候根据数据meta信息可以获取到数据,
第二类,是转换 DStream, 这类的实现 其实就是往上调用, 类似于递归调用, 实际效果 完全相当于用 RDD API 写了这样的代码:return parentRDD.map(filterFunc), 这样就容易理解多了。
上文提到最终会调用 ForEachDStream.generateJob , 函数如下:
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
我们来看一下这个函数, 比较关键, 这个ForEachDStream new的时候会传入一个 闭包函数,
比如 print 的时候, 传入的闭包就是
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
打印 一些信息, 然后在 rdd上面调用spark core 中的action 操作, 大家都知道这意外着什么, 其实就是 触发了 spark context 的一系列操作, DAG 分解, 然后调度运行,
spark streaming 这边把这个调用操作包装为了自己定义的一个 job , 这个 job的run函数, 就是调用这个闭包, 大家记住这一点, 因为后面流程中会调用到这里,
其实总结一句话, 整个过程, 就是把 DStreamGraph 模板,生成了RDD DAG实例,
真正运行
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
上文主要分析了, generateJobs 这里面做了什么, 下面看如果 match success, 也就是动态job生成之后,返回一个 Seq[Job], 这里是一个 job的集合, 一般每一个 spark streaming的output操作都会产生一个 job, 比如你在 一个 Dstream上连续调用 100次打印, 这里就会有100个job, 这里 会把 Seq[Job] 包装为一个 JobSet, 然后提交运行, 就是交给 jobScheduler 去调度运行,
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
下面就到了这里, 很简单, jobScheduler 维护了一个线程池, 主要是怕大量产生销毁线程会有性能问题, 这里又把刚才提到的job包装了一下, 包装为了一个 JobHandler, 这个 JobHandler 的run方法,
run ->
其实就是 调用了传来的job 的run方法, 大家还记得上文提到的,
其实就是调用了上文提到的闭包, 就是在 spark context上面调用了action操作, 后面就是 spark core 的流程了, 跟spark streaming 就没有什么关系了,
总结
其中整个过程挺清楚的, 代码量也比较小, 多亏了了 Scala的优雅, 但是某些地方会有一点trick,