[case51] A look at Flink's StateDescriptor

  flink

Preface

This article takes a look at Flink's StateDescriptor.

RuntimeContext.getState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java

/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {
    //......

    @PublicEvolving
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

    @PublicEvolving
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

    @PublicEvolving
    @Deprecated
    <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

    @PublicEvolving
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);

}
  • RuntimeContext provides one getter per state type, each taking the matching StateDescriptor subclass: getState obtains a ValueState through a ValueStateDescriptor; getListState obtains a ListState through a ListStateDescriptor; getReducingState obtains a ReducingState through a ReducingStateDescriptor; getAggregatingState obtains an AggregatingState through an AggregatingStateDescriptor; getFoldingState (deprecated) obtains a FoldingState through a FoldingStateDescriptor; and getMapState obtains a MapState through a MapStateDescriptor.
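The descriptor-to-state relationship can be pictured with a small model. This is a stdlib-only sketch, not Flink code: `ToyValueDescriptor`, `ToyValueState` and `ToyRuntimeContext` are hypothetical stand-ins. It assumes only that the descriptor's name identifies the state and that an unset state returns the descriptor's default value; it ignores keys, whereas real keyed state is also scoped to the current key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ValueStateDescriptor: a name plus a default value.
final class ToyValueDescriptor<T> {
    final String name;
    final T defaultValue;

    ToyValueDescriptor(String name, T defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

// Hypothetical stand-in for ValueState: returns the default until a value is set.
final class ToyValueState<T> {
    private final T defaultValue;
    private T value;

    ToyValueState(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    T value() {
        return value != null ? value : defaultValue;
    }

    void update(T newValue) {
        value = newValue;
    }
}

// Hypothetical stand-in for RuntimeContext: the descriptor's name decides
// which state instance is returned, so asking twice yields the same state.
final class ToyRuntimeContext {
    private final Map<String, ToyValueState<?>> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> ToyValueState<T> getState(ToyValueDescriptor<T> descriptor) {
        return (ToyValueState<T>) states.computeIfAbsent(
                descriptor.name, n -> new ToyValueState<>(descriptor.defaultValue));
    }
}
```

A function would typically create the descriptor once (for example in `open`) and keep the returned state handle; asking again with an equal descriptor returns the same state rather than a fresh one.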

StateDescriptor

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateDescriptor.java

/**
 * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
 * {@link State} in stateful operations.
 *
 * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
 *
 * @param <S> The type of the State objects created from this {@code StateDescriptor}.
 * @param <T> The type of the value of the state object described by this state descriptor.
 */
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {

    /**
     * An enumeration of the types of supported states. Used to identify the state type
     * when writing and restoring checkpoints and savepoints.
     */
    // IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
    public enum Type {
        /**
         * @deprecated Enum for migrating from old checkpoints/savepoint versions.
         */
        @Deprecated
        UNKNOWN,
        VALUE,
        LIST,
        REDUCING,
        FOLDING,
        AGGREGATING,
        MAP
    }

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    /** Name that uniquely identifies state created from this StateDescriptor. */
    protected final String name;

    /** The serializer for the type. May be eagerly initialized in the constructor,
     * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
     * is called. */
    @Nullable
    protected TypeSerializer<T> serializer;

    /** The type information describing the value type. Only used to if the serializer
     * is created lazily. */
    @Nullable
    private TypeInformation<T> typeInfo;

    /** Name for queries against state created from this StateDescriptor. */
    @Nullable
    private String queryableStateName;

    /** Name for queries against state created from this StateDescriptor. */
    @Nonnull
    private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;

    /** The default value returned by the state when no other value is bound to a key. */
    @Nullable
    protected transient T defaultValue;

    // ------------------------------------------------------------------------

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type serializer.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param serializer The type serializer for the values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        this.serializer = checkNotNull(serializer, "serializer must not be null");
        this.defaultValue = defaultValue;
    }

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param typeInfo The type information for the values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
        this.defaultValue = defaultValue;
    }

    /**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param type The class of the type of values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        checkNotNull(type, "type class must not be null");

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create the type information for '" + type.getName() + "'. " +
                    "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                    "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                    "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                    "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
        }

        this.defaultValue = defaultValue;
    }

    // ------------------------------------------------------------------------

    /**
     * Returns the name of this {@code StateDescriptor}.
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the default value.
     */
    public T getDefaultValue() {
        if (defaultValue != null) {
            if (serializer != null) {
                return serializer.copy(defaultValue);
            } else {
                throw new IllegalStateException("Serializer not yet initialized.");
            }
        } else {
            return null;
        }
    }

    /**
     * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
     * Note that the serializer may initialized lazily and is only guaranteed to exist after
     * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
     */
    public TypeSerializer<T> getSerializer() {
        if (serializer != null) {
            return serializer.duplicate();
        } else {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
    }

    /**
     * Sets the name for queries of state created from this descriptor.
     *
     * <p>If a name is set, the created state will be published for queries
     * during runtime. The name needs to be unique per job. If there is another
     * state instance published under the same name, the job will fail during runtime.
     *
     * @param queryableStateName State name for queries (unique name per job)
     * @throws IllegalStateException If queryable state name already set
     */
    public void setQueryable(String queryableStateName) {
        Preconditions.checkArgument(
            ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled,
            "Queryable state is currently not supported with TTL");
        if (this.queryableStateName == null) {
            this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
        } else {
            throw new IllegalStateException("Queryable state name already set");
        }
    }

    /**
     * Returns the queryable state name.
     *
     * @return Queryable state name or <code>null</code> if not set.
     */
    @Nullable
    public String getQueryableStateName() {
        return queryableStateName;
    }

    /**
     * Returns whether the state created from this descriptor is queryable.
     *
     * @return <code>true</code> if state is queryable, <code>false</code>
     * otherwise.
     */
    public boolean isQueryable() {
        return queryableStateName != null;
    }

    /**
     * Configures optional activation of state time-to-live (TTL).
     *
     * <p>State user value will expire, become unavailable and be cleaned up in storage
     * depending on configured {@link StateTtlConfig}.
     *
     * @param ttlConfig configuration of state TTL
     */
    public void enableTimeToLive(StateTtlConfig ttlConfig) {
        Preconditions.checkNotNull(ttlConfig);
        Preconditions.checkArgument(
            ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled &&
                queryableStateName == null,
            "Queryable state is currently not supported with TTL");
        this.ttlConfig = ttlConfig;
    }

    @Nonnull
    @Internal
    public StateTtlConfig getTtlConfig() {
        return ttlConfig;
    }

    // ------------------------------------------------------------------------

    /**
     * Checks whether the serializer has been initialized. Serializer initialization is lazy,
     * to allow parametrization of serializers with an {@link ExecutionConfig} via
     * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
     *
     * @return True if the serializers have been initialized, false otherwise.
     */
    public boolean isSerializerInitialized() {
        return serializer != null;
    }

    /**
     * Initializes the serializer, unless it has been initialized before.
     *
     * @param executionConfig The execution config to use when creating the serializer.
     */
    public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
        if (serializer == null) {
            checkState(typeInfo != null, "no serializer and no type info");

            // instantiate the serializer
            serializer = typeInfo.createSerializer(executionConfig);

            // we can drop the type info now, no longer needed
            typeInfo  = null;
        }
    }

    // ------------------------------------------------------------------------
    //  Standard Utils
    // ------------------------------------------------------------------------

    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        else if (o != null && o.getClass() == this.getClass()) {
            final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
            return this.name.equals(that.name);
        }
        else {
            return false;
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() +
                "{name=" + name +
                ", defaultValue=" + defaultValue +
                ", serializer=" + serializer +
                (isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
                '}';
    }

    public abstract Type getType();

    // ------------------------------------------------------------------------
    //  Serialization
    // ------------------------------------------------------------------------

    private void writeObject(final ObjectOutputStream out) throws IOException {
        // write all the non-transient fields
        out.defaultWriteObject();

        // write the non-serializable default value field
        if (defaultValue == null) {
            // we don't have a default value
            out.writeBoolean(false);
        } else {
            // we have a default value
            out.writeBoolean(true);

            byte[] serializedDefaultValue;
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) {

                TypeSerializer<T> duplicateSerializer = serializer.duplicate();
                duplicateSerializer.serialize(defaultValue, outView);

                outView.flush();
                serializedDefaultValue = baos.toByteArray();
            }
            catch (Exception e) {
                throw new IOException("Unable to serialize default value of type " +
                        defaultValue.getClass().getSimpleName() + ".", e);
            }

            out.writeInt(serializedDefaultValue.length);
            out.write(serializedDefaultValue);
        }
    }

    private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
        // read the non-transient fields
        in.defaultReadObject();

        // read the default value field
        boolean hasDefaultValue = in.readBoolean();
        if (hasDefaultValue) {
            int size = in.readInt();

            byte[] buffer = new byte[size];

            in.readFully(buffer);

            try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
                    DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {

                defaultValue = serializer.deserialize(inView);
            }
            catch (Exception e) {
                throw new IOException("Unable to deserialize default value.", e);
            }
        } else {
            defaultValue = null;
        }
    }
}
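setQueryable and enableTimeToLive guard each other: a descriptor with TTL enabled cannot be made queryable, and a queryable descriptor cannot get TTL. Passing a TTL config whose update type is Disabled to enableTimeToLive also fails, with the same message. The sketch below models only that mutual exclusion with the JDK. `ToyTtlQueryableDescriptor` is hypothetical, and its `UpdateType` constants are assumed to mirror `StateTtlConfig.UpdateType`.

```java
import java.util.Objects;

// Hypothetical model of the TTL/queryable checks in StateDescriptor; not Flink code.
final class ToyTtlQueryableDescriptor {
    // Assumed to mirror StateTtlConfig.UpdateType.
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

    private String queryableStateName;
    private UpdateType ttlUpdateType = UpdateType.Disabled;

    void setQueryable(String name) {
        // Mirrors Preconditions.checkArgument: TTL must still be disabled.
        if (ttlUpdateType != UpdateType.Disabled) {
            throw new IllegalArgumentException("Queryable state is currently not supported with TTL");
        }
        if (queryableStateName != null) {
            throw new IllegalStateException("Queryable state name already set");
        }
        queryableStateName = Objects.requireNonNull(name, "Registration name");
    }

    void enableTimeToLive(UpdateType updateType) {
        Objects.requireNonNull(updateType);
        // TTL must actually be enabled, and the state must not be queryable.
        if (updateType == UpdateType.Disabled || queryableStateName != null) {
            throw new IllegalArgumentException("Queryable state is currently not supported with TTL");
        }
        ttlUpdateType = updateType;
    }

    boolean isQueryable() {
        return queryableStateName != null;
    }
}
```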
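  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor. It declares the abstract method getType(), through which each subclass reports its own Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING or MAP; UNKNOWN is deprecated and only kept for migrating old checkpoints/savepoints).
  • StateDescriptor provides several constructors that take a name, the value type (as a TypeSerializer, a TypeInformation or a Class) and a defaultValue. When a TypeInformation or a Class is given, the serializer is created lazily by initializeSerializerUnlessSet(ExecutionConfig).
  • StateDescriptor overrides equals and hashCode, which depend only on the concrete class and the name. It also implements Serializable and customizes serialization through writeObject and readObject, because the transient defaultValue is written with the descriptor's TypeSerializer instead of Java serialization.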
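The enum comment warns that the ordinal is what gets serialized, so reordering the constants would silently change the meaning of stored data. The JDK-only sketch below illustrates that together with the per-subclass getType(). `ToyStateType`, `ToyDescriptor`, `ToyListDescriptor`, `ToyTypeCodec` and `ToyStateTypeReordered` are hypothetical, and writing the ordinal as an int is an assumption made for illustration, not Flink's actual snapshot format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical mirror of StateDescriptor.Type, in the same order.
enum ToyStateType { UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP }

// Same constants with VALUE and LIST swapped, to show why the order must not change.
enum ToyStateTypeReordered { UNKNOWN, LIST, VALUE, REDUCING, FOLDING, AGGREGATING, MAP }

// Each subclass reports its own type through the abstract getter.
abstract class ToyDescriptor {
    abstract ToyStateType getType();
}

final class ToyListDescriptor extends ToyDescriptor {
    @Override
    ToyStateType getType() {
        return ToyStateType.LIST;
    }
}

// Assumed format: the ordinal written as a single int.
final class ToyTypeCodec {
    static byte[] write(ToyStateType type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(type.ordinal());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ToyStateType read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return ToyStateType.values()[in.readInt()];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Decoding LIST's ordinal with `ToyStateTypeReordered` yields VALUE, which is exactly the kind of silent corruption the "do not change the order" comment guards against.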
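The constructor variants differ mainly in when the serializer exists. With a TypeSerializer it is available immediately. With a TypeInformation (or a Class, which is turned into one) it is only created by initializeSerializerUnlessSet, and until then getDefaultValue throws. The JDK-only sketch below models that lifecycle and the defensive copy made by getDefaultValue. `ToySerializer`, `ToyTypeInfo`, `ToyExecutionConfig` and `ToyLazyDescriptor` are hypothetical stand-ins, and the static factories replace overloaded constructors only to keep lambda arguments unambiguous.

```java
import java.util.Objects;

// Hypothetical stand-ins for TypeSerializer, TypeInformation and ExecutionConfig.
interface ToySerializer<T> {
    T copy(T from);
}

interface ToyTypeInfo<T> {
    ToySerializer<T> createSerializer(ToyExecutionConfig config);
}

final class ToyExecutionConfig {
}

final class ToyLazyDescriptor<T> {
    private ToySerializer<T> serializer; // set eagerly, or on first initialization
    private ToyTypeInfo<T> typeInfo;     // only kept until the serializer exists
    private final T defaultValue;

    private ToyLazyDescriptor(ToySerializer<T> serializer, ToyTypeInfo<T> typeInfo, T defaultValue) {
        this.serializer = serializer;
        this.typeInfo = typeInfo;
        this.defaultValue = defaultValue;
    }

    // Like the TypeSerializer constructor: the serializer exists immediately.
    static <T> ToyLazyDescriptor<T> withSerializer(ToySerializer<T> serializer, T defaultValue) {
        return new ToyLazyDescriptor<>(Objects.requireNonNull(serializer), null, defaultValue);
    }

    // Like the TypeInformation/Class constructors: the serializer is created later.
    static <T> ToyLazyDescriptor<T> withTypeInfo(ToyTypeInfo<T> typeInfo, T defaultValue) {
        return new ToyLazyDescriptor<>(null, Objects.requireNonNull(typeInfo), defaultValue);
    }

    boolean isSerializerInitialized() {
        return serializer != null;
    }

    void initializeSerializerUnlessSet(ToyExecutionConfig config) {
        if (serializer == null) {
            if (typeInfo == null) {
                throw new IllegalStateException("no serializer and no type info");
            }
            serializer = typeInfo.createSerializer(config);
            typeInfo = null; // the type info is no longer needed
        }
    }

    // Returns a copy so callers cannot mutate the shared default value.
    T getDefaultValue() {
        if (defaultValue == null) {
            return null;
        }
        if (serializer == null) {
            throw new IllegalStateException("Serializer not yet initialized.");
        }
        return serializer.copy(defaultValue);
    }
}
```

The copy matters for mutable defaults: a caller that modifies the returned object does not change what the next caller receives.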
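Because equals and hashCode only look at the concrete class and the name, two descriptors that differ in default value, serializer or TTL settings still compare equal, while a ValueStateDescriptor and a ListStateDescriptor with the same name do not. The sketch below reuses the same hashCode formula as the quoted source. `ToyNamedDescriptor`, `ToyValueDesc` and `ToyListDesc` are hypothetical classes used only for illustration.

```java
import java.util.Objects;

// Hypothetical base class reproducing StateDescriptor's equals/hashCode contract.
abstract class ToyNamedDescriptor {
    protected final String name;
    protected final Object defaultValue; // deliberately ignored by equals/hashCode

    ToyNamedDescriptor(String name, Object defaultValue) {
        this.name = Objects.requireNonNull(name, "name must not be null");
        this.defaultValue = defaultValue;
    }

    // Same formula as the quoted source: name plus concrete class.
    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    // Equal only if the concrete classes match and the names are equal.
    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return name.equals(((ToyNamedDescriptor) o).name);
    }
}

final class ToyValueDesc extends ToyNamedDescriptor {
    ToyValueDesc(String name, Object defaultValue) {
        super(name, defaultValue);
    }
}

final class ToyListDesc extends ToyNamedDescriptor {
    ToyListDesc(String name, Object defaultValue) {
        super(name, defaultValue);
    }
}
```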
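The custom writeObject/readObject pair exists because the default value's type need not implement Serializable. The field is therefore transient and is written as a presence flag, a length and the bytes produced by the descriptor's own serializer, while the serializer field itself goes through defaultWriteObject and is available again before readObject decodes the bytes. The JDK-only sketch below follows the same layout. `ToyStringCodec` and `ToySerializableDescriptor` are hypothetical, and plain `DataOutputStream`/`DataInputStream` stand in for Flink's `DataOutputViewStreamWrapper`/`DataInputViewStreamWrapper`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical serializable codec standing in for TypeSerializer<String>.
final class ToyStringCodec implements Serializable {
    private static final long serialVersionUID = 1L;

    void serialize(String value, DataOutputStream out) throws IOException {
        out.writeUTF(value);
    }

    String deserialize(DataInputStream in) throws IOException {
        return in.readUTF();
    }
}

final class ToySerializableDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    private final ToyStringCodec codec = new ToyStringCodec(); // written by defaultWriteObject
    transient String defaultValue; // skipped by default; handled below

    ToySerializableDescriptor(String name, String defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        if (defaultValue == null) {
            out.writeBoolean(false); // no default value
            return;
        }
        out.writeBoolean(true);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream view = new DataOutputStream(baos)) {
            codec.serialize(defaultValue, view);
        }
        byte[] bytes = baos.toByteArray();
        out.writeInt(bytes.length); // length prefix, then the raw bytes
        out.write(bytes);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores name and codec first
        if (in.readBoolean()) {
            byte[] buffer = new byte[in.readInt()];
            in.readFully(buffer);
            try (DataInputStream view = new DataInputStream(new ByteArrayInputStream(buffer))) {
                defaultValue = codec.deserialize(view);
            }
        } else {
            defaultValue = null;
        }
    }

    // Helper for trying it out: Java-serializes and deserializes a descriptor.
    static ToySerializableDescriptor roundTrip(ToySerializableDescriptor d) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(d);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToySerializableDescriptor) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```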

Summary

  • RuntimeContext provides one getter per state type, each taking the matching StateDescriptor: getState, getListState, getReducingState, getAggregatingState, getFoldingState (deprecated) and getMapState.
  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor; its abstract getType() method lets each subclass report its own Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING or MAP).
  • StateDescriptor overrides equals and hashCode (based only on the concrete class and the name), implements Serializable, and customizes serialization through writeObject and readObject.
