A look at Flink’s StateTtlConfig

Preface

This article takes a look at Flink’s StateTtlConfig.

Example

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • Here a StateTtlConfig is created with its Builder and then handed to the StateDescriptor through the descriptor’s enableTimeToLive method.

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0,
            "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {  });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {
        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {

        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
  • StateTtlConfig sets the TTL properties of a state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired), and TimeCharacteristic (ProcessingTime).
  • StateTtlConfig also defines CleanupStrategies, the cleanup policy for TTL state. By default, expired state is cleaned up when it is explicitly read. In addition, the FULL_STATE_SCAN_SNAPSHOT strategy can be activated through the Builder’s cleanupFullSnapshot method, which cleans up expired state in the full snapshot taken on checkpoint.
  • StateTtlConfig also provides a Builder for setting UpdateType, StateVisibility, TimeCharacteristic, the TTL (Time), and CleanupStrategies. The Builder defaults to OnCreateAndWrite, NeverReturnExpired, and ProcessingTime, and the StateTtlConfig constructor rejects a TTL that is not positive.
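
To make the interplay of UpdateType and StateVisibility more concrete, here is a minimal plain-Java sketch. It is not Flink code: TtlValueSketch, its clock parameter, and the expiry rule (a value counts as expired once last access plus TTL is no later than the current time) are assumptions made for illustration, and the Disabled update type is left out.

```java
import java.util.function.LongSupplier;

/** Simplified model of a TTL-guarded value. Hypothetical; not Flink's implementation. */
class TtlValueSketch<T> {

    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final StateVisibility visibility;
    private final LongSupplier clock; // stands in for a processing-time source

    private T value;                  // null means "nothing stored"
    private long lastAccessTimestamp;

    TtlValueSketch(long ttlMillis, UpdateType updateType,
                   StateVisibility visibility, LongSupplier clock) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** Every write stores the value and restarts the TTL. */
    void update(T newValue) {
        value = newValue;
        lastAccessTimestamp = clock.getAsLong();
    }

    /** Reads the value, applying the expiry, visibility and update-type rules. */
    T value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (isExpired(now)) {
            T stale = value;
            value = null; // cleanup on explicit read access
            return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? stale : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccessTimestamp = now; // a read also prolongs the TTL
        }
        return value;
    }

    // Assumption of this sketch: a value expires once lastAccess + ttl <= now.
    private boolean isExpired(long now) {
        long expiresAt = lastAccessTimestamp + ttlMillis;
        if (expiresAt < 0) {
            expiresAt = Long.MAX_VALUE; // guard against long overflow
        }
        return expiresAt <= now;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced clock
        TtlValueSketch<String> state = new TtlValueSketch<>(
            1000L, UpdateType.OnCreateAndWrite, StateVisibility.NeverReturnExpired, () -> now[0]);
        state.update("hello");
        now[0] = 999L;
        System.out.println(state.value()); // still within the TTL
        now[0] = 1000L;
        System.out.println(state.value()); // expired, so hidden
    }
}
```

In this model, ReturnExpiredIfNotCleanedUp hands back an expired value once, on the read that also cleans it up, while NeverReturnExpired hides it immediately. Injecting the clock keeps the behaviour deterministic and easy to test.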

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

    /**
     * @see KeyedStateBackend
     */
    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }
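  • The getOrCreateKeyedState method of AbstractKeyedStateBackend looks the state up by name in keyValueStatesByName. If it is not there yet, it calls TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState and stores the result under the descriptor’s name.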
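
The lookup-then-create flow of getOrCreateKeyedState can be reduced to the following plain-Java sketch. StateRegistrySketch and its factory parameter are hypothetical names; the real method additionally initializes serializers and publishes queryable state, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified get-or-create registry keyed by state name. */
class StateRegistrySketch {

    private final Map<String, Object> statesByName = new HashMap<>();
    private int created; // counts how often the factory was actually invoked

    Object getOrCreate(String name, Function<String, Object> factory) {
        Object state = statesByName.get(name);
        if (state == null) {
            // First request for this name: create the state and remember it.
            state = factory.apply(name);
            statesByName.put(name, state);
            created++;
        }
        return state;
    }

    int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        StateRegistrySketch registry = new StateRegistrySketch();
        Object first = registry.getOrCreate("text state", name -> new Object());
        Object second = registry.getOrCreate("text state", name -> new Object());
        System.out.println(first == second); // both calls yield the cached instance
    }
}
```

Because the cache is keyed by name only, a second request with the same name returns the state created first, and the factory passed to the second call is never invoked.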

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(
                namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}
  • TtlStateFactory’s createStateAndWrapWithTtlIfEnabled method decides how to create the state based on stateDesc.getTtlConfig().isEnabled(). If TTL is enabled, it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(); otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc).
  • createStateFactories builds a map from each StateDescriptor class to the matching creation method. createState looks up the SupplierWithException registered for the descriptor’s class and invokes it, which avoids an if-else chain; if no entry is found, it throws a FlinkRuntimeException.
  • ValueStateDescriptor maps to createValueState, which creates a TtlValueState; ListStateDescriptor maps to createListState, which creates a TtlListState; MapStateDescriptor maps to createMapState, which creates a TtlMapState; ReducingStateDescriptor maps to createReducingState, which creates a TtlReducingState; AggregatingStateDescriptor maps to createAggregatingState, which creates a TtlAggregatingState; FoldingStateDescriptor maps to createFoldingState, which creates a TtlFoldingState.
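
The two decisions described above, wrapping only when TTL is enabled and dispatching through a map keyed by descriptor class, can be sketched in plain Java as follows. All types here (TtlFactorySketch, Descriptor, and its subclasses) are hypothetical stand-ins, and strings take the place of real state objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of "wrap if enabled" plus dispatch by descriptor class. */
class TtlFactorySketch {

    // Stand-ins for state descriptors; only the TTL flag matters here.
    static class Descriptor {
        final boolean ttlEnabled;
        Descriptor(boolean ttlEnabled) { this.ttlEnabled = ttlEnabled; }
    }
    static class ValueDescriptor extends Descriptor {
        ValueDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }
    static class ListDescriptor extends Descriptor {
        ListDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }
    static class CustomDescriptor extends Descriptor { // deliberately not registered
        CustomDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }

    private final Map<Class<? extends Descriptor>, Supplier<String>> factories = new HashMap<>();
    private final Descriptor descriptor;

    private TtlFactorySketch(Descriptor descriptor) {
        this.descriptor = descriptor;
        // One entry per supported descriptor type replaces an if-else chain.
        factories.put(ValueDescriptor.class, this::createValueState);
        factories.put(ListDescriptor.class, this::createListState);
    }

    /** Entry point: wrap with TTL logic only when the descriptor enables it. */
    static String createStateAndWrapWithTtlIfEnabled(Descriptor descriptor) {
        return descriptor.ttlEnabled
            ? new TtlFactorySketch(descriptor).createState()
            : "plain state";
    }

    private String createState() {
        // Lookup uses the exact runtime class of the descriptor.
        Supplier<String> factory = factories.get(descriptor.getClass());
        if (factory == null) {
            throw new IllegalStateException(
                "State " + descriptor.getClass().getSimpleName() + " is not supported");
        }
        return factory.get();
    }

    private String createValueState() { return "TTL value state"; }
    private String createListState() { return "TTL list state"; }

    public static void main(String[] args) {
        System.out.println(createStateAndWrapWithTtlIfEnabled(new ValueDescriptor(true)));
        System.out.println(createStateAndWrapWithTtlIfEnabled(new ValueDescriptor(false)));
    }
}
```

One consequence of looking up by getClass(), as the original createState also does, is that only exact class matches are found: a subclass of a registered descriptor type would not resolve unless it is registered itself.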

Summary

  • StateTtlConfig sets the TTL properties of a state; the main settings are UpdateType, StateVisibility, TimeCharacteristic, the TTL (Time), and CleanupStrategies.
  • The getOrCreateKeyedState method of AbstractKeyedStateBackend uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState.
  • TtlStateFactory’s createStateAndWrapWithTtlIfEnabled method creates either a TTL-wrapped state or a plain one depending on stateDesc.getTtlConfig().isEnabled(); TtlStateFactory’s createState then creates the TTL state that matches the type of the StateDescriptor.
