Flink Timer(定时器)机制及实现详解

Java
208
0
0
2024-03-21

Timer简介

Timer(定时器)是Flink Streaming API提供的用于感知并利用处理时间/事件时间变化的机制。官网上给出的描述如下:

Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.

对于普通用户来说,最常见的显式利用Timer的地方就是KeyedProcess function 。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的不同:

  • 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
  • 事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。

举个例子,按天实时统计指标并存储在状态中,每天0点清除状态重新统计,就可以在processElement()方法里注册Timer。

 ctx.timerService().registerProcessingTimeTimer(
  tomorrowZeroTimestampMs(System.currentTimeMillis(),) + 1
);

public static long tomorrowZeroTimestampMs(long now, int timeZone) {
  return now - (now + timeZone *) % 86400000 + 86400000;
} 

再在onTimer()方法里执行state.clear()。so easy。

除了KeyedProcessFunction之外,Timer在窗口机制中也有重要的地位。提起窗口自然就能想到Trigger,即触发器。来看下Flink自带的EventTimeTrigger的部分代码,它是事件时间特征下的默认触发器。

 @ Override 
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
 @Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
} 

可见,当水印还没有到达窗口右边沿时,就注册以窗口右边沿为时间戳的Timer。Timer到期后触发onEventTime()方法,进而触发该窗口相关联的Trigger。

文章开头引用的blog从用户的角度给出了Flink Timer的4大特点,如下图所示。

Flink Timer(定时器)机制及实现详解

经由上面的介绍,我们有了两个入手点(KeyedProcessFunction、Trigger)来分析Timer的细节。接下来从前者入手,let’s get our hands dirty。

TimerService、InternalTimerService

负责实际执行KeyedProcessFunction的算子是KeyedProcessOperator,其中以内部类的形式实现了KeyedProcessFunction需要的上下文类Context,如下所示。

 private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
    private final TimerService timerService;
    private StreamRecord<IN> element;
     ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
        function.super();
        this.timerService = checkNotNull(timerService);
    }
     @Override
    public Long timestamp() {
        checkState(element != null);
        if (element.hasTimestamp()) {
            return element.getTimestamp();
        } else {
            return null;
        }
    }
     @Override
    public TimerService timerService() {
        return timerService;
    }
     // 以下略...
} 

可见timerService()方法返回的是外部传入的TimerService实例,那么我们就回到KeyedProcessOperator看一下它的实现,顺便放个类图。

 public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    private static final long serialVersionUID =L;
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;

    public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

    // 以下略...
} 

通过阅读上述代码,可以总结出:

  • TimerService接口的实现类为SimpleTimerService,它实际上又是InternalTimerService的非常简单的代理(真的很简单,代码略去)。
  • InternalTimerService的实例由getInternalTimerService()方法取得,该方法定义在所有算子的基类AbstractStreamOperator中。它比较重要,后面再提。
  • KeyedProcessOperator.processElement()方法调用用户自定义函数的processElement()方法,顺便将上下文实例ContextImpl传了进去,所以用户可以由它获得TimerService来注册Timer。
  • Timer在代码中叫做InternalTimer(是个接口)。
  • 当Timer触发时,实际上是根据时间特征调用onProcessingTime()/onEventTime()方法(这两个方法来自Triggerable接口),进而触发用户函数的onTimer()回调逻辑。后面还会见到它们。

接下来就看看InternalTimerService是如何取得的。

 /**
 * Returns a {@link InternalTimerService} that can be used to query current processing time
 * and event time and to set timers. An operator can have several timer services, where
 * each has its own namespace serializer. Timer services are differentiated by the string
 * key that is given when requesting them, if you call this method with the same key
 * multiple times you will get the same timer service instance in subsequent requests.
 *
 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * <p>Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name                The name of the requested timer service. If no service exists under the given
 *                            name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable         The {@link Triggerable} that should be invoked when timers fire
 * @param <N>                 The type of the timer namespace.
 */    public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable) {
    checkTimerServiceInitialization();
     KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
    TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
    InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
    TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
    return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
} 

该方法的注释描述非常清楚,所以一起粘贴过来。简单来讲:

  • 每个算子可以有一个或多个InternalTimerService。
  • InternalTimerService的四要素是:名称、 命名空间 类型N(及其序列化器)、键类型K(及其序列化器),还有上文所述Triggerable接口的实现。
  • InternalTimerService经由InternalTimeServiceManager.getInternalTimerService()方法取得。

