Spark 的 Structured Streaming是如何搞定乱序的事件时间的
首发个人公众号 spark技术分享 , 同步个人网站 coolplayer.net ,未经本人同意,禁止一切转载
我们本文主要讲述 Spark’s Structured Streaming 中是如何按照事件时间进行聚合, 以及如何处理乱序到达的事件日志。
我之前阐述过类似的话题,参考这篇之前写 文章
,今天具体聊下 Spark’s Structured Streaming 中是如何按照事件时间进行聚合, 以及 Watermarking 概念。
问题描述
在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:
Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;
Processing time :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。
我们看下下面这个示意图
上图中 time1,time2, time3等是我们 Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。
在实时处理过程中也就产生了两个问题:
Spark streaming 从Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据
同一个时间的数据可能出现在多个 batch 中
持续增量聚合
我们来看下在 Structured Streaming 中我们是如何处理上述棘手的问题的, Structured Streaming 可以在实时数据上进行 sql 查询聚合,就跟离线数据查询聚合类似, Spark SQL 可以不断地在实时数据上增量聚合, 如果你想看下一个设备的信号量的平均大小, 你可能会写以下代码:
# DataFrame w/ schema [eventTime: timestamp, deviceId: string, signal: bigint]
eventsDF = ...
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal")
我们本文描述的情况不同的是, 这里得到的结果是不断随着新数据的到来动态更新的。 你可以使用不同的模式把结果落地到外部存储上, 参考这里 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries
当然这里使用的聚合函数你也可以自定义,参考 https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
在事件时间上做聚合
有时候我们不想在整个数据流上做聚合, 而是想在时间窗口上做聚合(每过去5分钟或者每过去一小时),如果是上个例子,那么现在我们想看下每过去5分钟的一个平均信号量,是否有异常,注意我们这里的时间是日志产生的时间,而不是日志达到处理系统的时间, 这个需求是不是很变态, 但是 Spark’s Structured Streaming 依然可以满足你,这就是我那么喜欢 spark 的原因。
之前 spark streaming 里面很难做到, 可能是设计 spark streaming 的时候,还没有出现这么变态的需求,不过, 在 Structured Streaming 里面,你只要使用 window()函数就解决这个问题了,我们要每5分钟统计过去5分钟内的所有设备产生日志的条数,要按照事件产生的时间聚合哟,代码这样写:
from pyspark.sql.functions import *
windowedAvgSignalDF = \
eventsDF \
.groupBy(window("eventTime", "5 minute")) \
.count()
是不是很简单,我贴个图,让你更清楚地看到过程
是不是很清晰明白。
当然如果我们可以给 spark 提一些更加变态的需求,我想要 每5分钟统计过去10分钟内的所有设备产生日志的条数,也是要按照事件产生的时间聚合哟, 这个时间就重叠了,代码这样写:
from pyspark.sql.functions import *
windowedAvgSignalDF = \
eventsDF \
.groupBy(window("eventTime", "10 minutes", "5 minutes")) \
.count()
轻松搞定,处理过程如下:
这里有人就有疑问了, 如果一条日志因为网络原因迟到了怎么办, Structured Streaming 还是会很耿直地把这条日志统计到属于它的分组里面。
每5分钟统计过去10分钟内每个设备(注意这样按照 设备 和 重叠时间窗口同时分组)产生日志的条数, 如果一条日志因为网络原因迟到了,还是会把这条日志统计到属于它的分组里面。
有状态的增量查询
如果你想知道上述这么强大的功能是怎么实现的,我这里就掰扯掰扯,其实就是 Spark SQL 在内部维护了一个高容错的中间状态存储, 存储结构就是个 key-value 对, key 就对应分组, value 就对应每次增量统计后的一个聚合结果, 当然每次增量统计,就对应 key-value 的一个新的版本, 状态就从旧的状态到了新的状态, 所以才认为是有状态的。
我们都知道,有状态的数据只存在内存中就是靠不住的,老办法, 还是要使用 WAL(write ahead logs, 就是记录下每次增量更新操作的日志) 的老办法, 然后间断的进行 checkpoint , 如果某个时间点系统 挂了, 就从 checkpoint 的点开始使用 WAL 进行重放(replay)
这套东西在数据库中都被玩了几千几万遍了,Structured Streaming 在这里又玩了一遍,就是为了满足你们要的 exactly-once 语义保证。
原理图参考下面:
当然你会发现, 因为我们的数据源是一个 流, 也就是这个数据是无限的, 如果我们要保存无限的数据的状态,肯定是不行的,为了资源, 也为了性能, 你必须要做权衡了, 这里的权衡就是你要做出决定,落后多久以后的数据即使来了,我们也不更新老 key-value 的状态了, 比如每个 设备在 2018年1月1号1点1分1秒 的时间点发来了一条 2017年1月1号1点1分1秒 的日志,我们选择忽略。
watermarking 是个什么玩意
我们上文提到我们会为做成决定,落后多久以后的数据即使来了, 我们也不要了, 这个 watermarking 概念就是来定义这个等待的时间, 举个例子, 我们如果定义最大的延迟时间是 10 分钟, 这就意味着 事件时间 落后当前时间 10分钟内的 日志会被拿来统计指标, 如果再迟了就不管了。 假如 现在 12:33, 如果某条日志的时间时间是 12:23, 我们就直接drop 掉,那么 12:23 之前的key-value对的状态就静止不动了,也就可以不用维护状态了, 别看这个 watermarking 看起来很高大上,其实就是这么回事。
代码也很简单,可以这样写:
windowedCountsDF = \
eventsDF \
.withWatermark("eventTime", "10 minutes") \
.groupBy(
"deviceId",
window("eventTime", "10 minutes", "5 minutes")) \
.count()
Spark SQL 内部会自动跟踪这个最大 可见的 事件时间, 用来drop 掉过时的日志 和 静止不动的状态。
原理见下图
我们可以看到, x 轴是处理集群处理的时间, y轴 代码 事件的时间,然后有一条动态的水位线, 如果在水位下面的日志,我们就不要了,是不是很简单。
欢迎关注 spark技术分享