A Look at Flink’s TimerService


Preface

This article takes a look at Flink’s TimerService and the classes behind it.

TimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java

@PublicEvolving
public interface TimerService {

    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(long time);

    void registerEventTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);

    void deleteEventTimeTimer(long time);
}
  • The TimerService interface defines six methods: currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer.

SimpleTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java

@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService<VoidNamespace> internalTimerService;

    public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}
  • SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>, passing VoidNamespace.INSTANCE wherever a namespace is required.
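
The delegation described above can be sketched without any Flink dependency. This is a minimal illustration, not Flink code: NamespacedTimers, NoNamespace, SimpleTimers, and RecordingTimers are hypothetical stand-ins for InternalTimerService, VoidNamespace, SimpleTimerService, and a test double, and only the event-time methods are modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class DelegationSketch {

    /** Hypothetical stand-in for InternalTimerService<N>: every call carries a namespace. */
    interface NamespacedTimers<N> {
        void registerEventTimeTimer(N namespace, long time);
        void deleteEventTimeTimer(N namespace, long time);
    }

    /** Hypothetical stand-in for VoidNamespace: a singleton meaning "no namespace". */
    enum NoNamespace { INSTANCE }

    /** Plays the role of SimpleTimerService: hides the namespace from its callers. */
    static final class SimpleTimers {
        private final NamespacedTimers<NoNamespace> delegate;

        SimpleTimers(NamespacedTimers<NoNamespace> delegate) {
            this.delegate = delegate;
        }

        void registerEventTimeTimer(long time) {
            delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time);
        }

        void deleteEventTimeTimer(long time) {
            delegate.deleteEventTimeTimer(NoNamespace.INSTANCE, time);
        }
    }

    /** Records each forwarded call so the delegation can be observed. */
    static final class RecordingTimers implements NamespacedTimers<NoNamespace> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void registerEventTimeTimer(NoNamespace namespace, long time) {
            calls.add("register:" + namespace + ":" + time);
        }

        @Override
        public void deleteEventTimeTimer(NoNamespace namespace, long time) {
            calls.add("delete:" + namespace + ":" + time);
        }
    }

    public static void main(String[] args) {
        RecordingTimers recorder = new RecordingTimers();
        SimpleTimers timers = new SimpleTimers(recorder);
        timers.registerEventTimeTimer(1000L);
        timers.deleteEventTimeTimer(1000L);
        System.out.println(recorder.calls); // [register:INSTANCE:1000, delete:INSTANCE:1000]
    }
}
```

The caller never sees a namespace; the wrapper always supplies the same singleton, which is the role VoidNamespace.INSTANCE plays in the listing above.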

InternalTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java

@Internal
public interface InternalTimerService<N> {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);
}
  • InternalTimerService is the internal counterpart of TimerService. The difference is that it introduces a namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter.
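
To see what the extra namespace parameter buys, here is a small self-contained sketch. It assumes a timer is identified by the full (timestamp, key, namespace) triple, which is consistent with how the delete methods shown later build a new timer object from exactly those three values in order to remove it. TimerId and TimerRegistry are hypothetical names, and a plain HashSet stands in for Flink's priority queue.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class NamespaceSketch {

    /** Hypothetical timer identity: the full (timestamp, key, namespace) triple. */
    static final class TimerId {
        final long timestamp;
        final String key;
        final String namespace;

        TimerId(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TimerId)) {
                return false;
            }
            TimerId other = (TimerId) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    /** Set-backed registry: registering the same triple twice is a no-op. */
    static final class TimerRegistry {
        private final Set<TimerId> timers = new HashSet<>();

        boolean register(String key, String namespace, long time) {
            return timers.add(new TimerId(time, key, namespace));
        }

        boolean delete(String key, String namespace, long time) {
            return timers.remove(new TimerId(time, key, namespace));
        }

        int size() {
            return timers.size();
        }
    }

    public static void main(String[] args) {
        TimerRegistry registry = new TimerRegistry();
        System.out.println(registry.register("user-1", "window-A", 100L)); // true
        System.out.println(registry.register("user-1", "window-A", 100L)); // false, duplicate triple
        System.out.println(registry.register("user-1", "window-B", 100L)); // true, different namespace
        System.out.println(registry.delete("user-1", "window-A", 100L));   // true
        System.out.println(registry.size());                               // 1
    }
}
```

With a namespace, the same key can hold two timers for the same timestamp, and deleting one leaves the other in place.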

InternalTimerServiceImpl

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

    private final ProcessingTimeService processingTimeService;

    private final KeyContext keyContext;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

    private final KeyGroupRange localKeyGroupRange;

    private final int localKeyGroupRangeStartIdx;

    private long currentWatermark = Long.MIN_VALUE;

    private ScheduledFuture<?> nextTimer;

    // Variables to be set when the service is started.

    private TypeSerializer<K> keySerializer;

    private TypeSerializer<N> namespaceSerializer;

    private Triggerable<K, N> triggerTarget;

    private volatile boolean isInitialized;

    private TypeSerializer<K> keyDeserializer;

    private TypeSerializer<N> namespaceDeserializer;

    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

    InternalTimerServiceImpl(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        ProcessingTimeService processingTimeService,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

        this.keyContext = checkNotNull(keyContext);
        this.processingTimeService = checkNotNull(processingTimeService);
        this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
        this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
        this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

        // find the starting index of the local key-group range
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;
    }

    public void startTimerService(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerTarget) {

        if (!isInitialized) {

            if (keySerializer == null || namespaceSerializer == null) {
                throw new IllegalArgumentException("The TimersService serializers cannot be null.");
            }

            if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
                throw new IllegalStateException("The TimerService has already been initialized.");
            }

            // the following is the case where we restore
            if (restoredTimersSnapshot != null) {
                CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.keyDeserializer,
                    null,
                    restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                    keySerializer);

                CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.namespaceDeserializer,
                    null,
                    restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                    namespaceSerializer);

                if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                    throw new IllegalStateException("Tried to initialize restored TimerService " +
                        "with incompatible serializers than those used to snapshot its state.");
                }
            }

            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
            this.keyDeserializer = null;
            this.namespaceDeserializer = null;

            this.triggerTarget = Preconditions.checkNotNull(triggerTarget);

            // re-register the restored timers (if any)
            final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
            if (headTimer != null) {
                nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
            }
            this.isInitialized = true;
        } else {
            if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
                throw new IllegalArgumentException("Already initialized Timer Service " +
                    "tried to be initialized with different key and namespace serializers.");
            }
        }
    }

    @Override
    public long currentProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return currentWatermark;
    }

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
        return new InternalTimersSnapshot<>(
            keySerializer,
            keySerializer.snapshotConfiguration(),
            namespaceSerializer,
            namespaceSerializer.snapshotConfiguration(),
            eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
            processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
    }

    @SuppressWarnings("unchecked")
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

        if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
            throw new IllegalArgumentException("Tried to restore timers " +
                "for the same service with different serializers.");
        }

        this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
        this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();

        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
            "Key Group " + keyGroupIdx + " does not belong to the local range.");

        // restore the event time timers
        eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());

        // restore the processing time timers
        processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }

    @VisibleForTesting
    public int numProcessingTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
    }

    @VisibleForTesting
    public int numEventTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
    }

    private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
        int count = 0;
        try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
            while (iterator.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = iterator.next();
                if (timer.getNamespace().equals(namespace)) {
                    count++;
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Exception when closing iterator.", e);
        }
        return count;
    }

    @VisibleForTesting
    int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }

    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(eventTimeTimersQueue);
    }

    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(processingTimeTimersQueue);
    }

    private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
        List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : localKeyGroupRange) {
            result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
        }
        return result;
    }

    private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
        return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
            (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
    }
}
  • InternalTimerServiceImpl implements both InternalTimerService and ProcessingTimeCallback (the interface that declares the onProcessingTime method).
  • The startTimerService method initializes the keySerializer, namespaceSerializer, and triggerTarget fields. If timers were restored from a snapshot, it first checks that the new serializers are compatible with the restored ones, and it re-registers the head of processingTimeTimersQueue with the ProcessingTimeService.
  • registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue; registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, and their element type is TimerHeapInternalTimer.
  • Event-time timers fire in the advanceWatermark method (AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark). It polls every event-time timer whose timestamp is less than or equal to the given time, sets the current key to the timer's key, and calls triggerTarget.onEventTime.
  • Processing-time timers fire in the onProcessingTime method (the scheduled tasks TriggerTask and RepeatedTriggerTask in SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime). It polls every processing-time timer whose timestamp is less than or equal to the given time, sets the current key, and calls triggerTarget.onProcessingTime; if timers remain, it registers a new callback for the next head of the queue.
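
The two firing paths can be modeled in a few lines of plain Java. The sketch below is a simplification under stated assumptions: TimerEntry is a hypothetical timer type with only a timestamp and a key, java.util.PriorityQueue replaces KeyGroupedInternalPriorityQueue (so duplicate timers are not filtered out), a nullable Long named scheduledWakeUp stands in for the ScheduledFuture held in nextTimer, and the fired list replaces the calls to triggerTarget.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TimerFiringSketch {

    /** Hypothetical, simplified timer: a timestamp and the key it was registered under. */
    static final class TimerEntry {
        final long timestamp;
        final String key;

        TimerEntry(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final PriorityQueue<TimerEntry> eventTimers =
        new PriorityQueue<>(Comparator.comparingLong((TimerEntry t) -> t.timestamp));
    private final PriorityQueue<TimerEntry> processingTimers =
        new PriorityQueue<>(Comparator.comparingLong((TimerEntry t) -> t.timestamp));

    /** Stand-in for nextTimer: the time of the pending wake-up, or null if none. */
    Long scheduledWakeUp = null;
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long time) {
        eventTimers.add(new TimerEntry(time, key));
    }

    void registerProcessingTimeTimer(String key, long time) {
        TimerEntry oldHead = processingTimers.peek();
        processingTimers.add(new TimerEntry(time, key));
        long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE;
        // Move the wake-up only when the new timer is earlier than the old head.
        if (time < nextTriggerTime) {
            scheduledWakeUp = time;
        }
    }

    /** Mirrors advanceWatermark: fire every event-time timer at or before the watermark. */
    void advanceWatermark(long watermark) {
        TimerEntry timer;
        while ((timer = eventTimers.peek()) != null && timer.timestamp <= watermark) {
            eventTimers.poll();
            fired.add("event:" + timer.key + "@" + timer.timestamp);
        }
    }

    /** Mirrors onProcessingTime: fire due timers, then schedule a wake-up for the new head. */
    void onProcessingTime(long now) {
        scheduledWakeUp = null;
        TimerEntry timer;
        while ((timer = processingTimers.peek()) != null && timer.timestamp <= now) {
            processingTimers.poll();
            fired.add("proc:" + timer.key + "@" + timer.timestamp);
        }
        if (timer != null && scheduledWakeUp == null) {
            scheduledWakeUp = timer.timestamp;
        }
    }

    public static void main(String[] args) {
        TimerFiringSketch sketch = new TimerFiringSketch();
        sketch.registerEventTimeTimer("a", 30L);
        sketch.registerEventTimeTimer("b", 10L);
        sketch.registerEventTimeTimer("c", 20L);
        sketch.advanceWatermark(20L);
        System.out.println(sketch.fired);           // [event:b@10, event:c@20]

        sketch.registerProcessingTimeTimer("a", 500L);
        sketch.registerProcessingTimeTimer("b", 200L);
        sketch.registerProcessingTimeTimer("c", 900L);
        System.out.println(sketch.scheduledWakeUp); // 200
        sketch.onProcessingTime(200L);
        System.out.println(sketch.scheduledWakeUp); // 500
    }
}
```

Event-time timers need no wake-up of their own because the watermark drives them. Processing-time timers keep exactly one pending wake-up, always aimed at the earliest timer in the queue.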

Triggerable

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java

@Internal
public interface Triggerable<K, N> {

    /**
     * Invoked when an event-time timer fires.
     */
    void onEventTime(InternalTimer<K, N> timer) throws Exception;

    /**
     * Invoked when a processing-time timer fires.
     */
    void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
  • The Triggerable interface defines the onEventTime and onProcessingTime methods that InternalTimerServiceImpl calls (through its triggerTarget field) when a timer fires. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can respond to these callbacks.
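
The reason the timer service sets the current key before invoking the callback becomes clearer with a toy operator. The following sketch is illustrative only: TimerCallback is a hypothetical stand-in for Triggerable (it takes a bare timestamp rather than an InternalTimer), and CountingOperator is an invented operator that keeps a per-key count in a HashMap instead of Flink keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TriggerableSketch {

    /** Hypothetical stand-in for Triggerable: the callbacks a timer service invokes. */
    interface TimerCallback {
        void onEventTime(long timestamp);
        void onProcessingTime(long timestamp);
    }

    /** Toy keyed operator: counts elements per key and emits the count when a timer fires. */
    static final class CountingOperator implements TimerCallback {
        private final Map<String, Integer> countPerKey = new HashMap<>();
        private String currentKey; // stand-in for the operator's key context
        final List<String> output = new ArrayList<>();

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        void processElement(String key) {
            setCurrentKey(key);
            countPerKey.merge(key, 1, Integer::sum);
        }

        @Override
        public void onEventTime(long timestamp) {
            // Reads the state of whichever key was set just before the callback.
            output.add(currentKey + "=" + countPerKey.get(currentKey) + " at event time " + timestamp);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            output.add(currentKey + "=" + countPerKey.get(currentKey) + " at processing time " + timestamp);
        }
    }

    public static void main(String[] args) {
        CountingOperator operator = new CountingOperator();
        operator.processElement("a");
        operator.processElement("a");
        operator.processElement("b"); // the current key is now "b"

        // What a timer service does when a timer registered under key "a" fires:
        operator.setCurrentKey("a");
        operator.onEventTime(100L);
        System.out.println(operator.output); // [a=2 at event time 100]
    }
}
```

Without the setCurrentKey call, the callback would read the state of "b", the key of the last processed element, rather than the key that owns the timer.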

Summary

  • The TimerService interface defines the methods currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. One of its implementations is SimpleTimerService, which delegates every call to an InternalTimerService.
  • InternalTimerService is the internal counterpart of TimerService. It introduces a namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter. Its implementation is InternalTimerServiceImpl, which also implements ProcessingTimeCallback (the interface that declares onProcessingTime). The event-time register and delete methods operate on eventTimeTimersQueue, and the processing-time register and delete methods operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue with TimerHeapInternalTimer elements.
  • In InternalTimerServiceImpl, event-time timers fire in the advanceWatermark method (AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark). It polls every event-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onEventTime.
  • In InternalTimerServiceImpl, processing-time timers fire in the onProcessingTime method (the scheduled tasks TriggerTask and RepeatedTriggerTask in SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime). It polls every processing-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onProcessingTime.
  • The Triggerable interface defines the onEventTime and onProcessingTime methods that InternalTimerServiceImpl calls when a timer fires. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can respond to these callbacks.
