A Look at Flink's PartitionableListState


Preface

This article takes a look at Flink's PartitionableListState.

PartitionableListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    /**
     * Implementation of operator list state.
     *
     * @param <S> the type of an operator state partition.
     */
    static final class PartitionableListState<S> implements ListState<S> {

        /**
         * Meta information of the state, including state name, assignment mode, and serializer
         */
        private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

        /**
         * The internal list the holds the elements of the state
         */
        private final ArrayList<S> internalList;

        /**
         * A serializer that allows to perform deep copies of internalList
         */
        private final ArrayListSerializer<S> internalListCopySerializer;

        PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList<S>());
        }

        private PartitionableListState(
                RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
                ArrayList<S> internalList) {

            this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {

            this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        @Override
        public void clear() {
            internalList.clear();
        }

        @Override
        public Iterable<S> get() {
            return internalList;
        }

        @Override
        public void add(S value) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            internalList.add(value);
        }

        @Override
        public String toString() {
            return "PartitionableListState{" +
                    "stateMetaInfo=" + stateMetaInfo +
                    ", internalList=" + internalList +
                    '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {

            long[] partitionOffsets = new long[internalList.size()];

            DataOutputView dov = new DataOutputViewStreamWrapper(out);

            for (int i = 0; i < internalList.size(); ++i) {
                S element = internalList.get(i);
                partitionOffsets[i] = out.getPos();
                getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
            }

            return partitionOffsets;
        }

        @Override
        public void update(List<S> values) {
            internalList.clear();

            addAll(values);
        }

        @Override
        public void addAll(List<S> values) {
            if (values != null && !values.isEmpty()) {
                internalList.addAll(values);
            }
        }
    }
  • PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. It stores the state in an ArrayList (internalList) and keeps its meta information in stateMetaInfo, a RegisteredOperatorStateBackendMetaInfo. Its write method serializes each element of internalList to the FSDataOutputStream with the partition state serializer and returns an array (partitionOffsets) holding the stream position at which each element starts.
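
The offset bookkeeping in write can be illustrated without Flink. In this minimal sketch, a java.io.DataOutputStream over a ByteArrayOutputStream stands in for FSDataOutputStream and DataOutputView, DataOutputStream.size() plays the role of out.getPos(), and writeUTF plays the role of the partition state serializer; the class name OffsetWriteSketch is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free sketch of PartitionableListState.write:
// DataOutputStream stands in for FSDataOutputStream/DataOutputView,
// and writeUTF stands in for the partition state serializer.
final class OffsetWriteSketch {

    static long[] write(List<String> internalList, DataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        for (int i = 0; i < internalList.size(); ++i) {
            // Record the position where element i starts (the role of out.getPos()).
            partitionOffsets[i] = out.size();
            // Serialize the element: a 2-byte length prefix followed by its modified UTF-8 bytes.
            out.writeUTF(internalList.get(i));
        }
        return partitionOffsets;
    }

    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        long[] offsets = write(Arrays.asList("a", "bb", "ccc"), out);
        System.out.println(Arrays.toString(offsets)); // [0, 3, 7]
        System.out.println(out.size());               // 12
    }
}
```

Because each offset marks where one element begins, a restore step can seek to individual elements instead of reading the list as one opaque blob, which is what makes it possible to split list elements across subtasks.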

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
  • ListState is the interface for partitioned list state in operations. It extends MergingState with OUT fixed to Iterable<T> and declares two methods: update replaces the whole list (if the argument is null or empty, the state is cleared), while addAll appends incrementally (if the argument is null or empty, the state is left unchanged; otherwise the given values are appended to the existing ones).
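
To make the update/addAll contract concrete, here is a minimal, Flink-free sketch. SimpleListState is a hypothetical class that copies the list logic of the PartitionableListState code shown earlier. Following that code, update(null) leaves get() returning an empty list rather than null, which is how "cleared" shows up in this implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical, Flink-free mirror of PartitionableListState's list operations.
final class SimpleListState<T> {
    private final ArrayList<T> internalList = new ArrayList<>();

    Iterable<T> get() {
        return internalList; // never null; an empty list means "no state"
    }

    void add(T value) {
        // Same null check as PartitionableListState.add.
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        internalList.add(value);
    }

    // Full replacement: clear first, then append; null or empty leaves the list empty.
    void update(List<T> values) {
        internalList.clear();
        addAll(values);
    }

    // Incremental append; null or empty is a no-op.
    void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    void clear() {
        internalList.clear();
    }

    public static void main(String[] args) {
        SimpleListState<Integer> state = new SimpleListState<>();
        state.addAll(Arrays.asList(1, 2));
        state.addAll(null);               // ignored
        state.add(3);
        System.out.println(state.get());  // [1, 2, 3]
        state.update(Arrays.asList(9));
        System.out.println(state.get());  // [9]
        state.update(null);
        System.out.println(state.get());  // []
    }
}
```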

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState declares no methods of its own; it only extends AppendingState. As its name and Javadoc indicate, it marks state that supports merging: two instances can be combined into a single one that contains the information of both.
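
Since MergingState adds no methods, merging is a capability of the state type rather than a call on the interface; in Flink the actual merge is driven by runtime-internal code (for example when windows are merged), which is not covered here. For list-shaped state, the Javadoc's "combined into a single instance" can reasonably be pictured as concatenation. The sketch below assumes exactly that; MergeSketch and merge are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the MergingState idea for list-shaped state:
// several instances are combined into one that holds the information of all of them.
// This is NOT a Flink API; Flink performs merges through runtime-internal code.
final class MergeSketch {

    // Concatenates every source list into a fresh result, preserving order.
    @SafeVarargs
    static <T> List<T> merge(List<T>... sources) {
        List<T> merged = new ArrayList<>();
        for (List<T> source : sources) {
            if (source != null) { // treat a missing (null) state as empty
                merged.addAll(source);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> windowA = Arrays.asList("e1", "e2");
        List<String> windowB = Arrays.asList("e3");
        System.out.println(merge(windowA, windowB)); // [e1, e2, e3]
    }
}
```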

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
  • AppendingState is the base interface for partitioned state that supports adding elements and inspecting the current value. It extends State and declares two methods: get returns the current value of the state, or null if the state is empty, and add adds a value to the state.
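
The Javadoc above says elements can either be kept in a buffer or aggregated into one value, and that get should return null when the state is empty (in Flink these two roles are played by ListState and by ReducingState/AggregatingState respectively). The following minimal sketch illustrates both flavours of that contract; SimpleAppendingState, BufferingState, SummingState and AppendingStateDemo are hypothetical names, not Flink classes. The null handling follows the Javadoc; PartitionableListState above is stricter on input (its add rejects null via Preconditions.checkNotNull) and returns its possibly empty list from get.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free mirror of the AppendingState contract.
interface SimpleAppendingState<IN, OUT> {
    OUT get();          // current value, or null when the state is empty
    void add(IN value); // buffer or aggregate one value
    void clear();       // declared by State in Flink
}

// List-like flavour: elements are kept in a buffer.
final class BufferingState<T> implements SimpleAppendingState<T, List<T>> {
    private final List<T> buffer = new ArrayList<>();

    public List<T> get() { return buffer.isEmpty() ? null : buffer; }

    public void add(T value) { if (value != null) buffer.add(value); } // null leaves state unchanged

    public void clear() { buffer.clear(); }
}

// Aggregating flavour: elements are folded into a single running sum.
final class SummingState implements SimpleAppendingState<Long, Long> {
    private Long sum; // null means "empty"

    public Long get() { return sum; }

    public void add(Long value) { if (value != null) sum = (sum == null ? 0L : sum) + value; }

    public void clear() { sum = null; }
}

final class AppendingStateDemo {
    public static void main(String[] args) {
        BufferingState<String> buffer = new BufferingState<>();
        SummingState sum = new SummingState();
        System.out.println(buffer.get() + " " + sum.get()); // null null
        buffer.add("a");
        buffer.add("b");
        sum.add(3L);
        sum.add(4L);
        System.out.println(buffer.get() + " " + sum.get()); // [a, b] 7
    }
}
```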

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
  • State is the interface that every kind of partitioned state must implement. It declares a single method, clear, which removes the value mapped under the current key.
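
The full chain from ListState down to State can be mirrored in a few lines and then walked with reflection, which shows at which level each method is declared. The "Mini" interfaces and HierarchyDemo below are hypothetical stand-ins for the Flink types, not the real org.apache.flink classes.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mirror of Flink's state interface chain.
interface MiniState { void clear(); }
interface MiniAppendingState<IN, OUT> extends MiniState { OUT get(); void add(IN value); }
interface MiniMergingState<IN, OUT> extends MiniAppendingState<IN, OUT> { }
interface MiniListState<T> extends MiniMergingState<T, Iterable<T>> {
    void update(List<T> values);
    void addAll(List<T> values);
}

final class HierarchyDemo {
    // Follows the (single) super-interface at each level and lists the methods declared there.
    static List<String> chain(Class<?> type) {
        List<String> levels = new ArrayList<>();
        Class<?> current = type;
        while (current != null) {
            List<String> methods = new ArrayList<>();
            for (Method m : current.getDeclaredMethods()) {
                methods.add(m.getName());
            }
            Collections.sort(methods); // reflection order is unspecified
            levels.add(current.getSimpleName() + methods);
            Class<?>[] parents = current.getInterfaces();
            current = parents.length > 0 ? parents[0] : null;
        }
        return levels;
    }

    public static void main(String[] args) {
        // [MiniListState[addAll, update], MiniMergingState[], MiniAppendingState[add, get], MiniState[clear]]
        System.out.println(chain(MiniListState.class));
    }
}
```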

RegisteredOperatorStateBackendMetaInfo

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java

/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

    /**
     * The mode how elements in this state are assigned to tasks during restore
     */
    @Nonnull
    private final OperatorStateHandle.Mode assignmentMode;

    /**
     * The type serializer for the elements in the state list
     */
    @Nonnull
    private final TypeSerializer<S> partitionStateSerializer;

    public RegisteredOperatorStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull TypeSerializer<S> partitionStateSerializer,
            @Nonnull OperatorStateHandle.Mode assignmentMode) {
        super(name);
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
        this(
            Preconditions.checkNotNull(copy).name,
            copy.partitionStateSerializer.duplicate(),
            copy.assignmentMode);
    }

    @SuppressWarnings("unchecked")
    public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
            snapshot.getName(),
            (TypeSerializer<S>) Preconditions.checkNotNull(
                snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
            OperatorStateHandle.Mode.valueOf(
                snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
        Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
    }

    /**
     * Creates a deep copy of the itself.
     */
    @Nonnull
    public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
        return new RegisteredOperatorStateBackendMetaInfo<>(this);
    }

    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    //......

    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap = Collections.singletonMap(
            StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
            assignmentMode.toString());
        String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
        Map<String, TypeSerializer<?>> serializerMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());

        return new StateMetaInfoSnapshot(
            name,
            StateMetaInfoSnapshot.BackendStateType.OPERATOR,
            optionsMap,
            serializerConfigSnapshotsMap,
            serializerMap);
    }
}
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and carries the state name, the assignment mode and the partition state serializer. It implements the abstract snapshot method by delegating to computeSnapshot, which builds the optionsMap, serializerMap and serializerConfigSnapshotsMap that StateMetaInfoSnapshot requires.
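
computeSnapshot and the snapshot-based constructor form a round trip: the assignment mode is written as a string option and the serializer under the value-serializer key, and the constructor reads them back, parsing the mode with OperatorStateHandle.Mode.valueOf. The minimal sketch below reproduces only that round trip without Flink. MetaInfoSketch, its nested Snapshot type and the state name are hypothetical; serializers are reduced to plain strings, the serializer-config-snapshot map is omitted, the nested Mode enum copies the constant names of OperatorStateHandle.Mode in 1.7, and the key strings assume Flink's option and serializer key enums use their default toString.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, Flink-free sketch of the snapshot/restore round trip of
// RegisteredOperatorStateBackendMetaInfo (serializers reduced to strings).
final class MetaInfoSketch {

    // Assumed to mirror the constants of OperatorStateHandle.Mode.
    enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

    // Assumed key strings (default enum toString of Flink's key enums).
    static final String DISTRIBUTION_MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";
    static final String VALUE_SERIALIZER_KEY = "VALUE_SERIALIZER";

    // Stand-in for StateMetaInfoSnapshot: a name plus option and serializer maps.
    static final class Snapshot {
        final String name;
        final Map<String, String> options;
        final Map<String, String> serializers;

        Snapshot(String name, Map<String, String> options, Map<String, String> serializers) {
            this.name = name;
            this.options = options;
            this.serializers = serializers;
        }
    }

    final String name;
    final String partitionStateSerializer;
    final Mode assignmentMode;

    MetaInfoSketch(String name, String partitionStateSerializer, Mode assignmentMode) {
        this.name = name;
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    // Like computeSnapshot(): one singleton map per concern.
    Snapshot snapshot() {
        return new Snapshot(
            name,
            Collections.singletonMap(DISTRIBUTION_MODE_KEY, assignmentMode.toString()),
            Collections.singletonMap(VALUE_SERIALIZER_KEY, partitionStateSerializer));
    }

    // Like the snapshot-based constructor: read the entries back, parsing the mode with valueOf.
    static MetaInfoSketch restore(Snapshot snapshot) {
        return new MetaInfoSketch(
            snapshot.name,
            snapshot.serializers.get(VALUE_SERIALIZER_KEY),
            Mode.valueOf(snapshot.options.get(DISTRIBUTION_MODE_KEY)));
    }

    public static void main(String[] args) {
        MetaInfoSketch original = new MetaInfoSketch("buffered-elements", "StringSerializer", Mode.SPLIT_DISTRIBUTE);
        MetaInfoSketch restored = restore(original.snapshot());
        System.out.println(restored.name + " " + restored.assignmentMode); // buffered-elements SPLIT_DISTRIBUTE
    }
}
```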

Summary

  • Apart from broadcast state, Flink's managed operator state is list-based. DefaultOperatorStateBackend implements ListState with PartitionableListState, which stores the state in an ArrayList (internalList) and keeps its meta information in stateMetaInfo, a RegisteredOperatorStateBackendMetaInfo.
  • PartitionableListState implements the ListState interface (including the update and addAll methods declared there). ListState extends MergingState with OUT fixed to Iterable<T>; MergingState declares no methods of its own and extends AppendingState; AppendingState extends State and declares get and add; State declares clear.
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method via computeSnapshot, which builds the optionsMap, serializerMap and serializerConfigSnapshotsMap required by StateMetaInfoSnapshot.