例如,上文KeyedProcessOperator初始化的InternalTimerService,名称为”user-timers”,命名空间类型为空(VoidNamespace),Triggerable实现类则是其本身。如果是WindowOperator的话,其InternalTimerService的名称就是”window-timers”,命名空间类型则是Window。

InternalTimerService在代码中仍然是一个接口,其代码如下。方法的签名除了多了命名空间之外(命名空间对用户透明),其他都与TimerService提供的相同。

 public interface InternalTimerService<N> {
    long currentProcessingTime();
    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);
    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);
    void deleteEventTimeTimer(N namespace, long time);

    // ...
} 

下面更进一步,看看InternalTimeServiceManager是如何实现的。

InternalTimeServiceManager、TimerHeapInternalTimer

顾名思义,InternalTimeServiceManager用于管理各个InternalTimeService。部分代码如下:

 public class InternalTimeServiceManager<K> {
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
    private final boolean useLegacySynchronousSnapshots;

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {
        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);
        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    // 以下略...
} 

从上面的代码可以得知:

  • Flink中InternalTimerService的最终实现实际上是InternalTimerServiceImpl类,而InternalTimer的最终实现是TimerHeapInternalTimer类。
  • InternalTimeServiceManager会用 HashMap 维护一个特定键类型K下所有InternalTimerService的名称与实例映射。如果名称已经存在,就会直接返回,不会重新创建。
  • 初始化InternalTimerServiceImpl时,会同时创建两个包含TimerHeapInternalTimer的优先队列(该优先队列是Flink自己实现的),分别用于维护事件时间和处理时间的Timer。

说了这么多,最需要注意的是,Timer是维护在JVM堆内存中的,如果频繁注册大量Timer,或者同时触发大量Timer,也是一笔不小的开销。

TimerHeapInternalTimer的实现比较简单,主要就是4个字段和1个方法。为了少打点字,把注释也弄过来。

 /**
 * The key for which the timer is scoped.
 */    @Nonnull
private final K key;
/**
 * The namespace for which the timer is scoped.
 */    @Nonnull
private final N namespace;
/**
 * The expiration timestamp.
 */    private final long timestamp;
/**
 * This field holds the current physical index of this timer when it is managed by a timer heap so that we can
 * support fast deletes.
 */    private transient int timerHeapIndex;
 @Override
public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
    return Long.compare(timestamp, other.getTimestamp());
}
} 

可见,Timer的scope有两个,一是数据的key,二是命名空间。但是用户不会感知到命名空间的存在,所以我们可以简单地认为Timer是以key级别注册的(Timer四大特点之1)。正确估计key的量可以帮助我们控制Timer的量。

timerHeapIndex是这个Timer在优先队列里存储的下标。优先队列通常用二叉堆实现,而二叉堆可以直接用数组存储,所以让Timer持有其对应的下标可以较快地从队列里删除它。

comparePriorityTo()方法则用于确定Timer的优先级,显然Timer的优先队列是一个按Timer时间戳为关键字排序的最小堆。下面粗略看看该最小堆的实现。

HeapPriorityQueueSet

上面代码中PriorityQueueSetFactory.create()方法创建的优先队列实际上的类型是HeapPriorityQueueSet。它的基本思路与Java自带的PriorityQueue相同,但是在其基础上加入了按key去重的逻辑(Timer四大特点之2)。不妨列出它的部分代码。

 private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
private final KeyGroupRange keyGroupRange;
 @Override
@Nullable
public T  poll () {
    final T toRemove = super.poll();
    return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
}
 @Override
public boolean add(@Nonnull T element) {
    return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
}
 @Override
public boolean remove(@Nonnull T toRemove) {
    T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
    return storedElement != null && super.remove(storedElement);
}
 private HashMap<T, T> getDedupMapForKeyGroup(
    @Nonnegative int keyGroupId) {
    return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
}
 private HashMap<T, T> getDedupMapForElement(T element) {
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
        keyExtractor.extractKeyFromElement(element),
        totalNumberOfKeyGroups);
    return getDedupMapForKeyGroup(keyGroup);
}
 private int globalKeyGroupToLocalIndex(int keyGroup) {
    checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
    return keyGroup - keyGroupRange.getStartKeyGroup();
} 

