Talk about flink’s AbstractTtlState

  flink

Order

This paper mainly studies flink’s AbstractTtlState

InternalKvState

flink-runtime_2.11-1.7.0-sources.jar! /org/apache/flink/runtime/state/internal/InternalKvState.java

/**
 * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
 * {@link State} being the root of the public API state hierarchy.
 * 
 * <p>The internal state classes give access to the namespace getters and setters and access to
 * additional functionality, like raw value access or state merging.
 * 
 * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
 * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
 * intended to be used by user applications. These internal methods are considered of limited use to users and
 * only confusing, and are usually not regarded as stable across releases.
 * 
 * <p>Each specific type in the internal state hierarchy extends the type from the public
 * state hierarchy:
 * 
 * <pre>
 *             State
 *               |
 *               +-------------------InternalKvState
 *               |                         |
 *          MergingState                   |
 *               |                         |
 *               +-----------------InternalMergingState
 *               |                         |
 *      +--------+------+                  |
 *      |               |                  |
 * ReducingState    ListState        +-----+-----------------+
 *      |               |            |                       |
 *      +-----------+   +-----------   -----------------InternalListState
 *                  |                |
 *                  +---------InternalReducingState
 * </pre>
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <V> The type of values kept internally in state
 */
public interface InternalKvState<K, N, V> extends State {

    TypeSerializer<K> getKeySerializer();

    TypeSerializer<N> getNamespaceSerializer();

    TypeSerializer<V> getValueSerializer();

    void setCurrentNamespace(N namespace);

    byte[] getSerializedValue(
            final byte[] serializedKeyAndNamespace,
            final TypeSerializer<K> safeKeySerializer,
            final TypeSerializer<N> safeNamespaceSerializer,
            final TypeSerializer<V> safeValueSerializer) throws Exception;
}
  • The InternalKvState interface defines the methods to be implemented by the internal kvState, mainly getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, getSerializedValue.

AbstractTtlState

flink-runtime_2.11-1.7.0-sources.jar! /org/apache/flink/runtime/state/ttl/AbstractTtlState.java

/**
 * Base class for TTL logic wrappers of state objects.
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <SV> The type of values kept internally in state without TTL
 * @param <TTLSV> The type of values kept internally in state with TTL
 * @param <S> Type of originally wrapped state object
 */
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
    extends AbstractTtlDecorator<S>
    implements InternalKvState<K, N, SV> {
    private final TypeSerializer<SV> valueSerializer;

    AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
        super(original, config, timeProvider);
        this.valueSerializer = valueSerializer;
    }

    <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<T>, SE> getter,
        ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
        return getWithTtlCheckAndUpdate(getter, updater, original::clear);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return original.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return original.getNamespaceSerializer();
    }

    @Override
    public TypeSerializer<SV> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        original.setCurrentNamespace(namespace);
    }

    @Override
    public byte[] getSerializedValue(
        byte[] serializedKeyAndNamespace,
        TypeSerializer<K> safeKeySerializer,
        TypeSerializer<N> safeNamespaceSerializer,
        TypeSerializer<SV> safeValueSerializer) {
        throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
    }

    @Override
    public void clear() {
        original.clear();
    }
}
  • AbstractTtlState implements the method of InternalKvState interface and inherits AbstractTtlDecorator; . It provides the getWithttlcheckandupate method, which mainly calls GetWithttlcheckandupate of AbstractTtlDecorator to implement TTL logic

AbstractTtlDecorator

flink-runtime_2.11-1.7.0-sources.jar! /org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java

/**
 * Base class for TTL logic wrappers.
 *
 * @param <T> Type of originally wrapped object
 */
abstract class AbstractTtlDecorator<T> {
    /** Wrapped original state handler. */
    final T original;

    final StateTtlConfig config;

    final TtlTimeProvider timeProvider;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean updateTsOnRead;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean returnExpired;

    /** State value time to live in milliseconds. */
    final long ttl;

    AbstractTtlDecorator(
        T original,
        StateTtlConfig config,
        TtlTimeProvider timeProvider) {
        Preconditions.checkNotNull(original);
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(timeProvider);
        this.original = original;
        this.config = config;
        this.timeProvider = timeProvider;
        this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite;
        this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
        this.ttl = config.getTtl().toMilliseconds();
    }

    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return TtlUtils.expired(ttlValue, ttl, timeProvider);
    }

    <V> TtlValue<V> wrapWithTs(V value) {
        return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
    }

    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
        return wrapWithTs(ttlValue.getUserValue());
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }
}
  • AbstractTtlDecorator encapsulates null logic. Its main logic is in the GetWrapedWithTTLChecandDupdate method, which determines whether it expired (TtlUtils.expired(ttlValue, ttl, timeProvider)), call stateClear (ThrowingRunnable type, here is original::clearFor non-returnExpired, null; is directly returned; If there is no expired, it is determined whether to updateTsOnRead, if so, it is called updater for processing, and finally ttlValue is returned.

TtlUtils.expired

flink-runtime_2.11-1.7.0-sources.jar! /org/apache/flink/runtime/state/ttl/TtlUtils.java

/** Common functions related to State TTL. */
class TtlUtils {
    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
        return expired(ttlValue, ttl, timeProvider.currentTimestamp());
    }

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

    private static boolean expired(long ts, long ttl, long currentTimestamp) {
        return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    private static long getExpirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    //......
}
  • TtlUtils’ expired method mainly uses getExpirationTimestamp to obtain the expiration time, and then compares it with currentTimestamp. GetExpirationTimestamp here is based on ttl Value. GetLastTaccessTimestamp () and TTL value. long.MAX_VALUE is used here to handle the overflow situation to prevent the final value from exceeding the maximum range of Long type.

ThrowingRunnable

flink-core-1.7.0-sources.jar! /org/apache/flink/util/function/ThrowingRunnable.java

/**
 * Similar to a {@link Runnable}, this interface is used to capture a block of code
 * to be executed. In contrast to {@code Runnable}, this interface allows throwing
 * checked exceptions.
 */
@PublicEvolving
@FunctionalInterface
public interface ThrowingRunnable<E extends Throwable> {

    /**
     * The work method.
     *
     * @throws E Exceptions may be thrown.
     */
    void run() throws E;

    /**
     * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
     * as unchecked.
     *
     * @param throwingRunnable to convert into a {@link Runnable}
     * @return {@link Runnable} which throws all checked exceptions as unchecked.
     */
    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
        return () -> {
            try {
                throwingRunnable.run();
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        };
    }
}
  • StateClear is the type of ThrowingRunnable, which is different from Runnable. ThrowingRunnable allows to throw checked exceptions. It provides an unchecked static method for throwing out non-Err or and non-Runtime Exceptions into Runtime Exceptions, thus converting ThrowingRunnable into Runnable

Summary

  • The InternalKvState interface defines the methods to be implemented by the internal kvState, mainly getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, getSerializedValue.
  • AbstractTtlState implements the method of InternalKvState interface and inherits AbstractTtlDecorator; . It provides the getWithttlcheckandupate method, which mainly calls GetWithttlcheckandupate of AbstractTtlDecorator to implement TTL logic
  • The GetWrapedWithTTLChecandDupdate method of AbstractTtlDecorator null first determine whether to expired (TtlUtils.expired(ttlValue, ttl, timeProvider)), call stateClear (ThrowingRunnable type, here is original::clearFor non-returnExpired, null; is directly returned; If there is no expired, it is determined whether to updateTsOnRead, if so, it is called updater for processing, and finally ttlValue is returned.

doc