A Deep Dive into Flink Session Windows and Timers
Preface
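When session windows are used together with an AggregateFunction, an obvious question arises: when two session windows merge, what happens to the accumulators each of them has built up? This article follows that question through the source code: the MergingWindowAssigner and MergingWindowSet that drive window merging inside the WindowOperator, the state backends that eventually call AggregateFunction.merge(), and finally the implementation of Flink timers, which both session windows and KeyedProcessFunction rely on.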
AggregateFunction
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    // creates a new, empty accumulator
    ACC createAccumulator();
    // adds one input element to the accumulator and returns the updated accumulator
    ACC add(IN value, ACC accumulator);
    // extracts the final result from the accumulator
    OUT getResult(ACC accumulator);
    // merges two accumulators; this is the hook that merging (session) windows rely on
    ACC merge(ACC a, ACC b);
}
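Before diving into the merge machinery, it helps to see the user-facing side. The following is a minimal sketch, not taken from the original article, of an AggregateFunction that averages the Long field of (word, value) pairs with a (sum, count) accumulator; its merge() method is exactly what the rest of this article traces through the source.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch only: averages the Long field of (word, value) pairs; the accumulator is a (sum, count) pair.
public class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return accumulator.f1 == 0 ? 0.0 : ((double) accumulator.f0) / accumulator.f1;
    }

    // called when the accumulators of two windows are merged, e.g. when session windows merge
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}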
Session Window & MergingWindowAssigner
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    // decides which of the given windows should be merged and reports each decision through the callback
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    // callback used by mergeWindows() to announce a merge decision
    public interface MergeCallback<W> {
        // toBeMerged: the windows that should be merged into mergeResult
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}
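Session windows are the typical MergingWindowAssigner. As a minimal usage sketch (the variable name stream, the Tuple2<String, Long> element type, and the 10-second gap are assumptions for illustration), wiring session windows to the AggregateFunction above looks roughly like this:
// Sketch only: `stream` is assumed to be a DataStream<Tuple2<String, Long>> with event-time timestamps assigned.
DataStream<Double> averages = stream
        .keyBy(value -> value.f0)
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .aggregate(new AverageAggregate());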
// TimeWindow.mergeWindows(): used by session windows to merge overlapping windows
public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
// sort the windows by the start time and then merge overlapping windows
List<TimeWindow> sortedWindows = new ArrayList<>(windows);
Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
@Override
public int compare(TimeWindow o1, TimeWindow o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
for (TimeWindow candidate: sortedWindows) {
if (currentMerge == null) {
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
} else if (currentMerge.f0.intersects(candidate)) {
currentMerge.f0 = currentMerge.f0.cover(candidate);
currentMerge.f1.add(candidate);
} else {
merged.add(currentMerge);
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
}
}
if (currentMerge != null) {
merged.add(currentMerge);
}
for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
if (m.f1.size() > 1) {
c.merge(m.f1, m.f0);
}
}
}
// TimeWindow.intersects()
public boolean intersects(TimeWindow other) {
return this.start <= other.end && this.end >= other.start;
}
// TimeWindow.cover()
public TimeWindow cover(TimeWindow other) {
return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}
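As a concrete example: with a 10-second session gap, events at t = 1s and t = 8s produce the windows [1000, 11000) and [8000, 18000). Since 1000 <= 18000 and 11000 >= 8000, intersects() returns true, and cover() yields the merged window [1000, 18000).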
MergingWindowSet
public class MergingWindowSet<W extends Window> {
// Mapping from each (possibly merged) window to its state window, i.e. the window under whose namespace the state is actually kept
private final Map<W, W> mapping;
// Mapping when we created the MergingWindowSet...
private final Map<W, W> initialMapping;
private final ListState<Tuple2<W, W>> state;
public W getStateWindow(W window) {
return mapping.get(window);
}
public void persist() throws Exception {
if (!mapping.equals(initialMapping)) {
state.clear();
for (Map.Entry<W, W> window : mapping.entrySet()) {
state.add(new Tuple2<>(window.getKey(), window.getValue()));
}
}
}
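To make the mapping concrete: if the windows [1000, 11000) and [8000, 18000) from the example above are merged into [1000, 18000) by addWindow() below, the mapping records [1000, 18000) -> [1000, 11000). The merged window's contents keep living under the namespace of the original window [1000, 11000), its state window, so only the state stored under [8000, 18000) has to be merged into that namespace; the namespace itself is never rewritten as the session grows.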
// MergingWindowSet.addWindow(): adds a new window and merges it with existing windows if necessary
public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
List<W> windows = new ArrayList<>();
windows.addAll(this.mapping.keySet());
windows.add(newWindow);
final Map<W, Collection<W>> mergeResults = new HashMap<>();
windowAssigner.mergeWindows(windows,
new MergingWindowAssigner.MergeCallback<W>() {
@Override
public void merge(Collection<W> toBeMerged, W mergeResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
}
mergeResults.put(mergeResult, toBeMerged);
}
});
W resultWindow = newWindow;
boolean mergedNewWindow = false;
// perform the merge
for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
W mergeResult = c.getKey();
Collection<W> mergedWindows = c.getValue();
// if our new window is in the merged windows make the merge result the
// result window
if (mergedWindows.remove(newWindow)) {
mergedNewWindow = true;
resultWindow = mergeResult;
}
// pick any of the merged windows and choose that window's state window
// as the state window for the merge result
W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
// figure out the state windows that we are merging
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
}
}
this.mapping.put(mergeResult, mergedStateWindow);
// don't put the target state window into the merged windows
mergedStateWindows.remove(mergedStateWindow);
// don't merge the new window itself, it never had any state associated with it
// i.e. if we are only merging one pre-existing window into itself
// without extending the pre-existing window
if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
mergeFunction.merge(mergeResult,
mergedWindows,
this.mapping.get(mergeResult),
mergedStateWindows);
}
}
// the new window created a new, self-contained window without merging
if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
this.mapping.put(resultWindow, resultWindow);
}
return resultWindow;
}
mergeResult: the window resulting from merging the windows' time ranges;
mergedWindows: the set of windows merged in this round;
mergedStateWindow: the state window chosen for the merge result (currently simply the reference state window);
mergedStateWindows: the set of state windows merged in this round.
// MergingWindowSet.MergeFunction: callback invoked by addWindow() when a merge actually happens
public interface MergeFunction<W> {
/**
* This gets called when a merge occurs.
*
* @param mergeResult The newly resulting merged {@code Window}.
* @param mergedWindows The merged {@code Window Windows}.
* @param stateWindowResult The state window of the merge result.
* @param mergedStateWindows The merged state windows.
* @throws Exception
*/
void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
}
Window Merging in WindowOperator
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window: elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
// ...
} else if (!windowAssigner.isEventTime()) {
// ...
}
triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (W m: mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(actualWindow, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
mergingWindows.persist();
} else {
// ......
}
// ......
}
Call TriggerContext.onMerge() to update the timers registered by the trigger, then iterate over all original windows that were merged and call TriggerContext.clear() on each of them to remove their triggers, so that the merged window fires correctly;
Call InternalMergingState.mergeNamespaces() to merge the state of the merged windows into the state of the reference window; the stateWindowResult produced earlier is that state window.
Back to AggregateFunction
Heap-based backend: AbstractHeapMergingState → HeapAggregatingState
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
if (sources == null || sources.isEmpty()) {
return; // nothing to do
}
final StateTable<K, N, SV> map = stateTable;
SV merged = null;
// merge the sources
for (N source : sources) {
// get and remove the next source per namespace/key
SV sourceState = map.removeAndGetOld(source);
if (merged != null && sourceState != null) {
merged = mergeState(merged, sourceState); // <----
} else if (merged == null) {
merged = sourceState;
}
}
// merge into the target, if needed
if (merged != null) {
map.transform(target, merged, mergeTransformation);
}
}
@Override
protected ACC mergeState(ACC a, ACC b) {
return aggregateTransformation.aggFunction.merge(a, b); // <----
}
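The line marked with the arrow is where the loop closes back to user code: mergeState() delegates to the merge() method of the user-defined AggregateFunction shown at the beginning of this article. That is why merge() must be implemented correctly whenever the function is used with session windows or any other merging window assigner.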
RocksDB-based backend: RocksDBAggregatingState
public void mergeNamespaces(N target, Collection<N> sources) {
if (sources == null || sources.isEmpty()) {
return;
}
try {
ACC current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
setCurrentNamespace(source);
final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
if (valueBytes != null) {
dataInputView.setBuffer(valueBytes);
ACC value = valueSerializer.deserialize(dataInputView);
if (current != null) {
current = aggFunction.merge(current, value); // <----
}
else {
current = value;
}
}
}
}
// ......
}
catch (Exception e) {
throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
}
}
Flink Timers
Register a processing-time timer: the timer task fires once the system's processing time passes the registered time.
Register an event-time timer: the timer task fires once the watermark passes the registered time. Already-registered timers can also be deleted, as shown in the sketch below.
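A minimal sketch of both kinds of registration inside a KeyedProcessFunction follows; the concrete types, the 60-second offsets, and the class name TimerDemoFunction are assumptions for illustration, not part of the original article.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: shows registering and deleting processing-time and event-time timers.
public class TimerDemoFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
        // processing-time timer: fires once the machine clock passes the registered time
        long procTimer = ctx.timerService().currentProcessingTime() + 60_000;
        ctx.timerService().registerProcessingTimeTimer(procTimer);

        // event-time timer: fires once the watermark passes the registered time
        // (assumes timestamps have been assigned, otherwise ctx.timestamp() is null)
        long eventTimer = ctx.timestamp() + 60_000;
        ctx.timerService().registerEventTimeTimer(eventTimer);

        // an already-registered timer can be removed again
        ctx.timerService().deleteEventTimeTimer(eventTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}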
package com.bolingcavalry.keyedprocessfunction;
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
package com.bolingcavalry.keyedprocessfunction;
import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description Demonstrates KeyedProcessFunction (the time characteristic is processing time)
*/
public class ProcessTime {
/**
* A KeyedProcessFunction subclass that records each word's latest occurrence time in the state backend and registers a timer;
* when the timer fires, it checks whether 10 seconds have passed since the word last appeared and, if so, emits it to the downstream operator
*/
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
// custom state
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
// initialize the state; its name is myState
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
Tuple2<String, Integer> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// get the current word (the key)
Tuple currentKey = ctx.getCurrentKey();
// fetch the current word's myState state from the backend
CountWithTimestamp current = state.value();
// if myState has never been set, initialize it here
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// increment the word count
current.count++;
// take the current element's timestamp as the word's last occurrence time
current.lastModified = ctx.timestamp();
// save back to the backend: the word's occurrence count and its last occurrence time
state.update(current);
// register a timer for the current word, firing ten seconds from now
long timer = current.lastModified + 10000;
ctx.timerService().registerProcessingTimeTimer(timer);
// print everything so the data can be verified
System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",
currentKey.getField(0),
current.count,
current.lastModified,
time(current.lastModified),
timer,
time(timer)));
}
/**
* Method invoked when the timer fires
* @param timestamp the firing time of this timer
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// get the current word
Tuple currentKey = ctx.getCurrentKey();
// get the word's myState state
CountWithTimestamp result = state.value();
// flag indicating whether the current element has not appeared for 10 consecutive seconds
boolean isTimeout = false;
// timestamp is the timer's firing time; if it equals the last update time plus 10 seconds, the word has not been received again during those ten seconds
// such elements, absent for ten consecutive seconds, are emitted to the downstream operator
if (timestamp == result.lastModified + 10000) {
// emit
out.collect(new Tuple2<String, Long>(result.key, result.count));
isTimeout = true;
}
// print the data so the result can be checked against expectations
System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
currentKey.getField(0),
result.count,
result.lastModified,
time(result.lastModified),
timestamp,
time(timestamp),
String.valueOf(isTimeout)));
}
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parallelism of 1
env.setParallelism(1);
// processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// listen on local port 9999 and read strings
DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
// every input word that has not reappeared for more than 10 seconds is produced by CountWithTimeoutFunction
DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
// split each received string on spaces into words
.flatMap(new Splitter())
// set a timestamp assigner that uses the current time as the timestamp
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
// use the current system time as the timestamp
return System.currentTimeMillis();
}
@Override
public Watermark getCurrentWatermark() {
// this example does not need watermarks, so return null
return null;
}
})
// key the stream by the word
.keyBy(0)
// hand the word-keyed data to the custom KeyedProcessFunction
.process(new CountWithTimeoutFunction());
// every input word that has not reappeared for more than 10 seconds is printed here
timeOutWord.print();
env.execute("ProcessFunction demo : KeyedProcessFunction");
}
public static String time(long timeStamp) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
}
}
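To try the demo (assuming netcat is available locally): run nc -lk 9999, start the ProcessTime job, and type a few words into the netcat session. Any word that is not typed again within ten seconds is emitted by CountWithTimeoutFunction and printed by timeOutWord.print() together with its count.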
Timer Internals
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {
// key: the key extracted from the KeyedStream
@Nonnull
private final K key;
// namespace: for a plain KeyedStream this is always VoidNamespace; for a WindowedStream it is the Window
@Nonnull
private final N namespace;
// timestamp: the firing time; timers are sorted by it in ascending order in the priority queue
// Because this class overrides equals(), even if the same TimerHeapInternalTimer is inserted into the queue multiple times, only one instance is actually enqueued. Details below.
private final long timestamp;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof InternalTimer) {
InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
return timestamp == timer.getTimestamp()
&& key.equals(timer.getKey())
&& namespace.equals(timer.getNamespace());
}
return false;
}
}
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
// every incoming element is processed by this method
@Override
public void processElement(
Tuple2<String, Integer> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
...
long timer = current.lastModified + 10000;
// register the timer
ctx.timerService().registerProcessingTimeTimer(timer);
...
}
// timer callback
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
...
CountWithTimestamp result = state.value();
// when the timer fires, emit the data downstream
out.collect(new Tuple2<String, Long>(result.key, result.count));
...
}
}
public class SimpleTimerService implements TimerService {
@Override
public void registerProcessingTimeTimer(long time) {
// register the timer with the InternalTimerService
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
}
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
// enqueue the timer
// once the if condition holds, the timer was enqueued successfully and is the head element of the min-heap, so a delayed call has to be (re-)scheduled for the heap's head
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
}
@Override
public boolean add(@Nonnull T toAdd) {
// enqueue the timer
addInternal(toAdd);
// returns true if the timer that was just enqueued ended up at the head of the heap; the caller uses this to decide whether a delayed call needs to be scheduled on the ScheduledThreadPoolExecutor. In other words, delayed calls are only ever scheduled for the head of the heap.
return toAdd.getInternalIndex() == getHeadElementIndex();
}
}
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
// The delay used here is subtle: it is not simply the timer timestamp minus the current time, but that difference plus 1 ms.
// This keeps the semantics aligned with watermarks (even though processing-time timers never use a watermark):
// a watermark T promises that no element with a timestamp <= T will arrive anymore, so a timer registered for time T should only fire once T has strictly passed; the extra millisecond gives processing-time timers the same "strictly after" behavior. See the comments in the method's source for details.
long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
// The lambda wrapping in wrapOnTimerCallback(callback, timestamp) looks convoluted, but it can simply be read as callback.
// Tracing it back, callback is in fact the InternalTimerServiceImpl#onProcessingTime method.
return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
}
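The delay computation mentioned in the comments boils down to roughly the following. This is a sketch of ProcessingTimeServiceUtil.getProcessingTimeDelay; the exact source may differ slightly across Flink versions.
// Sketch: the delay is the difference between the timer time and now, plus 1 ms,
// so a timer registered for time T only fires once T has strictly passed.
public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
    if (processingTimestamp >= currentTimestamp) {
        return processingTimestamp - currentTimestamp + 1;
    } else {
        return 0;
    }
}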
@Override
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
// pop due timers from the top of the min-heap one by one and invoke the user-defined KeyedProcessFunction#onTimer
// as soon as the head element is not yet due, a new delayed call is scheduled for it (below)
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
// This calls registerTimer on the processingTimeService implementation (SystemProcessingTimeService), registering the firing time of the head timer that is not yet due with the ScheduledThreadPoolExecutor. That schedules another delayed call of InternalTimerServiceImpl#onProcessingTime, so the while loop above is re-entered later and the priority queue keeps getting drained.
if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
}