查看原文
其他

Spark 的 Structured Streaming是如何搞定乱序的事件时间的

孙彪彪 张江打工人 2021-09-05

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

我们本文主要讲述 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及如何处理乱序到达的事件日志。

我之前阐述过类似的话题,参考这篇之前写 文章

,今天具体聊下 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及 Watermarking 概念。

问题描述

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;

  • Processing time  :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。


我们看下下面这个示意图


上图中 time1,time2, time3等是我们 Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。

在实时处理过程中也就产生了两个问题:

  • Spark streaming 从Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据

  • 同一个时间的数据可能出现在多个 batch 中

持续增量聚合

我们来看下在 Structured Streaming  中我们是如何处理上述棘手的问题的, Structured Streaming  可以在实时数据上进行 sql 查询聚合,就跟离线数据查询聚合类似,  Spark SQL 可以不断地在实时数据上增量聚合, 如果你想看下一个设备的信号量的平均大小, 你可能会写以下代码:

# DataFrame w/ schema [eventTime: timestamp, deviceId: string, signal: bigint] eventsDF = ... avgSignalDF = eventsDF.groupBy("deviceId").avg("signal")

我们本文描述的情况不同的是, 这里得到的结果是不断随着新数据的到来动态更新的。 你可以使用不同的模式把结果落地到外部存储上, 参考这里 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries

当然这里使用的聚合函数你也可以自定义,参考 https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

在事件时间上做聚合

有时候我们不想在整个数据流上做聚合, 而是想在时间窗口上做聚合(每过去5分钟或者每过去一小时),如果是上个例子,那么现在我们想看下每过去5分钟的一个平均信号量,是否有异常,注意我们这里的时间是日志产生的时间,而不是日志达到处理系统的时间, 这个需求是不是很变态, 但是  Spark’s Structured Streaming 依然可以满足你,这就是我那么喜欢 spark 的原因。

之前 spark streaming 里面很难做到, 可能是设计 spark streaming 的时候,还没有出现这么变态的需求,不过, 在 Structured Streaming 里面,你只要使用 window()函数就解决这个问题了,我们要每5分钟统计过去5分钟内的所有设备产生日志的条数,要按照事件产生的时间聚合哟,代码这样写:

from pyspark.sql.functions import * windowedAvgSignalDF = \  eventsDF \    .groupBy(window("eventTime", "5 minute")) \    .count()

是不是很简单,我贴个图,让你更清楚地看到过程

是不是很清晰明白。

当然如果我们可以给 spark 提一些更加变态的需求,我想要 每5分钟统计过去10分钟内的所有设备产生日志的条数,也是要按照事件产生的时间聚合哟, 这个时间就重叠了,代码这样写:

from pyspark.sql.functions import * windowedAvgSignalDF = \  eventsDF \    .groupBy(window("eventTime", "10 minutes", "5 minutes")) \    .count()

轻松搞定,处理过程如下:



这里有人就有疑问了, 如果一条日志因为网络原因迟到了怎么办, Structured Streaming  还是会很耿直地把这条日志统计到属于它的分组里面。


每5分钟统计过去10分钟内每个设备(注意这样按照 设备 和 重叠时间窗口同时分组)产生日志的条数, 如果一条日志因为网络原因迟到了,还是会把这条日志统计到属于它的分组里面。

有状态的增量查询

如果你想知道上述这么强大的功能是怎么实现的,我这里就掰扯掰扯,其实就是  Spark SQL 在内部维护了一个高容错的中间状态存储, 存储结构就是个 key-value 对, key 就对应分组, value 就对应每次增量统计后的一个聚合结果, 当然每次增量统计,就对应  key-value 的一个新的版本, 状态就从旧的状态到了新的状态, 所以才认为是有状态的。

我们都知道,有状态的数据只存在内存中就是靠不住的,老办法, 还是要使用 WAL(write ahead logs, 就是记录下每次增量更新操作的日志) 的老办法,  然后间断的进行 checkpoint , 如果某个时间点系统 挂了, 就从 checkpoint 的点开始使用 WAL 进行重放(replay)

这套东西在数据库中都被玩了几千几万遍了,Structured Streaming 在这里又玩了一遍,就是为了满足你们要的 exactly-once 语义保证。

原理图参考下面:



当然你会发现, 因为我们的数据源是一个 流, 也就是这个数据是无限的, 如果我们要保存无限的数据的状态,肯定是不行的,为了资源, 也为了性能, 你必须要做权衡了, 这里的权衡就是你要做出决定,落后多久以后的数据即使来了,我们也不更新老 key-value 的状态了, 比如每个 设备在 2018年1月1号1点1分1秒  的时间点发来了一条  2017年1月1号1点1分1秒 的日志,我们选择忽略。

watermarking 是个什么玩意

我们上文提到我们会为做成决定,落后多久以后的数据即使来了, 我们也不要了, 这个 watermarking 概念就是来定义这个等待的时间, 举个例子, 我们如果定义最大的延迟时间是 10 分钟, 这就意味着 事件时间 落后当前时间 10分钟内的 日志会被拿来统计指标, 如果再迟了就不管了。 假如 现在 12:33, 如果某条日志的时间时间是 12:23, 我们就直接drop 掉,那么 12:23 之前的key-value对的状态就静止不动了,也就可以不用维护状态了, 别看这个 watermarking 看起来很高大上,其实就是这么回事。

代码也很简单,可以这样写:

windowedCountsDF = \  eventsDF \    .withWatermark("eventTime", "10 minutes") \    .groupBy(      "deviceId",      window("eventTime", "10 minutes", "5 minutes")) \    .count()

Spark SQL  内部会自动跟踪这个最大 可见的 事件时间,   用来drop 掉过时的日志 和 静止不动的状态。

原理见下图



我们可以看到, x 轴是处理集群处理的时间,  y轴 代码 事件的时间,然后有一条动态的水位线, 如果在水位下面的日志,我们就不要了,是不是很简单。


欢迎关注 spark技术分享     

                           



: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存