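Introduction to Timers
A Timer is the mechanism the Flink Streaming API provides for detecting and reacting to changes in processing time or event time. The official documentation describes it as follows: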
Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.
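For most users, the most common place to use Timers explicitly is KeyedProcessFunction. We register a Timer in its processElement() method and override its onTimer() method as the callback that runs when the Timer fires. Depending on the time characteristic:
- Processing time: register with Context.timerService().registerProcessingTimeTimer(); onTimer() fires when the system clock reaches the timestamp set on the Timer.
- Event time: register with Context.timerService().registerEventTimeTimer(); onTimer() fires when Flink's internal watermark reaches or passes the timestamp set on the Timer.
For example, to compute a metric per day in real time, keep it in state, and clear the state at midnight every day to start counting again, we can register a Timer in processElement():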
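```java
// In processElement(): fire 1 ms after the next local midnight.
// 8 = UTC offset in hours (UTC+8 assumed here; adjust for your time zone).
ctx.timerService().registerProcessingTimeTimer(
    tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
);

// Epoch milliseconds of the next local midnight for a whole-hour UTC offset.
public static long tomorrowZeroTimestampMs(long now, int timeZone) {
    return now - (now + timeZone * 3600000L) % 86400000 + 86400000;
}
```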
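Then call state.clear() in onTimer(). That's all it takes.
Besides KeyedProcessFunction, Timers also play an important role in the window mechanism. Windows naturally bring Triggers to mind. Let's look at part of the code of Flink's built-in EventTimeTrigger, which is the default trigger under the event-time characteristic.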
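The midnight arithmetic is easy to get wrong, so here is a small standalone Java sketch (no Flink needed) that applies the same formula and cross-checks it against java.time. It assumes, like the snippet above, a whole-hour UTC offset and non-negative epoch milliseconds; the class MidnightTimestamp and its viaJavaTime() helper are made up for this illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;

class MidnightTimestamp {
    static final long DAY_MS = 86_400_000L;
    static final long HOUR_MS = 3_600_000L;

    // Same arithmetic as tomorrowZeroTimestampMs(): shift 'now' into local time,
    // subtract the time already elapsed in the local day, then add one full day.
    static long tomorrowZeroTimestampMs(long now, int timeZoneHours) {
        return now - (now + timeZoneHours * HOUR_MS) % DAY_MS + DAY_MS;
    }

    // Independent cross-check with java.time: start of the next local day at a fixed offset.
    static long viaJavaTime(long now, int timeZoneHours) {
        ZoneOffset offset = ZoneOffset.ofHours(timeZoneHours);
        return Instant.ofEpochMilli(now).atZone(offset).toLocalDate()
                .plusDays(1).atStartOfDay(offset).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // 2023-11-14T22:13:20Z, i.e. 06:13:20 on Nov 15 in UTC+8
        System.out.println(tomorrowZeroTimestampMs(now, 8)); // 1700064000000
        System.out.println(tomorrowZeroTimestampMs(now, 8) == viaJavaTime(now, 8)); // true
    }
}
```

Because the offset is a fixed number of hours, the formula ignores daylight saving time; for zones that observe it, computing the next midnight with java.time and a ZoneId is the safer choice.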
```java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // The watermark has already passed the end of the window: fire immediately.
        return TriggerResult.FIRE;
    } else {
        // Otherwise register an event-time Timer at the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
}
```
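So, as long as the watermark has not yet reached the window's right boundary (window.maxTimestamp()), the trigger registers a Timer whose timestamp is that boundary. When the Timer fires, the Trigger's onEventTime() is called; since time equals window.maxTimestamp(), it returns FIRE and the window is evaluated.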
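To make the decision flow concrete, here is a Flink-free sketch of the same two methods. The class EventTimeTriggerSketch, its Result enum, and the TreeSet standing in for the trigger context's timer registration are all hypothetical; only the comparisons mirror EventTimeTrigger.

```java
import java.util.TreeSet;

class EventTimeTriggerSketch {
    enum Result { FIRE, CONTINUE }

    long currentWatermark = Long.MIN_VALUE;
    // Stand-in for ctx.registerEventTimeTimer(): pending timer timestamps (a set, so no duplicates).
    final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    // Mirrors onElement(): fire at once if the watermark already passed the window end,
    // otherwise register a timer at the window end and keep waiting.
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        eventTimeTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    // Mirrors onEventTime(): only the timer set at the window end fires the window.
    Result onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        long windowEnd = 9_999L;                                    // window [0, 10000) has maxTimestamp 9999
        System.out.println(trigger.onElement(windowEnd));           // CONTINUE, timer registered at 9999
        trigger.currentWatermark = 10_000L;                         // the watermark passes the window end
        System.out.println(trigger.onEventTime(9_999L, windowEnd)); // FIRE
        System.out.println(trigger.onElement(windowEnd));           // FIRE, watermark already past the end
    }
}
```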
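The blog post quoted at the start of this article lists four key characteristics of Flink Timers from the user's point of view, as shown in the figure below.
The introduction above gives us two entry points for analyzing the details of Timers: KeyedProcessFunction and Trigger. We start with the former. Let's get our hands dirty.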
TimerService and InternalTimerService
The operator that actually executes a KeyedProcessFunction is KeyedProcessOperator. It implements the Context class required by KeyedProcessFunction as an inner class, shown below.
```java
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

    private final TimerService timerService;

    private StreamRecord<IN> element;

    ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
        function.super();
        this.timerService = checkNotNull(timerService);
    }

    @Override
    public Long timestamp() {
        checkState(element != null);
        if (element.hasTimestamp()) {
            return element.getTimestamp();
        } else {
            return null;
        }
    }

    @Override
    public TimerService timerService() {
        // Simply hands out the TimerService injected by the operator.
        return timerService;
    }

    // remaining members omitted...
}
```
So timerService() returns the TimerService instance passed in from outside. Let's go back to KeyedProcessOperator and look at its implementation, along with a class diagram.
```java
public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    private transient ContextImpl context;

    private transient OnTimerContextImpl onTimerContext;

    public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);

        // One timer service named "user-timers"; this operator itself is the Triggerable.
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

        // Wrapped in the user-facing TimerService that both contexts hand out.
        TimerService timerService = new SimpleTimerService(internalTimerService);

        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
            TimeDomain timeDomain,
            InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        // The user's onTimer() callback.
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

    // remaining members omitted...
}
```
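From the code above we can conclude:
- The TimerService interface is implemented by SimpleTimerService, which in turn is a very thin proxy for InternalTimerService (it really is simple, so its code is omitted).
- The InternalTimerService instance is obtained from getInternalTimerService(), which is defined in AbstractStreamOperator, the base class of all operators. It is important, and we will come back to it shortly.
- KeyedProcessOperator.processElement() calls the user function's processElement() and passes in the ContextImpl instance, so the user can get the TimerService from it and register Timers.
- In the code, a Timer is called InternalTimer (an interface).
- When a Timer fires, onProcessingTime() or onEventTime() is called depending on the time characteristic (both come from the Triggerable interface), which in turn invokes the user function's onTimer() callback. We will meet these methods again later.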
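The "very thin proxy" relationship between SimpleTimerService and InternalTimerService can be sketched as follows. This is an illustration under assumptions, not Flink's code: the nested types InternalTimers, VoidNs, UserTimerService and Recording are hypothetical, simplified stand-ins for InternalTimerService, VoidNamespace, SimpleTimerService and a test fake. The point is that the proxy always passes one fixed namespace, which is why namespaces never show up in the user-facing API.

```java
import java.util.ArrayList;
import java.util.List;

class SimpleTimerServiceSketch {
    // Hypothetical stand-in for the internal interface: every call carries a namespace.
    interface InternalTimers<N> {
        long currentProcessingTime();
        long currentWatermark();
        void registerProcessingTimeTimer(N namespace, long time);
        void registerEventTimeTimer(N namespace, long time);
    }

    // Hypothetical stand-in for VoidNamespace: a namespace type with exactly one value.
    enum VoidNs { INSTANCE }

    // The user-facing proxy: same methods minus the namespace, which is fixed.
    static final class UserTimerService {
        private final InternalTimers<VoidNs> internal;
        UserTimerService(InternalTimers<VoidNs> internal) { this.internal = internal; }
        long currentProcessingTime() { return internal.currentProcessingTime(); }
        long currentWatermark() { return internal.currentWatermark(); }
        void registerProcessingTimeTimer(long time) { internal.registerProcessingTimeTimer(VoidNs.INSTANCE, time); }
        void registerEventTimeTimer(long time) { internal.registerEventTimeTimer(VoidNs.INSTANCE, time); }
    }

    // A recording fake so the sketch runs without Flink.
    static final class Recording implements InternalTimers<VoidNs> {
        final List<String> calls = new ArrayList<>();
        public long currentProcessingTime() { return 42L; }
        public long currentWatermark() { return 7L; }
        public void registerProcessingTimeTimer(VoidNs ns, long time) { calls.add("processing " + ns + " " + time); }
        public void registerEventTimeTimer(VoidNs ns, long time) { calls.add("event " + ns + " " + time); }
    }

    public static void main(String[] args) {
        Recording internal = new Recording();
        UserTimerService timers = new UserTimerService(internal);
        timers.registerEventTimeTimer(1000L);
        System.out.println(internal.calls); // [event INSTANCE 1000]
    }
}
```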
Next, let's see how the InternalTimerService is obtained.
```java
/**
 * Returns a {@link InternalTimerService} that can be used to query current processing time
 * and event time and to set timers. An operator can have several timer services, where
 * each has its own namespace serializer. Timer services are differentiated by the string
 * key that is given when requesting them, if you call this method with the same key
 * multiple times you will get the same timer service instance in subsequent requests.
 *
 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * <p>Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists under the given
 *             name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {

    checkTimerServiceInitialization();

    KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
    TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
    InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
    TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
    return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
}
```
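The Javadoc of this method explains it well, so it is included along with the code. In short:
- Each operator can have one or more InternalTimerServices.
- An InternalTimerService is defined by four elements: its name, the namespace type N (and its serializer), the key type K (and its serializer), and the implementation of the Triggerable interface mentioned above.
- The InternalTimerService is obtained via InternalTimeServiceManager.getInternalTimerService().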
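For example, the InternalTimerService initialized by KeyedProcessOperator above is named "user-timers", its namespace type is empty (VoidNamespace), and its Triggerable implementation is the operator itself. For WindowOperator, the InternalTimerService is named "window-timers" and its namespace type is Window.
InternalTimerService is itself still an interface; its code is below. Apart from the extra namespace parameter (namespaces are transparent to users), its method signatures match those of TimerService.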
```java
public interface InternalTimerService<N> {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);

    // ...
}
```
Let's go one step further and see how InternalTimeServiceManager is implemented.
InternalTimeServiceManager and TimerHeapInternalTimer
As the name suggests, InternalTimeServiceManager manages the individual InternalTimerServices. Part of its code:
```java
public class InternalTimeServiceManager<K> {

    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
    private final boolean useLegacySynchronousSnapshots;

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
            String name,
            TimerSerializer<K, N> timerSerializer,
            Triggerable<K, N> triggerable) {

        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);
        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            // First request for this name: create it with one queue per time domain.
            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
            String name,
            TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    // remaining members omitted...
}
```
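From the code above we learn that:
- The final implementation of InternalTimerService in Flink is the InternalTimerServiceImpl class, and the final implementation of InternalTimer is the TimerHeapInternalTimer class.
- InternalTimeServiceManager uses a HashMap to map names to instances for all InternalTimerServices under a given key type K. If a name already exists, the existing instance is returned instead of creating a new one.
- When an InternalTimerServiceImpl is created, two priority queues of TimerHeapInternalTimer are created with it (the priority queue is Flink's own implementation), one for event-time Timers and one for processing-time Timers.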
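The get-or-create behaviour of registerOrGetTimerService() can be reduced to the sketch below. It assumes plain java.util collections in place of Flink's queues and serializers; TimerServiceRegistrySketch and ServiceSketch are hypothetical names, and computeIfAbsent() is swapped in for the explicit get()/put() pair of the original code. The queue-name prefixes are copied from the constants shown above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

class TimerServiceRegistrySketch {
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    // Stand-in for InternalTimerServiceImpl: one queue per time domain, each with its own name.
    static final class ServiceSketch {
        final String processingQueueName;
        final String eventQueueName;
        final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
        final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

        ServiceSketch(String name) {
            this.processingQueueName = PROCESSING_TIMER_PREFIX + name;
            this.eventQueueName = EVENT_TIMER_PREFIX + name;
        }
    }

    private final Map<String, ServiceSketch> timerServices = new HashMap<>();

    // Create on the first request for a name, return the cached instance afterwards.
    ServiceSketch registerOrGet(String name) {
        return timerServices.computeIfAbsent(name, ServiceSketch::new);
    }

    int size() { return timerServices.size(); }

    public static void main(String[] args) {
        TimerServiceRegistrySketch registry = new TimerServiceRegistrySketch();
        ServiceSketch first = registry.registerOrGet("user-timers");
        System.out.println(first == registry.registerOrGet("user-timers")); // true
        System.out.println(first.processingQueueName); // _timer_state/processing_user-timers
    }
}
```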
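After all this, the key point to remember is that, with the heap-based priority queue described here, Timers are kept in JVM heap memory. Frequently registering large numbers of Timers, or having many Timers fire at the same time, therefore carries considerable overhead.
TimerHeapInternalTimer is fairly simple: essentially four fields and one method. To save some typing, the Javadoc comments are copied along with them.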
```java
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /**
     * The key for which the timer is scoped.
     */
    @Nonnull
    private final K key;

    /**
     * The namespace for which the timer is scoped.
     */
    @Nonnull
    private final N namespace;

    /**
     * The expiration timestamp.
     */
    private final long timestamp;

    /**
     * This field holds the current physical index of this timer when it is managed by a timer heap so that we can
     * support fast deletes.
     */
    private transient int timerHeapIndex;

    // other members omitted...

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}
```
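So a Timer has two scopes: the key of the record and the namespace. Since users never see the namespace, we can simply think of Timers as registered per key (characteristic 1 of the four). Estimating the number of keys accurately therefore helps us keep the number of Timers under control.
timerHeapIndex is the index at which this Timer is stored in the priority queue. Priority queues are usually implemented as binary heaps, which can be stored directly in an array, so letting each Timer hold its own index lets it be removed from the queue quickly, without searching for it first.
comparePriorityTo() determines a Timer's priority; clearly the Timer priority queue is a min-heap keyed on the Timer timestamp. Next, a rough look at how this min-heap is implemented.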
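Why a Timer stores timerHeapIndex is easiest to see in a small array-backed min-heap. The following sketch is not Flink's HeapPriorityQueue; IndexedTimerHeapSketch and its Timer class are hypothetical, and only the idea (each element knows its array slot, and ordering is by timestamp as in comparePriorityTo()) is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;

class IndexedTimerHeapSketch {
    static final class Timer {
        final long timestamp;
        int heapIndex = -1; // like timerHeapIndex: current array slot, -1 when not queued
        Timer(long timestamp) { this.timestamp = timestamp; }
    }

    private final List<Timer> heap = new ArrayList<>();

    int size() { return heap.size(); }

    Timer peek() { return heap.isEmpty() ? null : heap.get(0); }

    void add(Timer t) {
        t.heapIndex = heap.size();
        heap.add(t);
        siftUp(t.heapIndex);
    }

    Timer poll() { return heap.isEmpty() ? null : removeAt(0); }

    // Fast delete: the timer carries its own index, so no linear search is needed.
    boolean remove(Timer t) {
        int i = t.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != t) return false;
        removeAt(i);
        return true;
    }

    private Timer removeAt(int i) {
        Timer removed = heap.get(i);
        Timer last = heap.remove(heap.size() - 1);
        if (last != removed) { // move the last element into the hole, then restore heap order
            heap.set(i, last);
            last.heapIndex = i;
            siftDown(i);
            siftUp(last.heapIndex);
        }
        removed.heapIndex = -1;
        return removed;
    }

    // Same ordering as comparePriorityTo(): an earlier timestamp means higher priority.
    private boolean less(int a, int b) {
        return Long.compare(heap.get(a).timestamp, heap.get(b).timestamp) < 0;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (!less(i, parent)) break;
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        while (true) {
            int left = 2 * i + 1, right = left + 1, smallest = i;
            if (left < heap.size() && less(left, smallest)) smallest = left;
            if (right < heap.size() && less(right, smallest)) smallest = right;
            if (smallest == i) return;
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        Timer ta = heap.get(a), tb = heap.get(b);
        heap.set(a, tb); tb.heapIndex = a;
        heap.set(b, ta); ta.heapIndex = b;
    }

    public static void main(String[] args) {
        IndexedTimerHeapSketch q = new IndexedTimerHeapSketch();
        Timer t30 = new Timer(30);
        for (long ts : new long[] {50, 10, 20, 40}) q.add(new Timer(ts));
        q.add(t30);
        q.remove(t30); // uses t30.heapIndex directly
        StringBuilder order = new StringBuilder();
        for (Timer t; (t = q.poll()) != null; ) order.append(t.timestamp).append(' ');
        System.out.println(order.toString().trim()); // 10 20 40 50
    }
}
```

Without the stored index, remove() would first need an O(n) scan to find the element; with it, deletion costs a single sift, O(log n).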
HeapPriorityQueueSet
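The priority queue created by PriorityQueueSetFactory.create() in the code above is actually of type HeapPriorityQueueSet. Its basic idea is the same as Java's built-in PriorityQueue, but it adds deduplication on top (characteristic 2 of the four): a Timer equal to one already queued, i.e. with the same key, namespace and timestamp, is not added again. Part of its code: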
```java
private final HashMap<T, T>[] deduplicationMapsByKeyGroup;

private final KeyGroupRange keyGroupRange;

// keyExtractor and totalNumberOfKeyGroups are further fields of the class (declarations omitted)

@Override
public T poll() {
    final T toRemove = super.poll();
    // Keep the dedup map in sync with the heap.
    return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
}

@Override
public boolean add(T element) {
    // Enter the heap only if no equal element is already present.
    return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
}

@Override
public boolean remove(T toRemove) {
    T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
    return storedElement != null && super.remove(storedElement);
}

private HashMap<T, T> getDedupMapForKeyGroup(int keyGroupId) {
    return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
}

private HashMap<T, T> getDedupMapForElement(T element) {
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
        keyExtractor.extractKeyFromElement(element),
        totalNumberOfKeyGroups);
    return getDedupMapForKeyGroup(keyGroup);
}

private int globalKeyGroupToLocalIndex(int keyGroup) {
    checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
    return keyGroup - keyGroupRange.getStartKeyGroup();
}
```
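To understand it, we need to explain KeyGroup and KeyGroupRange. A KeyGroup is the atomic unit of keyed state inside Flink, i.e. a group of keys. A Flink application has as many KeyGroups as its maximum parallelism, and keys are assigned to KeyGroups with the classic hash-then-modulo approach (KeyGroupRangeAssignment applies a murmur hash to the key's hashCode() and takes the result modulo the maximum parallelism). A KeyGroupRange is a contiguous range of KeyGroups, and each Flink subtask holds exactly one KeyGroupRange. In other words, a KeyGroupRange can be seen as all the keys the current subtask maintains locally.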
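With that explained, it is easy to see that the HashMap<T, T>[] array in the code above is the container that deduplicates Timers at KeyGroup level, with one array element per KeyGroup. Take inserting a Timer as an example:
- Extract the key from the Timer and compute which KeyGroup it belongs to;
- Compute the offset of that KeyGroup within the whole KeyGroupRange and use it as the index into the HashMap<T, T>[] array;
- By the semantics of putIfAbsent(), the Timer is inserted into the min-heap only if the corresponding HashMap does not already contain an equal Timer (same key, namespace and timestamp).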
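The insertion steps above can be sketched in plain Java. Assumptions: KeyGroupDedupSketch and its Timer class are hypothetical, the namespace is left out, and key-group assignment is simplified to hashCode() modulo the maximum parallelism, whereas Flink's KeyGroupRangeAssignment additionally applies a murmur hash.

```java
import java.util.HashMap;
import java.util.Objects;
import java.util.PriorityQueue;

class KeyGroupDedupSketch {
    // Simplified timer: two timers are equal when key and timestamp match (namespace left out).
    static final class Timer {
        final String key;
        final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
        @Override public boolean equals(Object o) {
            return o instanceof Timer && ((Timer) o).key.equals(key) && ((Timer) o).timestamp == timestamp;
        }
        @Override public int hashCode() { return Objects.hash(key, timestamp); }
    }

    private final int maxParallelism;   // = total number of key groups
    private final int startKeyGroup;    // the local KeyGroupRange is [start, end], inclusive
    private final int endKeyGroup;
    private final HashMap<Timer, Timer>[] dedupMapsByKeyGroup;
    private final PriorityQueue<Timer> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    @SuppressWarnings("unchecked")
    KeyGroupDedupSketch(int maxParallelism, int startKeyGroup, int endKeyGroup) {
        this.maxParallelism = maxParallelism;
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
        this.dedupMapsByKeyGroup = new HashMap[endKeyGroup - startKeyGroup + 1];
        for (int i = 0; i < dedupMapsByKeyGroup.length; i++) {
            dedupMapsByKeyGroup[i] = new HashMap<>();
        }
    }

    // Step 1 (simplified): key -> global key group.
    int assignToKeyGroup(String key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: global key group -> offset within the local range -> that group's dedup map.
    private HashMap<Timer, Timer> dedupMapFor(Timer t) {
        int keyGroup = assignToKeyGroup(t.key);
        if (keyGroup < startKeyGroup || keyGroup > endKeyGroup) {
            throw new IllegalArgumentException("key group " + keyGroup + " is not local");
        }
        return dedupMapsByKeyGroup[keyGroup - startKeyGroup];
    }

    // Step 3: putIfAbsent() decides whether the timer may enter the heap.
    boolean add(Timer t) {
        return dedupMapFor(t).putIfAbsent(t, t) == null && heap.add(t);
    }

    Timer poll() {
        Timer t = heap.poll();
        return t != null ? dedupMapFor(t).remove(t) : null;
    }

    int size() { return heap.size(); }

    public static void main(String[] args) {
        KeyGroupDedupSketch q = new KeyGroupDedupSketch(4, 0, 3); // one subtask owning all 4 key groups
        System.out.println(q.add(new Timer("a", 100))); // true
        System.out.println(q.add(new Timer("a", 100))); // false: duplicate key + timestamp
        System.out.println(q.size());                   // 1
    }
}
```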
Now back to the main flow: InternalTimerServiceImpl.
InternalTimerServiceImpl
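Here we finally reach the lowest-level implementation of registering and deleting Timers. Note that ProcessingTimeService is Flink's internal service that supplies processing-time timestamps.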
```java
private final ProcessingTimeService processingTimeService;

private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;

private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

// The single physical callback currently scheduled for the earliest processing-time Timer.
private ScheduledFuture<?> nextTimer;

@Override
public void registerProcessingTimeTimer(N namespace, long time) {
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        // Only a new earliest Timer requires rescheduling the physical callback.
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                nextTimer.cancel(false);
            }
            nextTimer = processingTimeService.registerTimer(time, this);
        }
    }
}

@Override
public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
    processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

@Override
public void deleteEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
```
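So registering a Timer really means giving it its timestamp, key and namespace and adding it to the corresponding priority queue. In particular, when a processing-time Timer is registered, its timestamp is compared with that of the Timer at the top of the min-heap. If the new Timer is earlier, the callback scheduled for the old head (nextTimer) is cancelled and a new one is scheduled at the new Timer's timestamp. Only the earliest Timer needs a scheduled callback: processing time only moves forward, so later Timers can be scheduled once that callback has run.
Once a Timer is registered, how does it fire? Let's look at processing time first.
InternalTimerServiceImpl implements the ProcessingTimeCallback interface, which means it can act as a processing-time callback. The interface requires only one method: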
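A minimal model of the rescheduling rule in registerProcessingTimeTimer(). It assumes a single key, replaces the ScheduledFuture with a recorded timestamp, and logs "schedule"/"cancel" actions instead of talking to a real ProcessingTimeService; ProcessingTimerRegistrationSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class ProcessingTimerRegistrationSketch {
    final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
    Long scheduledFor = null;                 // stand-in for the nextTimer ScheduledFuture
    final List<String> actions = new ArrayList<>();

    // Mirrors registerProcessingTimeTimer(): only a new earliest timer touches the scheduler.
    void register(long time) {
        Long oldHead = processingTimeTimers.peek();
        if (processingTimeTimers.contains(time)) {
            return;                           // duplicate: HeapPriorityQueueSet.add() would return false
        }
        processingTimeTimers.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
            if (scheduledFor != null) {
                actions.add("cancel " + scheduledFor);
            }
            scheduledFor = time;
            actions.add("schedule " + time);
        }
    }

    public static void main(String[] args) {
        ProcessingTimerRegistrationSketch s = new ProcessingTimerRegistrationSketch();
        s.register(300);
        s.register(500);   // later than the head: nothing is rescheduled
        s.register(100);   // new earliest timer: cancel 300, schedule 100
        System.out.println(s.actions); // [schedule 300, cancel 300, schedule 100]
    }
}
```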
```java
private Triggerable<K, N> triggerTarget;

@Override
public void onProcessingTime(long time) throws Exception {
    // The scheduled callback has just run, so nothing is scheduled any more.
    nextTimer = null;

    InternalTimer<K, N> timer;

    // Fire every Timer due at or before 'time', earliest first.
    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }

    // Schedule a callback for the next pending Timer, unless a fired callback already did.
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
```
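So when onProcessingTime() is called back, it takes every Timer whose timestamp is less than or equal to time from the queue, in order, and calls Triggerable.onProcessingTime() for each one, i.e. the method of the same name in KeyedProcessOperator shown earlier, which runs the user-defined onTimer() logic. If Timers remain, it then schedules a new callback for the earliest of them.
Finally we reach SystemProcessingTimeService, the implementation of ProcessingTimeService, which implements callbacks with a scheduled thread pool. The relevant code: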
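The draining loop can be modeled the same way. In this hypothetical ProcessingTimeDrainSketch, firing a Timer just appends its timestamp to a list, standing in for setting the current key and calling Triggerable.onProcessingTime(); keys and namespaces are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class ProcessingTimeDrainSketch {
    final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
    final List<Long> fired = new ArrayList<>();
    Long nextTimer = null;   // timestamp of the single scheduled callback, if any

    // Mirrors onProcessingTime(time): fire every due timer in timestamp order,
    // then schedule one callback for the earliest timer that is still pending.
    void onProcessingTime(long time) {
        nextTimer = null;
        Long timer;
        while ((timer = processingTimeTimers.peek()) != null && timer <= time) {
            processingTimeTimers.poll();
            fired.add(timer); // stands in for setCurrentKey(...) + triggerTarget.onProcessingTime(timer)
        }
        // In Flink a fired onTimer() may already have scheduled nextTimer, hence the null check.
        if (timer != null && nextTimer == null) {
            nextTimer = timer;
        }
    }

    public static void main(String[] args) {
        ProcessingTimeDrainSketch s = new ProcessingTimeDrainSketch();
        for (long t : new long[] {10, 40, 20, 30}) s.processingTimeTimers.add(t);
        s.onProcessingTime(25);
        System.out.println(s.fired + " next=" + s.nextTimer); // [10, 20] next=30
    }
}
```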
```java
private final ScheduledThreadPoolExecutor timerService;

@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    // Never schedule with a negative delay; the extra 1 ms is part of Flink's formula.
    long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

    try {
        return timerService.schedule(
            new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delay);
        } else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        } else {
            throw e;
        }
    }
}

// Note: this is the run() method of TriggerTask (a Runnable executed by the thread pool)
@Override
public void run() {
    synchronized (lock) {
        try {
            if (serviceStatus.get() == STATUS_ALIVE) {
                target.onProcessingTime(timestamp);
            }
        } catch (Throwable t) {
            TimerException asyncException = new TimerException(t);
            exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
        }
    }
}
```
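So onProcessingTime() is called back from a TriggerTask, which runs on the scheduled thread pool while holding the lock and is scheduled according to the Timer's timestamp. That completes the processing-time case.
Now for event time. Event time has nothing to do with the internal clock; it is driven by watermarks. Here is the code of InternalTimerServiceImpl.advanceWatermark():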
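The delay arithmetic and the lock-holding callback can be reproduced with the JDK's ScheduledThreadPoolExecutor alone. This is a hedged sketch: ProcessingTimeSchedulerSketch is hypothetical, a plain Object stands in for the checkpoint lock, and none of SystemProcessingTimeService's status handling is modeled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ProcessingTimeSchedulerSketch {
    // Same formula as registerTimer(): a past timestamp gets delay 0, then 1 ms is added.
    static long delayMs(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        Object lock = new Object();                   // stand-in for the checkpoint lock
        CountDownLatch fired = new CountDownLatch(1);
        long timestamp = System.currentTimeMillis() + 50;

        executor.schedule(() -> {
            synchronized (lock) {                     // like TriggerTask.run(): callback runs under the lock
                System.out.println("callback for timestamp " + timestamp);
                fired.countDown();
            }
        }, delayMs(timestamp, System.currentTimeMillis()), TimeUnit.MILLISECONDS);

        fired.await(5, TimeUnit.SECONDS);
        executor.shutdown();
    }
}
```

Clamping with Math.max() means a timestamp that is already in the past is scheduled almost immediately (1 ms) instead of with a negative delay.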
```java
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    // Fire every event-time Timer whose timestamp is at or before the new watermark.
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
```
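The logic mirrors the processing-time case, except that onEventTime() is called instead of onProcessingTime(). Following the call chain upward leads to the method of the same name in InternalTimeServiceManager: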
```java
public void advanceWatermark(Watermark watermark) throws Exception {
    // Every timer service of this operator sees the new watermark.
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
```
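Tracing further up brings us to the end of the chain: processWatermark() in the operator base class AbstractStreamOperator. When a watermark arrives, it travels down this call chain into InternalTimerServiceImpl and fires every Timer whose timestamp is less than or equal to the watermark; only then is the watermark forwarded downstream.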
```java
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        // Fire all due event-time Timers first...
        timeServiceManager.advanceWatermark(mark);
    }
    // ...then forward the watermark downstream.
    output.emitWatermark(mark);
}
```
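The whole event-time chain (operator, manager, every timer service, then the downstream watermark) fits in one small model. Assumptions: WatermarkChainSketch and its nested TimerService are hypothetical; a LinkedHashMap replaces the HashMap of InternalTimeServiceManager only so the demo output has a stable order, and "emitting" means appending to a list. The ordering it shows follows processWatermark() above: all due Timers fire before the watermark is forwarded.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class WatermarkChainSketch {
    // Stand-in for the event-time half of InternalTimerServiceImpl.
    static final class TimerService {
        final String name;
        final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
        long currentWatermark = Long.MIN_VALUE;

        TimerService(String name) { this.name = name; }

        void advanceWatermark(long time, List<String> out) {
            currentWatermark = time;
            Long timer;
            while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
                eventTimeTimers.poll();
                out.add(name + " fired " + timer); // stands in for triggerTarget.onEventTime(timer)
            }
        }
    }

    // Like InternalTimeServiceManager.timerServices (LinkedHashMap only for a stable demo order).
    final Map<String, TimerService> timerServices = new LinkedHashMap<>();
    final List<String> output = new ArrayList<>(); // what the operator emits downstream, in order

    TimerService service(String name) {
        return timerServices.computeIfAbsent(name, TimerService::new);
    }

    // Mirrors processWatermark(): advance every timer service first, then forward the watermark.
    void processWatermark(long watermark) {
        for (TimerService s : timerServices.values()) {
            s.advanceWatermark(watermark, output);
        }
        output.add("watermark " + watermark);
    }

    public static void main(String[] args) {
        WatermarkChainSketch op = new WatermarkChainSketch();
        op.service("user-timers").eventTimeTimers.add(5L);
        op.service("user-timers").eventTimeTimers.add(15L);
        op.processWatermark(10L);
        System.out.println(op.output); // [user-timers fired 5, watermark 10]
    }
}
```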
With that, we have traced the implementation of Flink's Timer mechanism from end to end.