A Look at Flink's InternalTimeServiceManager

Preface

This article examines Flink's InternalTimeServiceManager.

InternalTimeServiceManager

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java

@Internal
public class InternalTimeServiceManager<K> {

    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;

    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;

    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    private final boolean useLegacySynchronousSnapshots;

    InternalTimeServiceManager(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        PriorityQueueSetFactory priorityQueueSetFactory,
        ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {

        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

        this.timerServices = new HashMap<>();
    }

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {

            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

            timerServices.put(name, timerService);
        }
        return timerService;
    }

    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    //////////////////                Fault Tolerance Methods                ///////////////////

    public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        Preconditions.checkState(useLegacySynchronousSnapshots);
        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

        serializationProxy.write(stream);
    }

    public void restoreStateForKeyGroup(
            InputStream stream,
            int keyGroupIdx,
            ClassLoader userCodeClassLoader) throws IOException {

        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(
                this,
                userCodeClassLoader,
                keyGroupIdx);

        serializationProxy.read(stream);
    }

    ////////////////////            Methods used ONLY IN TESTS                ////////////////////

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}
  • InternalTimeServiceManager manages the timer services used by all keyed operators. It keeps an in-memory map from each timer service's name to its InternalTimerServiceImpl.
  • The getInternalTimerService method first calls registerOrGetTimerService to obtain or create the InternalTimerServiceImpl with the given name, then calls timerService.startTimerService to initialize it before returning it.
  • The registerOrGetTimerService method looks up the InternalTimerServiceImpl with the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating an InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue, under the state names PROCESSING_TIMER_PREFIX + name and EVENT_TIMER_PREFIX + name. createTimerPriorityQueue in turn delegates to priorityQueueSetFactory.create.
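
The register-or-get logic described above can be sketched without any Flink dependency. The following is only an illustration: SketchTimerService and SketchTimeServiceManager are hypothetical stand-ins (not Flink classes), and plain java.util.PriorityQueue instances replace the key-grouped queues. Only the prefix constants are copied from the listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Runs the sketch: asking for the same name twice yields the same service. */
class RegisterOrGetDemo {
    public static void main(String[] args) {
        SketchTimeServiceManager manager = new SketchTimeServiceManager();
        SketchTimerService first = manager.registerOrGetTimerService("window-timers");
        SketchTimerService second = manager.registerOrGetTimerService("window-timers");
        System.out.println(first == second);           // prints "true"
        System.out.println(first.processingQueueName); // prints "_timer_state/processing_window-timers"
    }
}

/** Hypothetical stand-in for InternalTimerServiceImpl: one queue per time domain. */
class SketchTimerService {
    final String processingQueueName;
    final String eventQueueName;
    final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    SketchTimerService(String processingQueueName, String eventQueueName) {
        this.processingQueueName = processingQueueName;
        this.eventQueueName = eventQueueName;
    }
}

/** Hypothetical stand-in for InternalTimeServiceManager. */
class SketchTimeServiceManager {
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    /** Returns the service registered under the name, creating it on first use. */
    SketchTimerService registerOrGetTimerService(String name) {
        SketchTimerService service = timerServices.get(name);
        if (service == null) {
            service = new SketchTimerService(
                PROCESSING_TIMER_PREFIX + name,
                EVENT_TIMER_PREFIX + name);
            timerServices.put(name, service);
        }
        return service;
    }

    int numRegisteredServices() {
        return timerServices.size();
    }
}
```

Because the map is keyed by name, repeated calls with the same name share one service and therefore one pair of queues.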

PriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java

public interface PriorityQueueSetFactory {

    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
  • PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue. Its type parameter T must implement all three of the HeapPriorityQueueElement, PriorityComparable, and Keyed interfaces.
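
The bound on T is a Java intersection type: a single type argument has to satisfy every listed interface at once. A minimal sketch of the same idea follows. Indexed, Prioritized, and HasKey are hypothetical stand-ins for the three Flink interfaces, and unlike the Flink signature (which uses raw types) the bounds here are parameterized.

```java
import java.util.Arrays;
import java.util.List;

class IntersectionBoundDemo {
    // Hypothetical stand-ins for HeapPriorityQueueElement, PriorityComparable and Keyed.
    interface Indexed { int getInternalIndex(); }
    interface Prioritized<T> { int comparePriorityTo(T other); }
    interface HasKey<K> { K getKey(); }

    /** A type that satisfies all three bounds at once. */
    static final class SketchTimer implements Indexed, Prioritized<SketchTimer>, HasKey<String> {
        final long timestamp;
        final String key;

        SketchTimer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int getInternalIndex() {
            return -1; // this sketch never stores the timer in a heap
        }

        @Override
        public int comparePriorityTo(SketchTimer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public String getKey() {
            return key;
        }
    }

    /** Accepts only element types that implement all three interfaces. */
    static <T extends Indexed & Prioritized<T> & HasKey<?>> T highestPriority(List<T> elements) {
        T best = elements.get(0);
        for (T candidate : elements) {
            if (candidate.comparePriorityTo(best) < 0) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<SketchTimer> timers = Arrays.asList(
            new SketchTimer(30L, "a"), new SketchTimer(10L, "b"), new SketchTimer(20L, "c"));
        SketchTimer first = highestPriority(timers);
        System.out.println(first.getKey() + "@" + first.timestamp); // prints "b@10"
        // highestPriority(Arrays.asList("x", "y")) would not compile:
        // String implements none of the required interfaces.
    }
}
```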

HeapPriorityQueueElement

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java

@Internal
public interface HeapPriorityQueueElement {

    /**
     * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
     * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
     * elements are removed from a {@link HeapPriorityQueue}.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
     * {@link HeapPriorityQueue}.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
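  • The HeapPriorityQueueElement interface describes the element type that HeapPriorityQueue requires. It defines getInternalIndex and setInternalIndex, which expose the element's position in the queue's internal array, plus the NOT_CONTAINED constant for elements that are not held by any queue.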
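
One plausible reason for letting each element carry its own array index is that the heap can then remove an arbitrary element without scanning for it. The sketch below illustrates that idea with a small 0-based binary min-heap; it is not Flink's HeapPriorityQueue, and Node and IndexedMinHeap are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class IndexedHeapDemo {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;

    /** Element that remembers its position in the heap array. */
    static final class Node {
        final long priority;
        int index = NOT_CONTAINED;

        Node(long priority) {
            this.priority = priority;
        }
    }

    /** Binary min-heap that keeps every node's index field up to date. */
    static final class IndexedMinHeap {
        private final List<Node> heap = new ArrayList<>();

        void add(Node node) {
            heap.add(node);
            node.index = heap.size() - 1;
            siftUp(node.index);
        }

        Node peek() {
            return heap.isEmpty() ? null : heap.get(0);
        }

        /** Removes the node by jumping to its stored index; no linear search. */
        boolean remove(Node node) {
            int i = node.index;
            if (i < 0 || i >= heap.size() || heap.get(i) != node) {
                return false;
            }
            int last = heap.size() - 1;
            swap(i, last);
            heap.remove(last);
            node.index = NOT_CONTAINED;
            if (i < heap.size()) {
                siftDown(i); // at most one of these two calls moves anything
                siftUp(i);
            }
            return true;
        }

        private void siftUp(int i) {
            while (i > 0) {
                int parent = (i - 1) / 2;
                if (heap.get(parent).priority <= heap.get(i).priority) {
                    break;
                }
                swap(i, parent);
                i = parent;
            }
        }

        private void siftDown(int i) {
            int size = heap.size();
            while (true) {
                int left = 2 * i + 1;
                int right = left + 1;
                int smallest = i;
                if (left < size && heap.get(left).priority < heap.get(smallest).priority) {
                    smallest = left;
                }
                if (right < size && heap.get(right).priority < heap.get(smallest).priority) {
                    smallest = right;
                }
                if (smallest == i) {
                    return;
                }
                swap(i, smallest);
                i = smallest;
            }
        }

        private void swap(int a, int b) {
            Node x = heap.get(a);
            Node y = heap.get(b);
            heap.set(a, y);
            y.index = a;
            heap.set(b, x);
            x.index = b;
        }
    }

    public static void main(String[] args) {
        IndexedMinHeap heap = new IndexedMinHeap();
        Node five = new Node(5L);
        Node one = new Node(1L);
        Node three = new Node(3L);
        heap.add(five);
        heap.add(one);
        heap.add(three);
        heap.remove(one);
        System.out.println(heap.peek().priority); // prints "3"
    }
}
```

In this sketch a removed node has its index reset to NOT_CONTAINED, so a second remove of the same node is rejected immediately.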

PriorityComparable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java

public interface PriorityComparable<T> {

    int comparePriorityTo(@Nonnull T other);
}
  • PriorityComparable defines the comparePriorityTo method for comparison based on priority

Keyed

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java

public interface Keyed<K> {

    K getKey();
}
  • The Keyed interface defines the getKey method to return the key of the object

InternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java

@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Function to extract the key from a {@link InternalTimer}. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

    /** Function to compare instances of {@link InternalTimer}. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());
    /**
     * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
     */
    long getTimestamp();

    /**
     * Returns the key that is bound to this timer.
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * Returns the namespace that is bound to this timer.
     */
    @Nonnull
    N getNamespace();
}
  • InternalTimer extends the PriorityComparable and Keyed interfaces and defines the getTimestamp, getKey, and getNamespace methods. It also declares two constants, KEY_EXTRACTOR_FUNCTION and TIMER_COMPARATOR; the latter orders timers by timestamp.
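
Ordering timers by timestamp means the earliest timer always sits at the head of the queue. The sketch below shows what that ordering makes possible: draining every timer whose timestamp is at or below a given watermark. It rests on assumptions: SketchTimer and drainUpTo are hypothetical, a java.util.PriorityQueue replaces Flink's queue, and the body of InternalTimerServiceImpl.advanceWatermark is not shown in this article, so the drain loop is an illustration rather than Flink's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TimerOrderingDemo {
    /** Hypothetical, simplified timer with only a timestamp and a key. */
    static final class SketchTimer {
        private final long timestamp;
        private final String key;

        SketchTimer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        long getTimestamp() {
            return timestamp;
        }

        String getKey() {
            return key;
        }
    }

    /** Same shape as InternalTimer.TIMER_COMPARATOR: compare by timestamp only. */
    static final Comparator<SketchTimer> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    /** Polls timers in timestamp order while they are not later than the watermark. */
    static List<SketchTimer> drainUpTo(PriorityQueue<SketchTimer> queue, long watermark) {
        List<SketchTimer> fired = new ArrayList<>();
        SketchTimer head;
        while ((head = queue.peek()) != null && head.getTimestamp() <= watermark) {
            fired.add(queue.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        PriorityQueue<SketchTimer> queue = new PriorityQueue<>(TIMER_COMPARATOR);
        queue.add(new SketchTimer(300L, "k3"));
        queue.add(new SketchTimer(100L, "k1"));
        queue.add(new SketchTimer(200L, "k2"));

        for (SketchTimer timer : drainUpTo(queue, 200L)) {
            System.out.println(timer.getKey() + "@" + timer.getTimestamp());
        }
        // prints "k1@100" then "k2@200"; the timer at 300 stays queued
    }
}
```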

TimerHeapInternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java

@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /** The key for which the timer is scoped. */
    @Nonnull
    private final K key;

    /** The namespace for which the timer is scoped. */
    @Nonnull
    private final N namespace;

    /** The expiration timestamp. */
    private final long timestamp;

    private transient int timerHeapIndex;

    TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.timerHeapIndex = NOT_CONTAINED;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Nonnull
    @Override
    public K getKey() {
        return key;
    }

    @Nonnull
    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o instanceof InternalTimer) {
            InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
            return timestamp == timer.getTimestamp()
                && key.equals(timer.getKey())
                && namespace.equals(timer.getNamespace());
        }

        return false;
    }

    @Override
    public int getInternalIndex() {
        return timerHeapIndex;
    }

    @Override
    public void setInternalIndex(int newIndex) {
        this.timerHeapIndex = newIndex;
    }

    void removedFromTimerQueue() {
        setInternalIndex(NOT_CONTAINED);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "Timer{" +
                "timestamp=" + timestamp +
                ", key=" + key +
                ", namespace=" + namespace +
                '}';
    }

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}
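  • TimerHeapInternalTimer implements the InternalTimer and HeapPriorityQueueElement interfaces. Its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), i.e. it resets the timer's index to NOT_CONTAINED, which marks the timer as logically removed from the queue. Its equals and hashCode methods use only timestamp, key, and namespace; the transient timerHeapIndex is not part of a timer's identity.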
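
Because equality depends only on (timestamp, key, namespace), two timer objects with the same triple count as the same timer in any hash-based collection, whatever their heap index. A minimal sketch of that behavior, using a hypothetical SketchTimer with String keys and namespaces and a plain HashSet in place of Flink's queue set:

```java
import java.util.HashSet;
import java.util.Set;

class TimerEqualityDemo {
    /** Hypothetical timer mirroring the equals/hashCode shape of TimerHeapInternalTimer. */
    static final class SketchTimer {
        final long timestamp;
        final String key;
        final String namespace;
        int heapIndex = Integer.MIN_VALUE; // bookkeeping only, ignored by equals/hashCode

        SketchTimer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof SketchTimer) {
                SketchTimer other = (SketchTimer) o;
                return timestamp == other.timestamp
                    && key.equals(other.key)
                    && namespace.equals(other.namespace);
            }
            return false;
        }

        @Override
        public int hashCode() {
            // Long.hashCode(t) is defined as (int) (t ^ (t >>> 32)), the formula in the listing.
            int result = Long.hashCode(timestamp);
            result = 31 * result + key.hashCode();
            result = 31 * result + namespace.hashCode();
            return result;
        }
    }

    public static void main(String[] args) {
        SketchTimer a = new SketchTimer(1000L, "user-1", "window-A");
        SketchTimer b = new SketchTimer(1000L, "user-1", "window-A");
        b.heapIndex = 7; // a different heap position does not change identity

        Set<SketchTimer> timers = new HashSet<>();
        System.out.println(timers.add(a)); // prints "true"
        System.out.println(timers.add(b)); // prints "false": duplicate of a
        System.out.println(timers.size()); // prints "1"
    }
}
```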

HeapPriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java

public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    @Nonnull
    private final KeyGroupRange keyGroupRange;

    @Nonnegative
    private final int totalKeyGroups;

    @Nonnegative
    private final int minimumCapacity;

    public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

        this.keyGroupRange = keyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
        this.minimumCapacity = minimumCapacity;
    }

    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        return new HeapPriorityQueueSet<>(
            PriorityComparator.forPriorityComparableObjects(),
            KeyExtractorFunction.forKeyedObjects(),
            minimumCapacity,
            keyGroupRange,
            totalKeyGroups);
    }
}
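  • HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface; its create method returns a HeapPriorityQueueSet built from minimumCapacity, keyGroupRange, and totalKeyGroups. In the listing above, the stateName and byteOrderedElementSerializer arguments are not used by this heap-based implementation.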
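
The factory indirection lets the caller ask for a queue by state name without knowing how the queue is stored. A minimal sketch of that separation follows. SketchQueueFactory and HeapSketchQueueFactory are hypothetical, the Comparable bound is a simplification of the Flink bounds, and java.util.PriorityQueue stands in for HeapPriorityQueueSet.

```java
import java.util.PriorityQueue;
import java.util.Queue;

class QueueFactoryDemo {
    /** Hypothetical counterpart of PriorityQueueSetFactory: callers see only this. */
    interface SketchQueueFactory {
        <T extends Comparable<T>> Queue<T> create(String stateName);
    }

    /** Heap-backed implementation; like the listing above, it ignores the state name. */
    static final class HeapSketchQueueFactory implements SketchQueueFactory {
        private final int minimumCapacity;

        HeapSketchQueueFactory(int minimumCapacity) {
            this.minimumCapacity = minimumCapacity;
        }

        @Override
        public <T extends Comparable<T>> Queue<T> create(String stateName) {
            // An in-memory heap needs no name; a persistent backend could use it.
            return new PriorityQueue<>(minimumCapacity);
        }
    }

    public static void main(String[] args) {
        SketchQueueFactory factory = new HeapSketchQueueFactory(16);
        Queue<Long> eventTimers = factory.create("_timer_state/event_window-timers");
        eventTimers.add(30L);
        eventTimers.add(10L);
        eventTimers.add(20L);
        System.out.println(eventTimers.poll()); // prints "10"
    }
}
```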

Summary

  • InternalTimeServiceManager manages the timer services used by all keyed operators, keeping an in-memory map from each timer service's name to its InternalTimerServiceImpl. The getInternalTimerService method first calls registerOrGetTimerService to obtain or create the InternalTimerServiceImpl with the given name, then calls timerService.startTimerService to initialize it before returning it.
  • The registerOrGetTimerService method looks up the InternalTimerServiceImpl with the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating an InternalTimerServiceImpl, it calls createTimerPriorityQueue to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue. createTimerPriorityQueue in turn delegates to priorityQueueSetFactory.create.
  • PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue, whose type parameter T must implement all three of HeapPriorityQueueElement, PriorityComparable, and Keyed. InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement, so it satisfies that bound. HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface, and its create method returns a HeapPriorityQueueSet.
