其他

Spark的Structured Streaming是如何解决乱序的事件时间的

2017-10-27 孙彪彪 AI前线
作者 | 孙彪彪
编辑 | Vincent

AI 前线导读: 本文主要讲述 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及如何处理乱序到达的事件日志。作者之前阐述过类似的话题,可参考这篇之前的 文章


今天具体聊下 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及 Watermarking 概念。


更多干货内容请关注微信公众号“AI 前线”,ID:ai-front

关注人工智能的落地实践,与企业一起探寻 AI 的边界,AICon 全球人工智能技术大会火热售票中,6 折倒计时一周抢票,报名链接请参看【阅读原文】
问题描述

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time 是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web 服务日志、监控 agent 的日志、移动端日志等;

  • Processing time  :Processing time 是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

我们看下下面这个示意图

上图中 time1,time2, time3 等是我们 Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个 event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在 time3 的 batch 中包含 event time 为 2 的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。在实时处理过程中也就产生了两个问题:

  • Spark streaming 从 Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据

  • 同一个时间的数据可能出现在多个 batch 中

持续增量聚合

我们来看下在 Structured Streaming  中我们是如何处理上述棘手的问题的, Structured Streaming  可以在实时数据上进行 sql 查询聚合,就跟离线数据查询聚合类似,  Spark SQL 可以不断地在实时数据上增量聚合, 如果你想看下一个设备的信号量的平均大小, 你可能会写以下代码:

我们本文描述的情况不同的是, 这里得到的结果是不断随着新数据的到来动态更新的。 你可以使用不同的模式把结果落地到外部存储上, 参考这里

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries

当然这里使用的聚合函数你也可以自定义,参考

https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

在事件时间上做聚合

有时候我们不想在整个数据流上做聚合, 而是想在时间窗口上做聚合(每过去 5 分钟或者每过去一小时),如果是上个例子,那么现在我们想看下每过去 5 分钟的一个平均信号量,是否有异常,注意我们这里的时间是日志产生的时间,而不是日志达到处理系统的时间, 这个需求是不是很变态, 但是  Spark’s Structured Streaming 依然可以满足你,这就是我那么喜欢 spark 的原因。

之前 spark streaming 里面很难做到, 可能是设计 spark streaming 的时候,还没有出现这么变态的需求,不过, 在 Structured Streaming 里面,你只要使用 window() 函数就解决这个问题了,我们要每 5 分钟统计过去 5 分钟内的所有设备产生日志的条数,要按照事件产生的时间聚合哟,代码这样写:

是不是很简单,我贴个图,让你更清楚地看到过程

是不是很清晰明白。

当然如果我们可以给 spark 提一些更加变态的需求,我想要 每 5 分钟统计过去 10 分钟内的所有设备产生日志的条数,也是要按照事件产生的时间聚合哟, 这个时间就重叠了,代码这样写:

轻松搞定,处理过程如下:

这里有人就有疑问了, 如果一条日志因为网络原因迟到了怎么办, Structured Streaming  还是会很耿直地把这条日志统计到属于它的分组里面。

每 5 分钟统计过去 10 分钟内每个设备(注意这样按照 设备 和 重叠时间窗口同时分组)产生日志的条数, 如果一条日志因为网络原因迟到了,还是会把这条日志统计到属于它的分组里面。

有状态的增量查询

如果你想知道上述这么强大的功能是怎么实现的,我这里就掰扯掰扯,其实就是  Spark SQL 在内部维护了一个高容错的中间状态存储, 存储结构就是个 key-value 对, key 就对应分组, value 就对应每次增量统计后的一个聚合结果, 当然每次增量统计,就对应  key-value 的一个新的版本, 状态就从旧的状态到了新的状态, 所以才认为是有状态的。

我们都知道,有状态的数据只存在内存中就是靠不住的,老办法, 还是要使用 WAL(write ahead logs, 就是记录下每次增量更新操作的日志) 的老办法,  然后间断的进行 checkpoint , 如果某个时间点系统 挂了, 就从 checkpoint 的点开始使用 WAL 进行重放(replay)

这套东西在数据库中都被玩了几千几万遍了,Structured Streaming 在这里又玩了一遍,就是为了满足你们要的 exactly-once 语义保证。

原理图参考下面:

当然你会发现, 因为我们的数据源是一个 流, 也就是这个数据是无限的, 如果我们要保存无限的数据的状态,肯定是不行的,为了资源, 也为了性能, 你必须要做权衡了, 这里的权衡就是你要做出决定,落后多久以后的数据即使来了,我们也不更新老 key-value 的状态了, 比如每个 设备在 2018 年 1 月 1 号 1 点 1 分 1 秒  的时间点发来了一条  2017 年 1 月 1 号 1 点 1 分 1 秒 的日志,我们选择忽略。

watermarking 是个什么玩意

我们上文提到我们会为做成决定,落后多久以后的数据即使来了, 我们也不要了, 这个 watermarking 概念就是来定义这个等待的时间, 举个例子, 我们如果定义最大的延迟时间是 10 分钟, 这就意味着 事件时间 落后当前时间 10 分钟内的 日志会被拿来统计指标, 如果再迟了就不管了。 假如 现在 12:33, 如果某条日志的时间时间是 12:23, 我们就直接 drop 掉,那么 12:23 之前的 key-value 对的状态就静止不动了,也就可以不用维护状态了, 别看这个 watermarking 看起来很高大上,其实就是这么回事。

代码也很简单,可以这样写:

Spark SQL  内部会自动跟踪这个最大 可见的 事件时间,   用来 drop 掉过时的日志 和 静止不动的状态。

原理见下图

我们可以看到, x 轴是处理集群处理的时间,  y 轴 代码 事件的时间,然后有一条动态的水位线, 如果在水位下面的日志,我们就不要了,是不是很简单。



点击【阅读原文】进入 AICon 官网页面

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存