要搞懂它,必须解释一下KeyGroup和KeyGroupRange。KeyGroup是Flink内部KeyedState的原子单位,亦即一些key的组合。一个Flink App的KeyGroup数量与最大并行度相同,将key分配到KeyGroup的操作则是经典的取hashCode+取模。而KeyGroupRange则是一些连续KeyGroup的范围,每个Flink sub-task都只包含一个KeyGroupRange。也就是说,KeyGroupRange可以看做当前sub-task在本地维护的所有key。

解释完毕。容易得知,上述代码中的那个HashMap<T, T>[]数组就是在KeyGroup级别对key进行去重的容器,数组中每个元素对应一个KeyGroup。以插入一个Timer的流程为例:

  • 从Timer中取出key,计算该key属于哪一个KeyGroup;
  • 计算出该KeyGroup在整个KeyGroupRange中的偏移量,按该偏移量定位到HashMap<T, T>[]数组的下标;
  • 根据putIfAbsent()方法的语义,只有当对应HashMap不存在该Timer的key时,才将Timer插入最小堆中。

接下来回到主流程,InternalTimerServiceImpl。

InternalTimerServiceImpl

在这里,我们终于可以看到注册和移除Timer方法的最底层实现了。注意ProcessingTimeService是Flink内部产生处理时间的时间戳的服务。

 private final ProcessingTimeService processingTimeService;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
private ScheduledFuture<?> nextTimer;
 @Override
public void registerProcessingTimeTimer(N namespace, long time) {
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                nextTimer.cancel(false);
            }
            nextTimer = processingTimeService.registerTimer(time, this);
        }
    }
}
 @Override
public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
 @Override
public void deleteProcessingTimeTimer(N namespace, long time) {
    processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
 @Override
public void deleteEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
} 

由此可见,注册Timer实际上就是为它们赋予对应的时间戳、key和命名空间,并将它们加入对应的优先队列。特别地,当注册基于处理时间的Timer时,会先检查要注册的Timer时间戳与当前在最小堆堆顶的Timer的时间戳的大小关系。如果前者比后者要早,就会用前者替代掉后者,因为处理时间是永远线性增长的。

Timer注册好了之后是如何触发的呢?先来看处理时间的情况。

InternalTimerServiceImpl类继承了ProcessingTimeCallback接口,表示它可以触发处理时间的回调。该接口只要求实现一个方法,如下。

 @Override
private Triggerable<K, N> triggerTarget;
 public void onProcessingTime(long time) throws Exception {
    nextTimer = null;
    InternalTimer<K, N> timer;
     while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }
     if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
} 

可见,当onProcessingTime()方法被触发回调时,就会按顺序从队列中获取到比时间戳time小的所有Timer,并挨个执行Triggerable.onProcessingTime()方法,也就是在上文KeyedProcessOperator的同名方法,用户自定义的onTimer()逻辑也就被执行了。

最后来到ProcessingTimeService的实现类SystemProcessingTimeService,它是用调度线程池实现回调的。相关的代码如下。

 private final ScheduledThreadPoolExecutor timerService;
 @Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    long delay = Math.max(timestamp - getCurrentProcessingTime(),) + 1;
     try {
        return timerService.schedule(
            new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delay);
        } else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        } else {
            throw e;
        }
    }
}
 // 注意:这个是TriggerTask线程的run()方法
@Override
public void run() {
    synchronized (lock) {
        try {
            if (serviceStatus.get() == STATUS_ALIVE) {
                target.onProcessingTime(timestamp);
            }
        } catch (Throwable t) {
            TimerException asyncException = new TimerException(t);
            exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
        }
    }
} 

可见,onProcessingTime()在TriggerTask线程中被回调,而TriggerTask线程按照Timer的时间戳来调度。到这里,处理时间Timer的情况就讲述完毕了。

再来看事件时间的情况。事件时间与内部时间戳无关,而与水印有关。以下是InternalTimerServiceImpl.advanceWatermark()方法的代码。

 public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
     while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
} 

该逻辑与处理时间相似,只不过从回调onProcessingTime()变成了回调onEventTime()而已。然后追踪它的调用链,回到InternalTimeServiceManager的同名方法。

 public void advanceWatermark(Watermark watermark) throws Exception {
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
} 

继续向上追溯,到达终点:算子基类AbstractStreamOperator中处理水印的方法processWatermark()。当水印到来时,就会按着上述调用链流转到InternalTimerServiceImpl中,并触发所有早于水印时间戳的Timer了。

 public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);
} 

至此,我们算是基本打通了Flink Timer机制的实现细节。