其他
Flink最难知识点再解析 | 时间/窗口/水印/迟到数据处理
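窗口计算被触发需要同时满足两个条件:
1、该窗口中存在数据;
2、水印(watermark)到达窗口的结束时间,即 watermark >= 窗口结束时间 - 1 毫秒(TimeWindow.maxTimestamp())。注意是水印到达,而不是某条数据的事件时间到达。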
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
/**
 * Watermark test.
 *
 * Reads `name:timestamp` lines from a socket. It assigns each record its own timestamp as
 * event time, using a periodic watermark that trails the largest timestamp seen so far by 3 s.
 * For every name, it then prints the event times collected in each 5 s tumbling event-time window.
 */
object WaterMarkFunc01 {
  import scala.util.control.NonFatal

  // Thread-safe date formatter (FastDateFormat, unlike SimpleDateFormat), shared by all operators.
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
   * Parses one input line of the form `name:timestamp`, e.g. `flink:1559223685000`.
   *
   * Malformed lines produce `None` instead of a sentinel tuple. Legitimate records whose name is
   * "0" or whose timestamp is 0 are therefore no longer mistaken for bad input and dropped.
   *
   * @param line raw line read from the socket
   * @return `Some((name, timestamp))`, or `None` after printing a warning when the line cannot be parsed
   */
  private def parseLine(line: String): Option[(String, Long)] =
    try {
      val items = line.split(":")
      Some((items(0), items(1).toLong))
    } catch {
      // NonFatal: report bad input, but let fatal errors / interrupts propagate.
      case NonFatal(_) =>
        println("输入数据不符合格式:" + line)
        None
    }

  def main(args: Array[String]): Unit = {
    val hostName = "s102"
    val port = 9000
    val delimiter = '\n'
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time (the timestamp carried by each record) as the stream's notion of time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)
    import org.apache.flink.api.scala._
    // Input format: name:timestamp, e.g. flink:1559223685000. Unparseable lines are skipped.
    val data: DataStream[(String, Long)] = streams.flatMap(line => parseLine(line).toList)
    // Assign timestamps to the elements and periodically create watermarks to track event-time progress.
    val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(String, Long)] {
        // Largest event timestamp seen so far.
        var currentMaxTimestamp = 0L
        // The watermark trails currentMaxTimestamp by this much (3 s of allowed out-of-orderness).
        val maxOutOfOrderness = 3000L
        var lastEmittedWatermark: Long = Long.MinValue

        // Returns the current watermark; it is kept monotonically non-decreasing.
        override def getCurrentWatermark: Watermark = {
          lastEmittedWatermark = math.max(lastEmittedWatermark, currentMaxTimestamp - maxOutOfOrderness)
          new Watermark(lastEmittedWatermark)
        }

        // Uses the element's own time field (epoch milliseconds) as its timestamp, and logs it
        // together with the watermark implied by the elements seen so far.
        override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
          val time = element._2
          currentMaxTimestamp = math.max(currentMaxTimestamp, time)
          val outData = String.format("key: %s EventTime: %s waterMark: %s",
            element._1,
            sdf.format(time),
            sdf.format(getCurrentWatermark.getTimestamp))
          println(outData)
          time
        }
      })
    val result: DataStream[String] = waterStream
      .keyBy(0) // group by name (tuple field 0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L))) // 5 s tumbling event-time windows
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          // Formatted event times of every element in this window, joined with "-".
          val times = input.map { case (_, ts) => sdf.format(ts) }.mkString("-")
          val outData = String.format("key: %s data: %s startTime: %s endTime: %s",
            key.toString,
            times,
            sdf.format(window.getStart),
            sdf.format(window.getEnd))
          out.collect(outData)
        }
      })
    result.print("window计算结果:")
    env.execute(this.getClass.getName)
  }
}
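还记得我们开始说的吗?
Flink 会根据窗口的大小(window 的间隔时间)来划分时间窗口的范围。窗口边界从时间纪元(1970-01-01 00:00:00 UTC)开始按窗口大小对齐,与数据进入 Flink 的时间无关。例如 5 秒的滚动窗口依次为 [00, 05)、[05, 10)、[10, 15)……,事件时间为第 17 秒的数据一定落在 [15, 20) 这个窗口中。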
1、allowedLateness(lateness: Time)
/**
 * Configures the allowed lateness for the windows of this windowed stream.
 *
 * Delegates to the wrapped Java stream (`javaStream`). The Java call's return value is ignored,
 * and this Scala wrapper is returned so the call can be chained with further window operations.
 *
 * @param lateness how late (in event time) elements may be and still be accepted
 * @return this `WindowedStream`
 */
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
  javaStream.allowedLateness(lateness)
  this
}
2、sideOutputLateData(outputTag: OutputTag[T])
/**
 * Registers a side output, identified by `outputTag`, for late elements of this windowed stream.
 *
 * Delegates to the wrapped Java stream (`javaStream`). The Java call's return value is ignored,
 * and this Scala wrapper is returned so the call can be chained. The late elements are read back
 * via `getSideOutput(outputTag)` on the stream produced by the window function.
 *
 * @param outputTag tag of the side output that receives late elements
 * @return this `WindowedStream`
 */
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
  javaStream.sideOutputLateData(outputTag)
  this
}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
/**
 * Lateness test.
 *
 * Uses the same socket source and watermarking as the plain watermark demo: event time is taken
 * from each `name:timestamp` record, and the watermark trails the maximum timestamp by 3 s. In
 * addition, each 5 s tumbling window accepts 2 s of allowed lateness, and data that is too late
 * even for that is routed to a side output, which is printed separately.
 */
object WaterMarkFunc02 {
  import scala.util.control.NonFatal

  // Thread-safe date formatter (FastDateFormat, unlike SimpleDateFormat), shared by all operators.
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
   * Parses one input line of the form `name:timestamp`, e.g. `flink:1559223685000`.
   *
   * Malformed lines produce `None` instead of a sentinel tuple. Legitimate records whose name is
   * "0" or whose timestamp is 0 are therefore no longer mistaken for bad input and dropped.
   *
   * @param line raw line read from the socket
   * @return `Some((name, timestamp))`, or `None` after printing a warning when the line cannot be parsed
   */
  private def parseLine(line: String): Option[(String, Long)] =
    try {
      val items = line.split(":")
      Some((items(0), items(1).toLong))
    } catch {
      // NonFatal: report bad input, but let fatal errors / interrupts propagate.
      case NonFatal(_) =>
        println("输入数据不符合格式:" + line)
        None
    }

  def main(args: Array[String]): Unit = {
    val hostName = "s102"
    val port = 9000
    val delimiter = '\n'
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time (the timestamp carried by each record) as the stream's notion of time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)
    import org.apache.flink.api.scala._
    // Input format: name:timestamp, e.g. flink:1559223685000. Unparseable lines are skipped.
    val data: DataStream[(String, Long)] = streams.flatMap(line => parseLine(line).toList)
    // Assign timestamps to the elements and periodically create watermarks to track event-time progress.
    val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[(String, Long)] {
        // Largest event timestamp seen so far.
        var currentMaxTimestamp = 0L
        // The watermark trails currentMaxTimestamp by this much (3 s of allowed out-of-orderness).
        val maxOutOfOrderness = 3000L
        var lastEmittedWatermark: Long = Long.MinValue

        // Returns the current watermark; it is kept monotonically non-decreasing.
        override def getCurrentWatermark: Watermark = {
          lastEmittedWatermark = math.max(lastEmittedWatermark, currentMaxTimestamp - maxOutOfOrderness)
          new Watermark(lastEmittedWatermark)
        }

        // Uses the element's own time field (epoch milliseconds) as its timestamp, and logs it
        // together with the watermark implied by the elements seen so far.
        override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
          val time = element._2
          currentMaxTimestamp = math.max(currentMaxTimestamp, time)
          val outData = String.format("key: %s EventTime: %s waterMark: %s",
            element._1,
            sdf.format(time),
            sdf.format(getCurrentWatermark.getTimestamp))
          println(outData)
          time
        }
      })
    // Side output that receives data arriving after its window's allowed lateness has expired.
    val lateData = new OutputTag[(String, Long)]("late")
    val result: DataStream[String] = waterStream
      .keyBy(0) // group by name (tuple field 0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L))) // 5 s tumbling event-time windows
      /*
       * Allow 2 s of lateness for each window.
       * - First firing: once watermark >= window end - 1 ms (TimeWindow.maxTimestamp).
       * - Afterwards, each element for this window that arrives while
       *   watermark < window end - 1 ms + 2 s fires the window again with the updated contents.
       * - Data arriving after that point no longer reaches the window. It goes to the `lateData`
       *   side output instead of being dropped.
       */
      .allowedLateness(Time.seconds(2L))
      .sideOutputLateData(lateData)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          // Formatted event times of every element in this window, joined with "-".
          val times = input.map { case (_, ts) => sdf.format(ts) }.mkString("-")
          val outData = String.format("key: %s data: %s startTime: %s endTime: %s",
            key.toString,
            times,
            sdf.format(window.getStart),
            sdf.format(window.getEnd))
          out.collect(outData)
        }
      })
    result.print("window计算结果:")
    val late = result.getSideOutput(lateData)
    late.print("迟到的数据:")
    env.execute(this.getClass.getName)
  }
}
接下来开始输入数据进行测试验证:
可以看到窗口范围为 [15, 20)(左闭右开,单位为秒)。这时候我们再输入几条属于该范围的数据:
输入了事件时间为 17、16、15 的三条数据,每一条都再次触发了该窗口的计算(它们虽然迟到,但仍在 allowedLateness 允许的 2 秒范围内)。那我们再试着输入窗口范围为 [10, 15) 的数据:
长按扫描下方👇二维码注册
加群主微信:whispererrr,送一份2T的Java和大数据学习大礼包!
文章不错?点个【在看】吧! 👇