A Look at Flink’s Managed Keyed State


Preface

This article walks through the interfaces behind Flink’s Managed Keyed State.

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
  • State is the interface that every type of partitioned (keyed) state must implement. It defines a single method, clear, which removes the value mapped under the current key.

ValueState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ValueState.java

@PublicEvolving
public interface ValueState<T> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
     * this will return {@code null} when to value was previously set using {@link #update(Object)}.
     *
     * @return The state value corresponding to the current input.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    T value() throws IOException;

    /**
     * Updates the operator state accessible by {@link #value()} to the given
     * value. The next time {@link #value()} is called (for the same state
     * partition) the returned state will represent the updated value. When a
     * partitioned state is updated with null, the state for the current key
     * will be removed and the default value is returned on the next access.
     *
     * @param value The new value for the state.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    void update(T value) throws IOException;

}
  • ValueState extends the State interface and defines two methods: value, which reads the current value, and update, which replaces it. Calling update with null removes the state for the current key, so the next read returns the default value.
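
The key scoping and null handling described in the Javadoc above can be illustrated with a toy model. This sketch is not Flink code: ToyKeyedValueState and its setCurrentKey method are hypothetical stand-ins for what the runtime does when it supplies the key of the current element; only value, update and clear mirror the interface.

```java
import java.util.HashMap;
import java.util.Map;

// Toy, in-memory model of keyed ValueState semantics. NOT Flink's implementation:
// the class name and setCurrentKey are hypothetical illustrations.
final class ToyKeyedValueState<K, T> {
    private final Map<K, T> valuesByKey = new HashMap<>();
    private final T defaultValue; // may be null, like a descriptor without a default
    private K currentKey;

    ToyKeyedValueState(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    // In Flink the runtime sets the key before each element is processed.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns the value for the current key, or the default if none was set.
    T value() {
        T stored = valuesByKey.get(currentKey);
        return stored != null ? stored : defaultValue;
    }

    // Updating with null removes the entry for the current key.
    void update(T value) {
        if (value == null) {
            clear();
        } else {
            valuesByKey.put(currentKey, value);
        }
    }

    // Removes only the value mapped under the current key.
    void clear() {
        valuesByKey.remove(currentKey);
    }
}
```

With a default of 0L, updating key "a" to 5 leaves key "b" at 0, and calling update(null) under key "a" brings it back to 0 without touching "b".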

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
  • AppendingState extends the State interface and defines the get and add methods. It takes two type parameters: IN, the type of the elements added, and OUT, the type of the result returned by get.

FoldingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/FoldingState.java

@PublicEvolving
@Deprecated
public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}
  • FoldingState extends AppendingState with OUT bound to ACC, the accumulator type. FoldingState has been deprecated since Flink 1.4 and is slated for removal; AggregatingState can be used instead.
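
To see why an aggregate can stand in for a fold, note that a fold is an initial accumulator plus a step function, which maps onto "create accumulator, add, get result". The sketch below is a toy illustration under that assumption, not Flink code: ToyFoldAsAggregate is a hypothetical class, and it assumes the accumulator type is immutable so the initial value can be reused safely.

```java
import java.util.function.BiFunction;

// Toy illustration: a fold (initial accumulator + fold step) expressed in the
// shape of an aggregate (create accumulator, add, get result). NOT Flink code.
final class ToyFoldAsAggregate<T, ACC> {
    private final ACC initial;                     // assumed immutable
    private final BiFunction<ACC, T, ACC> foldStep;
    private ACC accumulator;                       // null means "empty state"

    ToyFoldAsAggregate(ACC initial, BiFunction<ACC, T, ACC> foldStep) {
        this.initial = initial;
        this.foldStep = foldStep;
    }

    // Starts from the initial accumulator on the first element, then folds.
    void add(T value) {
        if (value == null) {
            return; // adding null leaves the state unchanged
        }
        ACC base = (accumulator == null) ? initial : accumulator;
        accumulator = foldStep.apply(base, value);
    }

    // Returns the accumulated value, or null if nothing was added.
    ACC get() {
        return accumulator;
    }
}
```

For example, folding the integers 1 and 2 into an initially empty string with (acc, v) -> acc + v yields "12".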

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState extends AppendingState without adding any methods; it serves as a marker indicating that two instances of the state can be merged into one. Its subinterfaces are ListState, ReducingState and AggregatingState.

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
  • ListState extends MergingState with OUT fixed to Iterable<T>, so get returns an Iterable over the stored elements; it holds a partitioned list. It declares two additional methods. update replaces the whole list; passing null or an empty list clears the state. addAll appends incrementally; passing null or an empty list leaves the state unchanged, otherwise the given values are added to the existing ones.
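
The difference between full replacement and incremental append can be sketched with a toy model for a single key. ToyListState is a hypothetical class written for illustration, not Flink's implementation; it follows the contracts quoted in the Javadoc above, including returning null from get when the state is empty.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of ListState semantics for a single key. NOT Flink's implementation.
final class ToyListState<T> {
    private List<T> elements; // null means "empty state"

    // Empty state is reported as null, per the AppendingState contract.
    Iterable<T> get() {
        return elements;
    }

    // Appends one element; null is ignored.
    void add(T value) {
        if (value == null) {
            return;
        }
        if (elements == null) {
            elements = new ArrayList<>();
        }
        elements.add(value);
    }

    // Full replacement: null or an empty list clears the state.
    void update(List<T> values) {
        elements = (values == null || values.isEmpty()) ? null : new ArrayList<>(values);
    }

    // Incremental append: null or an empty list leaves the state unchanged.
    void addAll(List<T> values) {
        if (values == null || values.isEmpty()) {
            return;
        }
        if (elements == null) {
            elements = new ArrayList<>();
        }
        elements.addAll(values);
    }
}
```

Calling addAll with [1, 2], then add(3), leaves [1, 2, 3]; update([9]) replaces that with [9]; update(null) empties the state again.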

ReducingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ReducingState.java

@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
  • ReducingState extends MergingState with the same type T for both IN and OUT.
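
Because IN and OUT are the same type, every added element can be combined with the stored value by a binary function. The following toy model for a single key is a sketch, not Flink's implementation: ToyReducingState is hypothetical, and a plain java.util.function.BinaryOperator plays the role that a ReduceFunction plays in Flink.

```java
import java.util.function.BinaryOperator;

// Toy model of ReducingState semantics for a single key. NOT Flink's implementation.
final class ToyReducingState<T> {
    private final BinaryOperator<T> reducer; // stand-in for a reduce function
    private T current;                       // null means "empty state"

    ToyReducingState(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Returns the reduced value, or null if nothing has been added.
    T get() {
        return current;
    }

    // The first element is stored as-is; later ones are combined with it.
    void add(T value) {
        if (value == null) {
            return; // adding null leaves the state unchanged
        }
        current = (current == null) ? value : reducer.apply(current, value);
    }

    void clear() {
        current = null;
    }
}
```

With Integer::sum as the reducer, adding 1, 2 and 3 makes get return 6; the state only ever holds one value of type T.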

AggregatingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AggregatingState.java

@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
  • AggregatingState extends MergingState; unlike ReducingState, its IN and OUT types can differ.
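
A running average is a simple case where the input and output types differ: Long values go in, a Double comes out, and the state keeps an internal accumulator in between. The class below is a toy model for a single key, written as a hypothetical illustration rather than Flink's implementation.

```java
// Toy model of AggregatingState semantics for a single key, computing a running
// average. NOT Flink's implementation; the class is a hypothetical illustration.
final class ToyAverageState {
    // Internal accumulator: neither the IN type nor the OUT type.
    private long sum;
    private long count;

    // IN is Long: each added element updates the accumulator.
    void add(Long value) {
        if (value == null) {
            return; // adding null leaves the state unchanged
        }
        sum += value;
        count++;
    }

    // OUT is Double: the result is derived from the accumulator on read.
    Double get() {
        if (count == 0) {
            return null; // empty state
        }
        return (double) sum / count;
    }

    void clear() {
        sum = 0;
        count = 0;
    }
}
```

Adding 1, 2 and 6 makes get return 3.0, while an empty state returns null.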

MapState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapState.java

@PublicEvolving
public interface MapState<UK, UV> extends State {

    /**
     * Returns the current value associated with the given key.
     *
     * @param key The key of the mapping
     * @return The value of the mapping with the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    UV get(UK key) throws Exception;

    /**
     * Associates a new value with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, UV value) throws Exception;

    /**
     * Copies all of the mappings from the given map into the state.
     *
     * @param map The mappings to be stored in this state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void putAll(Map<UK, UV> map) throws Exception;

    /**
     * Deletes the mapping of the given key.
     *
     * @param key The key of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void remove(UK key) throws Exception;

    /**
     * Returns whether there exists the given mapping.
     *
     * @param key The key of the mapping
     * @return True if there exists a mapping whose key equals to the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    boolean contains(UK key) throws Exception;

    /**
     * Returns all the mappings in the state.
     *
     * @return An iterable view of all the key-value pairs in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    /**
     * Returns all the keys in the state.
     *
     * @return An iterable view of all the keys in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UK> keys() throws Exception;

    /**
     * Returns all the values in the state.
     *
     * @return An iterable view of all the values in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UV> values() throws Exception;

    /**
     * Iterates over all the mappings in the state.
     *
     * @return An iterator over all the mappings in the state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
}
  • MapState extends State directly and takes two type parameters, UK and UV, the key and value types of the user-defined map.
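
Two kinds of key are involved here: the stream key that partitions the state, and the user key (UK) inside the map. A toy model makes the nesting explicit. ToyKeyedMapState and setCurrentKey are hypothetical names used only for illustration; this is not how Flink's state backends store the data.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed MapState: one user map per stream key. NOT Flink's
// implementation; setCurrentKey stands in for the runtime supplying the key.
final class ToyKeyedMapState<K, UK, UV> {
    private final Map<K, Map<UK, UV>> mapsByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Looks up the user key within the current stream key's map.
    UV get(UK userKey) {
        Map<UK, UV> userMap = mapsByKey.get(currentKey);
        return userMap == null ? null : userMap.get(userKey);
    }

    void put(UK userKey, UV value) {
        mapsByKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    void remove(UK userKey) {
        Map<UK, UV> userMap = mapsByKey.get(currentKey);
        if (userMap != null) {
            userMap.remove(userKey);
        }
    }

    boolean contains(UK userKey) {
        Map<UK, UV> userMap = mapsByKey.get(currentKey);
        return userMap != null && userMap.containsKey(userKey);
    }

    // Removes the whole user map, but only for the current stream key.
    void clear() {
        mapsByKey.remove(currentKey);
    }
}
```

Putting "clicks" under stream key "user-1" is invisible while the current key is "user-2", and clear under "user-2" leaves the entries of "user-1" intact.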

Summary

  • Flink provides several types of Managed Keyed State: ValueState<T>, ListState<T>, ReducingState<T>, AggregatingState<IN, OUT>, FoldingState<T, ACC> and MapState<UK, UV>.
  • ValueState<T> and MapState<UK, UV> extend the State interface directly. FoldingState extends AppendingState<IN, OUT>, which itself extends State. ListState, ReducingState and AggregatingState extend MergingState<IN, OUT>, which in turn extends AppendingState.
  • FoldingState has been deprecated since Flink 1.4 and is slated for removal; AggregatingState can be used instead.
