基于 Spark Structured Streaming 的开发与数据处理
一、前言
大家好,我是云祁!
今天这篇文章,我会和大家讲下如何基于 Spark Structured Streaming 进行开发与数据处理 。首先我们来比较下,Structured Streaming 与 Spark Streaming 它有哪些不同呢?带着 Structured Streaming 有啥厉害之处的Σ(っ °Д °;)っ 疑问,我们开始今天的学习吧!
Tips:我会在两者比较的时候介绍 Spark Structured Streaming 的一些特性,请留意。
二、Spark Streaming vs Structured Streaming
2.1 Spark Streaming
Spark Streaming 是 Spark 最初的流处理框架,使用了微批的形式来进行流处理。
提供了基于 RDDs 的 Dstream API,每个时间间隔内的数据为一个RDD,源源不断对 RDD 进行处理来实现流计算。
2.2 Structured Streaming
Spark 2.X 出来的流框架,采用了无界表的概念,流数据相当于往一个表上不断追加行。
基于 Spark SQL 引擎实现,可以使用大多数 Spark SQL 的 function。
2.3 区别
2.3.1 流模型
Spark Streaming
Structured Streaming
“Input Table” is Unbounded - 无界的输入表
Structured Streaming 将实时数据当做被连续追加的表,流上的每一条数据都类似于将一行新数据添加到表中。
以上图为例,每隔1秒从输入源获取数据到 Input Table,并触发Query计算,然后将结果写入 Result Table,之后根据指定的 Output模式(有三种) 进行写出。
Complete Mode (完整模式)整个更新后的结果将被写入外部存储 Append Mode (default) (追加模式)由于将写入最后一个触发器,因此结果表中只会追加新行 Update Mode (更新模式) 只写入结果表中因为最后一个触发器而更新的行
上面的1秒是指定的触发间隔(trigger interval),如果不指定的话,先前数据的处理完成后,系统将立即检查是否有新数据。
With every trigger interval, new rows get appended to the input table.
需要注意的是,Spark Streaming 本身设计就是一批批的以批处理间隔划分RDD;而Structured Streaming中并没有提出批的概念,Structured Streaming 按照每个 Trigger Interval(触发间隔) 接收数据到 Input Table,将数据处理后再追加到无边界的 Result Table 中,想要何种方式输出结果取决于指定的模式。所以,虽说 Structured Streaming 也有类似于 Spark Streaming 的 Interval,其本质概念是不一样的。Structured Streaming更像流模式。
2.3.2 RDD vs DataFrame、DataSet
Spark Streaming中的DStream编程接口是RDD,我们需要对RDD进行处理,处理起来较为费劲且不美观。
stream.foreachRDD(rdd => {
balabala(rdd)
})
Structured Streaming使用DataFrame、DataSet的编程接口,处理数据时可以使用Spark SQL中提供的方法,数据的转换和输出会变得更加简单。
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "hadoop01:9092")
.option("subscribe", "order_data")
.load()
.select($"value".cast("string"))
.as[String]
.writeStream
.outputMode("complete")
.format("console")
2.3.3 Process Time vs Event Time
Process Time:流处理引擎接收到数据的时间
Event Time:事件真正发生的时间
Spark Streaming中由于其微批的概念,会将一段时间内接收的数据放入一个批内,进而对数据进行处理。划分批的时间是Process Time,而不是Event Time,Spark Streaming没有提供对Event Time的支持。
Structured Streaming提供了基于事件时间处理数据的功能,如果数据包含事件的时间戳,就可以基于事件时间进行处理。
这里以窗口计数为例说明一下区别:
我们这里以10分钟为窗口间隔,5分钟为滑动间隔,每隔5分钟统计过去10分钟网站的pv
假设有一些迟到的点击数据,其本身事件时间是12:01,被spark接收到的时间是12:11;在Spark Streaming的统计中,会毫不犹豫的将它算作是12:05-12:15这个范围内的pv,这显然是不恰当的;在Structured Streaming中,可以使用事件时间将它划分到12:00-12:10的范围内,这才是我们想要的效果。
2.3.4 可靠性保障
两者在可靠性保证方面都是使用了checkpoint机制。
checkpoint通过设置检查点,将数据保存到文件系统,在出现出故障的时候进行数据恢复。
在Spark Streaming中,如果我们需要修改流程序的代码,在修改代码重新提交任务时,是不能从checkpoint中恢复数据的(程序就跑不起来),是因为spark不认识修改后的程序了。
在Structured Streaming中,对于指定的代码修改操作,是不影响修改后从checkpoint中恢复数据的。
2.3.5 sink
二者的输出数据(写入下游)的方式有很大的不同。
Spark Streaming中提供了foreachRDD()
方法,通过自己编程实现将每个批的数据写出。
stream.foreachRDD(rdd => {
save(rdd)
})
Structured Streaming 自身提供了一些sink(Console Sink、File Sink、Kafka Sink等),只要通过option配置就可以使用;对于需要自定义的Sink,提供了ForeachWriter的编程接口,实现相关方法就可以完成。
// console sink
val query = res
.writeStream
.outputMode("append")
.format("console")
.start()
2.4 总结
总体来说,Structured Streaming 有更简洁的API、更完善的流功能、更适用于流处理。而spark streaming,更适用于与偏批处理的场景。
三、Spark Structured Streaming 特性
前面已经讲了 Structured Streaming 的基本概念,及其会再讲下它存储、自动流化、容错、性能等方面的特性,在事件时间的处理机制。
因为流处理具有如下显著的复杂性特征,所以很难建立非常健壮的处理过程:
一是数据有各种不同格式(Jason、Avro、二进制)、脏数据、不及时且无序; 二是复杂的加载过程,基于事件时间的过程需要支持交互查询,和机器学习组合使用; 三是不同的存储系统和格式(SQL、NoSQL、Parquet 等),要考虑如何容错。
因为可以运行在Spark SQL引擎上,Spark Structured Streaming天然拥有较好的性能、良好的扩展性及容错性等Spark优势。除此之外,它还具备丰富、统一、高层次的API,因此便于处理复杂的数据和工作流。再加上,无论是Spark自身,还是其集成的多个存储系统,都有丰富的生态圈。这些优势也让Spark Structured Streaming得到更多的发展和使用。
流的定义是一种无限表(unbounded table),把数据流中的新数据追加在这张无限表中,而它的查询过程可以拆解为几个步骤,例如可以从Kafka读取JSON数据,解析JSON数据,存入结构化Parquet表中,并确保端到端的容错机制。其中的特性包括:
支持多种消息队列,比如Files/Kafka/Kinesis等。 可以用join(), union()连接多个不同类型的数据源。 返回一个DataFrame,它具有一个无限表的结构。 你可以按需选择SQL(BI分析)、DataFrame(数据科学家分析)、DataSet(数据引擎),它们有几乎一样的语义-和性能。 把Kafka的JSON结构的记录转换成String,生成嵌套列,利用了很多优化过的处理函数来完成这个动作,例如from_json(),也允许各种自定义函数协助处理,例如Lambdas, flatMap。 在Sink步骤中可以写入外部存储系统,例如Parquet。在Kafka sink中,支持foreach来对输出数据做任何处理,支持事务和exactly-once方式。 支持固定时间间隔的微批次处理,具备微批次处理的高性能性,支持低延迟的连续处理(Spark 2.3),支持检查点机制(check point)。 秒级处理来自Kafka的结构化源数据,可以充分为查询做好准备。
Spark SQL把批次查询转化为一系列增量执行计划,从而可以分批次地操作数据。
在性能上,Structured Streaming 重用了Spark SQL优化器和Tungsten引擎,而且成本降低了3倍!!
Structured Streaming 隔离处理逻辑采用的是可配置化的方式(比如定制JSON的输入数据格式),执行方式是批处理还是流查询很容易识别。这里我们还需要知道批处理、微批次-流处理、持续流处理三种模式的延迟性、吞吐性和资源分配情况。
在时间窗口的支持上,Structured Streaming 支持基于事件时间(event-time)的聚合,这样更容易了解每隔一段时间发生的事情。同时也支持各种用户定义聚合函数(User Defined Aggregate Function,UDAF)。
另外,Structured Streaming可通过不同触发器间分布式存储的状态来进行聚合,状态被存储在内存中,归档采用HDFS的Write Ahead Log (WAL)机制。当然,Structured Streaming还可自动处理过时的数据,更新旧的保存状态。因为历史状态记录可能无限增长,这会带来一些性能问题,为了限制状态记录的大小,Spark使用水印(watermarking)来删除不再更新的旧的聚合数据。允许支持自定义状态函数,比如事件或处理时间的超时,同时支持Scala和Java。
在苹果的信息安全平台中,每秒将产生有百万级事件,Structured Streaming可以用来做缺陷检测,下图是该平台架构:
四、Spark Structured Streaming 开发
4.1 Handling Event-time
Event-time is the time embedded in the data itself, not the time when the data is received. (事件时间是嵌入数据本身的时间,而不是接收数据的时间。)
The event-time is a column value in the each row (事件时间是每行中的一个列值)
4.2 Handling Late Data
The event-time model allows to handle data that has arrived later than expected. (事件时间模型允许处理晚于预期的数据。)
Update the old aggregates when there is late data (当有延迟的数据时,更新旧的聚合) Clean up old aggregates (清理旧的数据集)
Watermarking(水印)
A time threshold for how late data can still be handled. (延迟数据处理的时间阈值。)
4.3 Using DataFrame & Dataset
case class WordCount(word: String, count: Int)
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
// Split the lines into words
val df = lines.as[String].flatMap(_.split(" ").map(w => (w, 1))).withColumnRenamed("_1", "word").withColumnRenamed("_2", "count")
df.createOrReplaceTempView("updates")
val df1 = spark.sql("select count(*) from updates")
val df = df.as[WordCount]
import org.apache.spark.sql.expressions.scalalang.typed
val result = ds.groupByKey(_.word.substring(0, 1)).agg(typed.sum(_.count))
// Start running the query that prints the running counts to the console
val query = result.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
4.4 “Legal” Join Types
4.5 Streaming Deduplication (流去重)
Duplicate records can be dropped in data streams using a unique identifier in the events. (可以使用事件中的唯一标识符在数据流中删除重复的记录。)
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
4.6 Unsupported / Changed Operations (不支持/更改的操作)
◆ Unsupported Operations:
Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) Limit and take first N rows Distinct operations on streaming Datasets Few types of outer joins on streaming Datasets Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
◆ Changed uses:
count() → ds.groupBy().count foreach() → ds.writeStream.foreach( … ) show() → use the console sink
4.7 Input Sources for Structured Streaming (输入源)
File source
val eventSchema = new StructType().add("event_id", "string").add("user_id", "string")
.add("start_time", "string").add("address", "string")
.add("city", "string").add("state", "string")
.add("country", "string")
.add("latitude", "float").add("longitude", "float")
val dfEvents = spark.readStream.option("sep", ",").schema(eventSchema).csv("hdfs:///user/events/events_*.csv")
val result = dfEvents.groupBy("user_id").count()
// Start running the query that prints the running counts to the console
val query = result.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
Kafka source Socket source (test only) Rate source (test only)