Talking about Flink's OperatorStateBackend


Preface

This article takes a look at Flink's OperatorStateBackend.

OperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java

/**
 * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
 * {@link Snapshotable}
 *
 */
public interface OperatorStateBackend extends
    OperatorStateStore,
    Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
    Closeable,
    Disposable {

    @Override
    void dispose();
}
  • The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.

OperatorStateStore

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java

/**
 * This interface contains methods for registering operator state with a managed store.
 */
@PublicEvolving
public interface OperatorStateStore {

    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();

    // -------------------------------------------------------------------------------------------
    //  Deprecated methods
    // -------------------------------------------------------------------------------------------

    @Deprecated
    <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;


    @Deprecated
    <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
  • OperatorStateStore defines the getBroadcastState, getListState and getUnionListState methods for creating or restoring a BroadcastState or ListState; it also defines the getRegisteredStateNames and getRegisteredBroadcastStateNames methods, which return the names of the currently registered states.

Snapshotable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java

/**
 * Interface for operators that can perform snapshots of their state.
 *
 * @param <S> Generic type of the state object that is created as handle to snapshots.
 * @param <R> Generic type of the state object that used in restore.
 */
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {

    /**
     * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
     * handles from which the old state is read.
     *
     * @param state the old state to restore.
     */
    void restore(@Nullable R state) throws Exception;
}
  • The Snapshotable interface extends the SnapshotStrategy interface and defines the restore method for restoring state.
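To make the restore contract concrete, here is a minimal sketch built on a hypothetical SimpleSnapshotable interface and InMemoryListBackend class (neither is Flink API). It mirrors the shape Snapshotable<S, R> takes in OperatorStateBackend, where R is a collection of handles, and it assumes that a null argument to restore simply means there is nothing to restore.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's Snapshotable (not the real interface):
// snapshot() returns a copy of the state, restore(null) means there is nothing to restore.
interface SimpleSnapshotable<S, R> {
    S snapshot();

    void restore(R state); // state may be null
}

// Hypothetical backend holding a single list of strings.
class InMemoryListBackend implements SimpleSnapshotable<List<String>, Collection<List<String>>> {
    private final List<String> elements = new ArrayList<>();

    void add(String value) {
        elements.add(value);
    }

    List<String> get() {
        return Collections.unmodifiableList(elements);
    }

    @Override
    public List<String> snapshot() {
        // Copy, so that later writes do not leak into the snapshot.
        return Collections.unmodifiableList(new ArrayList<>(elements));
    }

    @Override
    public void restore(Collection<List<String>> state) {
        elements.clear();
        if (state == null) {
            return; // fresh start: there is no previous snapshot
        }
        // One instance may receive several snapshots (e.g. after a rescale); merge them.
        for (List<String> snapshot : state) {
            elements.addAll(snapshot);
        }
    }
}
```

Taking R as a collection rather than a single handle is what lets one operator instance absorb the state of several previous instances.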

SnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java

/**
 * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
 * least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints.
 *
 * @param <S> type of the returned state object that represents the result of the snapshot operation.
 */
@Internal
public interface SnapshotStrategy<S extends StateObject> {

    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
     * returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
     * the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
     * first before obtaining the handle.
     *
     * @param checkpointId      The ID of the checkpoint.
     * @param timestamp         The timestamp of the checkpoint.
     * @param streamFactory     The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
  • SnapshotStrategy defines the snapshot method, which each snapshot strategy implements in its own way; the snapshot result it returns (wrapped in a RunnableFuture) must be of a StateObject type.
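The synchronous/asynchronous split described in the snapshot Javadoc can be sketched with plain JDK types, assuming a string stands in for the written state. CopyThenWriteStrategy is hypothetical; java.util.concurrent.FutureTask is a standard RunnableFuture implementation. The copy is taken when snapshot() is called, while the "write" happens only when the returned future is run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical strategy (not Flink code) that follows the SnapshotStrategy contract with
// plain JDK types: the synchronous part runs inside snapshot(), the asynchronous part
// runs only when the returned RunnableFuture is executed.
final class CopyThenWriteStrategy {
    private CopyThenWriteStrategy() {
    }

    static RunnableFuture<String> snapshot(List<String> liveState) {
        // Synchronous part: take a private copy while the caller still owns the state.
        final List<String> copy = new ArrayList<>(liveState);
        // Asynchronous part: "write" the copy; here it is simply joined into a string.
        return new FutureTask<>(() -> String.join(",", copy));
    }
}
```

A caller would typically hand the future to an executor (for example ExecutorService.execute(future)) and call get() afterwards. When there is nothing to snapshot, DefaultOperatorStateBackend instead returns an already completed DoneFuture.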

AbstractSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java

/**
 * Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
 *
 * @param <T> type of the snapshot result.
 */
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);

    private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
    private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

    /** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
    @Nonnull
    protected final String description;

    protected AbstractSnapshotStrategy(@Nonnull String description) {
        this.description = description;
    }

    /**
     * Logs the duration of the synchronous snapshot part from the given start time.
     */
    public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    /**
     * Logs the duration of the asynchronous snapshot part from the given start time.
     */
    public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    private void logCompletedInternal(
        @Nonnull String template,
        @Nonnull Object checkpointOutDescription,
        long startTime) {

        long duration = (System.currentTimeMillis() - startTime);

        LOG.debug(
            template,
            description,
            checkpointOutDescription,
            Thread.currentThread(),
            duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}
  • AbstractSnapshotStrategy is an abstract class. It does not implement the snapshot method defined by SnapshotStrategy; it only provides the logSyncCompleted and logAsyncCompleted methods, which log the duration of the synchronous and asynchronous snapshot parts at debug level.
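The point of AbstractSnapshotStrategy is that every strategy reports its timings with the same templates. A rough sketch of that template-base-class idea, assuming hypothetical TimedStrategy and DemoStrategy classes, an injectable clock so the output is deterministic, and String.format in place of SLF4J placeholders:

```java
import java.util.function.LongSupplier;

// Hypothetical base class (not Flink code) illustrating the AbstractSnapshotStrategy idea:
// one place owns the description and the timing message format for all subclasses.
abstract class TimedStrategy {
    private static final String SYNC_TEMPLATE = "%s (%s, synchronous part) in thread %s took %d ms.";

    protected final String description;
    private final LongSupplier clock; // injectable "current time in ms", for deterministic output

    protected TimedStrategy(String description, LongSupplier clock) {
        this.description = description;
        this.clock = clock;
    }

    // Builds the message that a real implementation would pass to its logger at debug level.
    String syncCompletedMessage(Object checkpointOutDescription, long startTime) {
        long duration = clock.getAsLong() - startTime;
        return String.format(SYNC_TEMPLATE, description, checkpointOutDescription,
                Thread.currentThread().getName(), duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}

// Hypothetical concrete strategy: it only supplies its description.
class DemoStrategy extends TimedStrategy {
    DemoStrategy(LongSupplier clock) {
        super("Demo snapshot", clock);
    }
}
```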

StateObject

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java

/**
 * Base of all handles that represent checkpointed state in some form. The object may hold
 * the (small) state directly, or contain a file path (state is in the file), or contain the
 * metadata to access the state stored in some external database.
 *
 * <p>State objects define how to {@link #discardState() discard state} and how to access the
 * {@link #getStateSize() size of the state}.
 * 
 * <p>State Objects are transported via RPC between <i>JobManager</i> and
 * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
 * 
 * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
 * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
 * but through custom serializers.
 */
public interface StateObject extends Serializable {

    void discardState() throws Exception;

    long getStateSize();
}
  • StateObject extends the Serializable interface because it is transferred between the JobManager and the TaskManager via RPC. The interface defines the discardState and getStateSize methods: discardState cleans up the resources behind the state, and getStateSize returns the size of the state.
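As a sketch of what a StateObject can look like in practice, the following hypothetical FileBackedHandle (not a Flink class) points at state kept in a local file: getStateSize reports the stored size, discardState deletes the file, and a Java serialization round trip stands in for shipping the handle over RPC. The file-based layout is an assumption chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical handle (not a Flink class) that points at state stored in a local file,
// following the StateObject contract: Serializable, discardState(), getStateSize().
class FileBackedHandle implements Serializable {
    private static final long serialVersionUID = 1L;

    // Kept as a String because java.nio.file.Path is not Serializable.
    private final String filePath;
    private final long stateSize;

    FileBackedHandle(String filePath, long stateSize) {
        this.filePath = filePath;
        this.stateSize = stateSize;
    }

    // Releases the resources behind the handle: here, the file itself.
    void discardState() throws IOException {
        Files.deleteIfExists(Paths.get(filePath));
    }

    long getStateSize() {
        return stateSize;
    }

    // Simulates shipping the handle over RPC with a Java serialization round trip.
    static FileBackedHandle roundTrip(FileBackedHandle handle) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FileBackedHandle) in.readObject();
        }
    }
}
```

Note that only the small handle travels; the state data itself stays where it was written, which is why discardState on the receiving side can still clean it up.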

StreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java

/**
 * A {@link StateObject} that represents state that was written to a stream. The data can be read
 * back via {@link #openInputStream()}.
 */
public interface StreamStateHandle extends StateObject {

    /**
     * Returns an {@link FSDataInputStream} that can be used to read back the data that
     * was previously written to the stream.
     */
    FSDataInputStream openInputStream() throws IOException;
}
  • StreamStateHandle extends the StateObject interface and defines the openInputStream method, which returns an FSDataInputStream for reading back the data previously written to the stream.
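A minimal sketch of the StreamStateHandle idea, assuming a hypothetical SimpleStreamHandle interface that returns a plain java.io.InputStream instead of Flink's FSDataInputStream: the handle keeps the bytes, and every openInputStream() call returns a fresh stream positioned at the beginning.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical analogue of StreamStateHandle (not Flink API): plain InputStream
// instead of FSDataInputStream.
interface SimpleStreamHandle {
    InputStream openInputStream() throws IOException;

    long getStateSize();
}

// Hypothetical handle that keeps the written bytes in memory.
class InMemoryStreamHandle implements SimpleStreamHandle {
    private final byte[] data;

    InMemoryStreamHandle(byte[] data) {
        this.data = data.clone(); // defensive copy: the handle owns its bytes
    }

    @Override
    public InputStream openInputStream() {
        // A fresh stream per call, always positioned at the start of the data.
        return new ByteArrayInputStream(data);
    }

    @Override
    public long getStateSize() {
        return data.length;
    }

    // Reads everything behind a handle back as UTF-8 text.
    static String readAll(SimpleStreamHandle handle) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (InputStream in = handle.openInputStream()) {
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```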

OperatorStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java

/**
 * Interface of a state handle for operator state.
 */
public interface OperatorStateHandle extends StreamStateHandle {

    /**
     * Returns a map of meta data for all contained states by their name.
     */
    Map<String, StateMetaInfo> getStateNameToPartitionOffsets();

    /**
     * Returns an input stream to read the operator state information.
     */
    @Override
    FSDataInputStream openInputStream() throws IOException;

    /**
     * Returns the underlying stream state handle that points to the state data.
     */
    StreamStateHandle getDelegateStateHandle();

    //......
}
  • OperatorStateHandle extends StreamStateHandle and mainly defines the getStateNameToPartitionOffsets and getDelegateStateHandle methods; getStateNameToPartitionOffsets provides the mapping from each state name to the offsets of its available partitions.

OperatorStreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java

/**
 * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
 * map that contains the offsets to the partitions of named states in the stream.
 */
public class OperatorStreamStateHandle implements OperatorStateHandle {

    private static final long serialVersionUID = 35876522969227335L;

    /**
     * unique state name -> offsets for available partitions in the handle stream
     */
    private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
    private final StreamStateHandle delegateStateHandle;

    public OperatorStreamStateHandle(
            Map<String, StateMetaInfo> stateNameToPartitionOffsets,
            StreamStateHandle delegateStateHandle) {

        this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
        this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
    }

    @Override
    public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
        return stateNameToPartitionOffsets;
    }

    @Override
    public void discardState() throws Exception {
        delegateStateHandle.discardState();
    }

    @Override
    public long getStateSize() {
        return delegateStateHandle.getStateSize();
    }

    @Override
    public FSDataInputStream openInputStream() throws IOException {
        return delegateStateHandle.openInputStream();
    }

    @Override
    public StreamStateHandle getDelegateStateHandle() {
        return delegateStateHandle;
    }

    //......
}
  • OperatorStreamStateHandle implements the OperatorStateHandle interface. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets returns, and a delegateStateHandle, to which discardState, getStateSize and openInputStream are forwarded.
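The combination of one delegate stream and a name-to-offsets map can be sketched as follows. OffsetIndexedStream is hypothetical: a byte array stands in for the delegate StreamStateHandle, a Map<String, long[]> stands in for stateNameToPartitionOffsets (without the Mode that StateMetaInfo also carries), and DataOutputStream.writeUTF stands in for the element serializer. As in the snapshot code of DefaultOperatorStateBackend, one offset is recorded per list element.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink code) of a delegate stream plus a name -> offsets index.
class OffsetIndexedStream {
    final byte[] delegate;                   // stands in for the delegate StreamStateHandle
    final Map<String, long[]> nameToOffsets; // stands in for stateNameToPartitionOffsets

    private OffsetIndexedStream(byte[] delegate, Map<String, long[]> nameToOffsets) {
        this.delegate = delegate;
        this.nameToOffsets = nameToOffsets;
    }

    // Writes all named list states into one stream, recording where each element starts.
    static OffsetIndexedStream write(Map<String, List<String>> states) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Map<String, long[]> offsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : states.entrySet()) {
            List<String> elements = entry.getValue();
            long[] partitionOffsets = new long[elements.size()];
            for (int i = 0; i < elements.size(); i++) {
                partitionOffsets[i] = out.size(); // bytes written so far = start of element i
                out.writeUTF(elements.get(i));
            }
            offsets.put(entry.getKey(), partitionOffsets);
        }
        out.flush();
        return new OffsetIndexedStream(bytes.toByteArray(), offsets);
    }

    // Reads one named state back by jumping straight to each recorded offset.
    List<String> read(String stateName) throws IOException {
        long[] offsets = nameToOffsets.get(stateName);
        if (offsets == null) {
            throw new IllegalArgumentException("Unknown state: " + stateName);
        }
        List<String> result = new ArrayList<>(offsets.length);
        for (long offset : offsets) {
            int start = (int) offset;
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(delegate, start, delegate.length - start));
            result.add(in.readUTF());
        }
        return result;
    }
}
```

Per-element offsets are what make partitionable state splittable: a restoring instance can read just the elements assigned to it without deserializing the whole stream.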

SnapshotResult

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java

/**
 * This class contains the combined results from the snapshot of a state backend:
 * <ul>
 *   <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
 *   <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
 * </ul>
 *
 * Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local
 * state object that is not null also requires a state to report to the job manager that is not null, because the
 * Job Manager always owns the ground truth about the checkpointed state.
 */
public class SnapshotResult<T extends StateObject> implements StateObject {

    private static final long serialVersionUID = 1L;

    /** An singleton instance to represent an empty snapshot result. */
    private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null);

    /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
    private final T jobManagerOwnedSnapshot;

    /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
    private final T taskLocalSnapshot;

    /**
     * Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the
     * jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
     *
     * @param jobManagerOwnedSnapshot Snapshot for report to job manager. Can be null.
     * @param taskLocalSnapshot Snapshot for report to local state manager. This is optional and requires
     *                             jobManagerOwnedSnapshot to be not null if this is not also null.
     */
    private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {

        if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }

        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    public T getJobManagerOwnedSnapshot() {
        return jobManagerOwnedSnapshot;
    }

    public T getTaskLocalSnapshot() {
        return taskLocalSnapshot;
    }

    @Override
    public void discardState() throws Exception {

        Exception aggregatedExceptions = null;

        if (jobManagerOwnedSnapshot != null) {
            try {
                jobManagerOwnedSnapshot.discardState();
            } catch (Exception remoteDiscardEx) {
                aggregatedExceptions = remoteDiscardEx;
            }
        }

        if (taskLocalSnapshot != null) {
            try {
                taskLocalSnapshot.discardState();
            } catch (Exception localDiscardEx) {
                aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
            }
        }

        if (aggregatedExceptions != null) {
            throw aggregatedExceptions;
        }
    }

    @Override
    public long getStateSize() {
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }

    @SuppressWarnings("unchecked")
    public static <T extends StateObject> SnapshotResult<T> empty() {
        return (SnapshotResult<T>) EMPTY;
    }

    public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
        return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty();
    }

    public static <T extends StateObject> SnapshotResult<T> withLocalState(
        @Nonnull T jobManagerState,
        @Nonnull T localState) {
        return new SnapshotResult<>(jobManagerState, localState);
    }
}
  • The SnapshotResult class implements the StateObject interface and wraps the two parts of a snapshot result, jobManagerOwnedSnapshot and taskLocalSnapshot; a non-null taskLocalSnapshot requires a non-null jobManagerOwnedSnapshot. Its discardState method calls discardState on both jobManagerOwnedSnapshot and taskLocalSnapshot (when they are not null), and its getStateSize method returns the state size of jobManagerOwnedSnapshot.
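Two details of SnapshotResult are worth seeing in isolation: the constructor invariant, and the fact that discardState attempts both parts even if the first one fails, keeping the first exception and attaching the later one as suppressed (the behavior of ExceptionUtils.firstOrSuppressed). A sketch of both, assuming a hypothetical Discardable interface and TwoPartResult class in place of StateObject and SnapshotResult:

```java
// Hypothetical cleanup hook; stands in for StateObject.discardState().
interface Discardable {
    void discard() throws Exception;
}

// Hypothetical two-part result mirroring SnapshotResult's invariant and discard logic.
final class TwoPartResult {
    private final Discardable remote; // stands in for jobManagerOwnedSnapshot, may be null
    private final Discardable local;  // stands in for taskLocalSnapshot, may be null

    TwoPartResult(Discardable remote, Discardable local) {
        if (remote == null && local != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }
        this.remote = remote;
        this.local = local;
    }

    void discard() throws Exception {
        Exception aggregated = null;
        if (remote != null) {
            try {
                remote.discard();
            } catch (Exception e) {
                aggregated = e;
            }
        }
        if (local != null) {
            try {
                local.discard();
            } catch (Exception e) {
                // Same idea as ExceptionUtils.firstOrSuppressed: keep the first exception
                // and attach later ones to it as suppressed exceptions.
                if (aggregated == null) {
                    aggregated = e;
                } else {
                    aggregated.addSuppressed(e);
                }
            }
        }
        if (aggregated != null) {
            throw aggregated;
        }
    }
}
```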

DefaultOperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

/**
 * Default implementation of OperatorStateStore that provides the ability to make snapshots.
 */
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /**
     * The default namespace for state in cases where no state name is provided
     */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /**
     * Map for all registered operator states. Maps state name -> state
     */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /**
     * Map for all registered operator broadcast states. Maps state name -> state
     */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /**
     * CloseableRegistry to participate in the tasks lifecycle.
     */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /**
     * Default serializer. Only used for the default operator state.
     */
    private final JavaSerializer<Serializable> javaSerializer;

    /**
     * The user code classloader.
     */
    private final ClassLoader userClassloader;

    /**
     * The execution configuration.
     */
    private final ExecutionConfig executionConfig;

    /**
     * Flag to de/activate asynchronous snapshots.
     */
    private final boolean asynchronousSnapshots;

    /**
     * Map of state names to their corresponding restored state meta info.
     *
     * <p>TODO this map can be removed when eager-state registration is in place.
     * TODO we currently need this cached to check state migration strategies when new serializers are registered.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;

    /**
     * Map of state names to their corresponding restored broadcast state meta info.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;

    /**
     * Cache of already accessed states.
     *
     * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
     * with restored state, this map is always empty at the beginning.
     *
     * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
     *
     * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
     */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

    public DefaultOperatorStateBackend(
        ClassLoader userClassLoader,
        ExecutionConfig executionConfig,
        boolean asynchronousSnapshots) {

        this.closeStreamOnCancelRegistry = new CloseableRegistry();
        this.userClassloader = Preconditions.checkNotNull(userClassLoader);
        this.executionConfig = executionConfig;
        this.javaSerializer = new JavaSerializer<>();
        this.registeredOperatorStates = new HashMap<>();
        this.registeredBroadcastStates = new HashMap<>();
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.accessedStatesByName = new HashMap<>();
        this.accessedBroadcastStatesByName = new HashMap<>();
        this.restoredOperatorStateMetaInfos = new HashMap<>();
        this.restoredBroadcastStateMetaInfos = new HashMap<>();
        this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
    }

    @Override
    public Set<String> getRegisteredStateNames() {
        return registeredOperatorStates.keySet();
    }

    @Override
    public Set<String> getRegisteredBroadcastStateNames() {
        return registeredBroadcastStates.keySet();
    }

    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly(closeStreamOnCancelRegistry);
        registeredOperatorStates.clear();
        registeredBroadcastStates.clear();
    }

    // -------------------------------------------------------------------------------------------
    //  State access methods
    // -------------------------------------------------------------------------------------------

    @SuppressWarnings("unchecked")
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        //......
    }

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        long syncStartTime = System.currentTimeMillis();

        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }

    //......
}
  • DefaultOperatorStateBackend implements the OperatorStateBackend interface.
  • The getRegisteredStateNames method returns registeredOperatorStates.keySet(), and the getRegisteredBroadcastStateNames method returns registeredBroadcastStates.keySet(); both are backed by in-memory HashMaps.
  • The close method mainly calls closeStreamOnCancelRegistry.close(); the dispose method also closes closeStreamOnCancelRegistry (quietly, via IOUtils.closeQuietly) and clears registeredOperatorStates and registeredBroadcastStates.
  • The getListState and getUnionListState methods both delegate to the private getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) method, passing OperatorStateHandle.Mode.SPLIT_DISTRIBUTE and OperatorStateHandle.Mode.UNION respectively.
  • The snapshot method delegates to snapshotStrategy, which is a DefaultOperatorStateBackendSnapshotStrategy, and logs the duration of the synchronous part via logSyncCompleted.
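The two modes passed by getListState and getUnionListState decide how operator list state is redistributed on restore. Per the Flink documentation, with even-split redistribution the lists of all subtasks are concatenated and divided evenly among the new parallel instances, while with union redistribution every instance receives the complete list. The sketch below models only that behavior; RedistributionMode and Redistribution are hypothetical, and cutting the concatenated list into contiguous chunks is a simplification, not Flink's actual repartitioner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for OperatorStateHandle.Mode (only the two list-state modes).
enum RedistributionMode { SPLIT_DISTRIBUTE, UNION }

// Hypothetical helper modelling the documented redistribution semantics; not Flink's repartitioner.
final class Redistribution {
    private Redistribution() {
    }

    static <T> List<List<T>> redistribute(
            List<List<T>> oldSubtaskStates, int newParallelism, RedistributionMode mode) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        // Logically, the state is the concatenation of all old subtasks' lists.
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = new ArrayList<>(newParallelism);
        if (mode == RedistributionMode.UNION) {
            // UNION: every new subtask receives the complete list.
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>(all));
            }
            return result;
        }
        // SPLIT_DISTRIBUTE: contiguous chunks whose sizes differ by at most one element.
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}
```

For example, two old subtasks holding [1, 2, 3] and [4, 5] rescaled to three instances yield [1, 2], [3, 4] and [5] under SPLIT_DISTRIBUTE, whereas under UNION each instance gets [1, 2, 3, 4, 5].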

DefaultOperatorStateBackend.getListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    private <S> ListState<S> getListState(
            ListStateDescriptor<S> stateDescriptor,
            OperatorStateHandle.Mode mode) throws StateMigrationException {

        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
        if (previous != null) {
            checkStateNameAndMode(
                    previous.getStateMetaInfo().getName(),
                    name,
                    previous.getStateMetaInfo().getAssignmentMode(),
                    mode);
            return previous;
        }

        // end up here if its the first time access after execution for the
        // provided state name; check compatibility of restored state, if any
        // TODO with eager registration in place, these checks should be moved to restore()

        stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

        if (null == partitionableListState) {
            // no restored state for the state name; simply create new state holder

            partitionableListState = new PartitionableListState<>(
                new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));

            registeredOperatorStates.put(name, partitionableListState);
        } else {
            // has restored state; check compatibility of new state access

            checkStateNameAndMode(
                    partitionableListState.getStateMetaInfo().getName(),
                    name,
                    partitionableListState.getStateMetaInfo().getAssignmentMode(),
                    mode);

            StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
            RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
                new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

            // check compatibility to determine if state migration is required
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

            @SuppressWarnings("unchecked")
            TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
                (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

            TypeSerializerSchemaCompatibility<S> stateCompatibility =
                stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
            }

            partitionableListState.setStateMetaInfo(
                new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
        }

        accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }
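  • getListState first looks the name up in accessedStatesByName; if the state has already been accessed, it checks the state name and assignment mode via checkStateNameAndMode and returns the cached state. Otherwise it initializes the descriptor's serializer and looks the name up in registeredOperatorStates. If no state is registered under that name (nothing was restored), it creates a new PartitionableListState and registers it; if a restored state exists, it checks the name and mode, resolves the compatibility of the new serializer against the restored serializer snapshot (throwing a StateMigrationException if they are incompatible), and then sets a new RegisteredOperatorStateBackendMetaInfo on the partitionableListState. Finally, the state is put into accessedStatesByName and returned.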
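The lookup order above can be modeled in a few lines. This is a simplified sketch under stated assumptions: MiniOperatorBackend, StateHolder and Mode are hypothetical; serializer compatibility is reduced to a made-up rule on an integer schema version (a version lower than the restored one counts as incompatible) instead of resolveSchemaCompatibility; IllegalStateException replaces the checked StateMigrationException; and the mode check assumes checkStateNameAndMode rejects a mode that differs from the registered one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OperatorStateHandle.Mode.
enum Mode { SPLIT_DISTRIBUTE, UNION }

// Hypothetical stand-in for PartitionableListState plus its meta info.
final class StateHolder {
    final String name;
    final Mode mode;
    int schemaVersion; // made-up proxy for "which serializer the state was written with"

    StateHolder(String name, Mode mode, int schemaVersion) {
        this.name = name;
        this.mode = mode;
        this.schemaVersion = schemaVersion;
    }
}

// Hypothetical, heavily simplified model of the lookup order in getListState.
final class MiniOperatorBackend {
    private final Map<String, StateHolder> registered = new HashMap<>(); // like registeredOperatorStates
    private final Map<String, StateHolder> accessed = new HashMap<>();   // like accessedStatesByName

    // Stands in for restore(): pre-populates the registered states.
    void addRestoredState(String name, Mode mode, int schemaVersion) {
        registered.put(name, new StateHolder(name, mode, schemaVersion));
    }

    StateHolder getListState(String name, Mode mode, int schemaVersion) {
        StateHolder previous = accessed.get(name);
        if (previous != null) {
            checkMode(previous, mode); // already accessed: only the mode is re-checked
            return previous;
        }
        StateHolder holder = registered.get(name);
        if (holder == null) {
            holder = new StateHolder(name, mode, schemaVersion); // nothing restored: create and register
            registered.put(name, holder);
        } else {
            checkMode(holder, mode); // restored: check mode, then "serializer" compatibility
            if (schemaVersion < holder.schemaVersion) {
                throw new IllegalStateException("The new state serializer for operator state must not be incompatible.");
            }
            holder.schemaVersion = schemaVersion; // analogous to setStateMetaInfo(...)
        }
        accessed.put(name, holder);
        return holder;
    }

    private static void checkMode(StateHolder holder, Mode requested) {
        if (holder.mode != requested) {
            throw new IllegalStateException("State '" + holder.name + "' uses mode " + holder.mode
                    + " but was requested with " + requested);
        }
    }
}
```

The accessed map acts as a per-execution cache: only the first access to a name pays for the compatibility check, and later calls just confirm the mode.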

DefaultOperatorStateBackendSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    /**
     * Snapshot strategy for this backend.
     */
    private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {

        protected DefaultOperatorStateBackendSnapshotStrategy() {
            super("DefaultOperatorStateBackend snapshot");
        }

        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

            if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
                return DoneFuture.of(SnapshotResult.empty());
            }

            final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
                new HashMap<>(registeredOperatorStates.size());
            final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
                new HashMap<>(registeredBroadcastStates.size());

            ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(userClassloader);
            try {
                // eagerly create deep copies of the list and the broadcast states (if any)
                // in the synchronous phase, so that we can use them in the async writing.

                if (!registeredOperatorStates.isEmpty()) {
                    for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                        PartitionableListState<?> listState = entry.getValue();
                        if (null != listState) {
                            listState = listState.deepCopy();
                        }
                        registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                    }
                }

                if (!registeredBroadcastStates.isEmpty()) {
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                        BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                        if (null != broadcastState) {
                            broadcastState = broadcastState.deepCopy();
                        }
                        registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                    }
                }
            } finally {
                Thread.currentThread().setContextClassLoader(snapshotClassLoader);
            }

            AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
                new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                    @Override
                    protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

                        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                            streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                        registerCloseableForCancellation(localOut);

                        // get the registered operator state infos ...
                        List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                            new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {
                            operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... get the registered broadcast operator state infos ...
                        List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                            new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {
                            broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... write them all in the checkpoint stream ...
                        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                        OperatorBackendSerializationProxy backendSerializationProxy =
                            new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                        backendSerializationProxy.write(dov);

                        // ... and then go for the states ...

                        // we put BOTH normal and broadcast state metadata here
                        int initialMapCapacity =
                            registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                        final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                            new HashMap<>(initialMapCapacity);

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {

                            PartitionableListState<?> value = entry.getValue();
                            long[] partitionOffsets = value.write(localOut);
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and the broadcast states themselves ...
                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {

                            BackendWritableBroadcastState<?, ?> value = entry.getValue();
                            long[] partitionOffsets = {value.write(localOut)};
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and, finally, create the state handle.
                        OperatorStateHandle retValue = null;

                        if (unregisterCloseableFromCancellation(localOut)) {

                            StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                            if (stateHandle != null) {
                                retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                            }

                            return SnapshotResult.of(retValue);
                        } else {
                            throw new IOException("Stream was already unregistered.");
                        }
                    }

                    @Override
                    protected void cleanupProvidedResources() {
                        // nothing to do
                    }

                    @Override
                    protected void logAsyncSnapshotComplete(long startTime) {
                        if (asynchronousSnapshots) {
                            logAsyncCompleted(streamFactory, startTime);
                        }
                    }
                };

            final FutureTask<SnapshotResult<OperatorStateHandle>> task =
                snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

            if (!asynchronousSnapshots) {
                task.run();
            }

            return task;
        }
    }
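
The loop above writes every registered state into one shared output stream and records where each piece starts. Below is a minimal, self-contained Java sketch of that idea. It is not Flink code, and it rests on these assumptions:

- SketchMetaInfo, SketchHandle and SnapshotWriterSketch are hypothetical names.
- Strings stand in for serialized state elements.
- The assignment mode is omitted.
- One offset is recorded per list element, as the long[] returned by PartitionableListState.write suggests.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OperatorStateHandle.StateMetaInfo (the assignment mode is omitted).
final class SketchMetaInfo {
    final long[] offsets;

    SketchMetaInfo(long[] offsets) {
        this.offsets = offsets;
    }
}

// Hypothetical stand-in for OperatorStreamStateHandle: one byte stream plus state name -> offsets.
final class SketchHandle {
    final byte[] bytes;
    final Map<String, SketchMetaInfo> stateNameToPartitionOffsets;

    SketchHandle(byte[] bytes, Map<String, SketchMetaInfo> stateNameToPartitionOffsets) {
        this.bytes = bytes;
        this.stateNameToPartitionOffsets = stateNameToPartitionOffsets;
    }
}

final class SnapshotWriterSketch {
    // Writes all list states into ONE stream and records where each element starts.
    static SketchHandle snapshot(Map<String, List<String>> states) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Map<String, SketchMetaInfo> writtenStatesMetaData = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, List<String>> entry : states.entrySet()) {
                long[] partitionOffsets = new long[entry.getValue().size()];
                int i = 0;
                for (String element : entry.getValue()) {
                    partitionOffsets[i++] = out.size(); // stream position before this element
                    out.writeUTF(element);
                }
                writtenStatesMetaData.put(entry.getKey(), new SketchMetaInfo(partitionOffsets));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new SketchHandle(bytes.toByteArray(), writtenStatesMetaData);
    }

    // Restores one state by seeking to each recorded offset and reading a single element.
    static List<String> restore(SketchHandle handle, String stateName) {
        List<String> result = new ArrayList<>();
        try {
            for (long offset : handle.stateNameToPartitionOffsets.get(stateName).offsets) {
                int start = (int) offset;
                DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(handle.bytes, start, handle.bytes.length - start));
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

Each element has its own offset, so a restore step can read back or redistribute individual elements without deserializing the whole stream. For example, states a = [x, yy] and b = [zzz] produce offsets [0, 3] for a and [7] for b. The arithmetic: writeUTF emits a 2-byte length prefix before each string, so "x" takes 3 bytes and "yy" takes 4.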
  • DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method mainly creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies. It then delegates the actual writing to an AsyncSnapshotCallable.
  • The AsyncSnapshotCallable abstract class implements the call method of the Callable interface. call invokes callInternal and then executes logAsyncSnapshotComplete.
  • AsyncSnapshotCallable’s callInternal method returns SnapshotResult<OperatorStateHandle> in four steps:
    1. It writes the state meta info through an OperatorBackendSerializationProxy.
    2. It writes the data of registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies into a CheckpointStateOutputStream created by the CheckpointStreamFactory (for example, MemCheckpointStreamFactory).
    3. It records each state’s partition offsets and assignment mode in writtenStatesMetaData.
    4. It builds the returned OperatorStreamStateHandle from writtenStatesMetaData and the StreamStateHandle returned by the CheckpointStateOutputStream’s closeAndGetHandle method.
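
The call/callInternal split described above is a template method. In addition, the end of snapshot() wraps the callable in a FutureTask and runs it inline when asynchronousSnapshots is false. A simplified Java sketch of both ideas follows, with these caveats:

- SketchAsyncSnapshotCallable and toFutureTask are hypothetical names.
- The real AsyncSnapshotCallable also handles cancellation and closing of the snapshot streams, which this sketch omits.
- Calling cleanupProvidedResources from a finally block is an assumption of the sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical, simplified analogue of Flink's AsyncSnapshotCallable (not the real class).
abstract class SketchAsyncSnapshotCallable<T> implements Callable<T> {

    @Override
    public final T call() throws Exception {
        long startTime = System.currentTimeMillis();
        try {
            T result = callInternal();            // the actual snapshot work
            logAsyncSnapshotComplete(startTime);  // only reached if callInternal succeeded
            return result;
        } finally {
            cleanupProvidedResources();           // assumption: always run after the attempt
        }
    }

    protected abstract T callInternal() throws Exception;

    protected abstract void cleanupProvidedResources();

    protected abstract void logAsyncSnapshotComplete(long startTime);

    // Mirrors the end of snapshot(): wrap in a FutureTask, run it inline if snapshots are synchronous.
    FutureTask<T> toFutureTask(boolean asynchronousSnapshots) {
        FutureTask<T> task = new FutureTask<>(this);
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }
}
```

The two modes behave differently:

- With toFutureTask(false), the task has already completed by the time it is returned, matching the synchronous branch of snapshot().
- With toFutureTask(true), the caller must run the task later, for example on another thread.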

Summary

  • The OperatorStateBackend interface inherits the OperatorStateStore, Snapshotable, Closeable, Disposable interfaces.
  • OperatorStateStore defines the getBroadcastState, getListState and getUnionListState methods for creating or restoring BroadcastState or ListState. It also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states. DefaultOperatorStateBackend implements the OperatorStateStore interface as follows:
    - getRegisteredStateNames returns registeredOperatorStates.keySet().
    - getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(). Both registeredOperatorStates and registeredBroadcastStates are in-memory Maps.
    - getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode).
  • The Snapshotable interface extends the SnapshotStrategy interface and defines the restore method used to restore state. SnapshotStrategy defines the snapshot method, which the different snapshot strategies implement, and requires the snapshot result type to be a StateObject. AbstractSnapshotStrategy is an abstract class that does not implement the snapshot method defined by SnapshotStrategy. It only provides logging helpers such as logSyncCompleted for printing debug information.
  • DefaultOperatorStateBackend implements the Snapshotable interface. Its snapshot method uses DefaultOperatorStateBackendSnapshotStrategy, which extends AbstractSnapshotStrategy. The strategy's snapshot method mainly creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies and then hands the actual work to an AsyncSnapshotCallable. The callable:
    - writes the data of both deep copies into a CheckpointStateOutputStream created by the CheckpointStreamFactory (for example, MemCheckpointStreamFactory);
    - records the resulting offsets in writtenStatesMetaData;
    - returns an OperatorStreamStateHandle built from writtenStatesMetaData and the stream's handle.
  • The Snapshotable interface requires its snapshot type parameter S to be a StateObject. StateObject extends the Serializable interface because state objects are transferred between JobManager and TaskManager via RPC. When OperatorStateBackend extends the Snapshotable interface, S is specified as SnapshotResult<OperatorStateHandle> and the restore type R as Collection<OperatorStateHandle>.
  • The handle types build on each other:
    - StreamStateHandle extends the StateObject interface and defines the openInputStream method.
    - OperatorStateHandle extends StreamStateHandle and mainly defines the getStateNameToPartitionOffsets and getDelegateStateHandle methods. getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions.
    - OperatorStreamStateHandle implements the OperatorStateHandle interface and holds the stateNameToPartitionOffsets field (Map<String, StateMetaInfo>). Its getStateNameToPartitionOffsets method returns that field.
  • The SnapshotResult class implements the StateObject interface and wraps the snapshot results, namely jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState method calls the discardState methods of both jobManagerOwnedSnapshot and taskLocalSnapshot. Its getStateSize method returns the state size of jobManagerOwnedSnapshot.
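
DefaultOperatorStateBackend keeps two in-memory registries, and getListState and getUnionListState share one code path that differs only in the assignment mode. The simplified sketch below illustrates that structure under these assumptions:

- All class names are hypothetical.
- SketchMode mirrors the SPLIT_DISTRIBUTE, UNION and BROADCAST constants of OperatorStateHandle.Mode.
- Rejecting a re-registration under a different mode is an assumption of this sketch, not something stated above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for OperatorStateHandle.Mode.
enum SketchMode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

// Hypothetical in-memory list state that remembers its assignment mode.
final class SketchListState<S> {
    final SketchMode mode;
    final List<S> elements = new ArrayList<>();

    SketchListState(SketchMode mode) {
        this.mode = mode;
    }
}

// Hypothetical, simplified registry modeled on the behavior summarized above.
final class SketchOperatorStateRegistry {
    // Both registries are plain in-memory maps keyed by state name.
    private final Map<String, SketchListState<?>> registeredOperatorStates = new HashMap<>();
    private final Map<String, Object> registeredBroadcastStates = new HashMap<>();

    <S> SketchListState<S> getListState(String name) {
        return getListState(name, SketchMode.SPLIT_DISTRIBUTE);
    }

    <S> SketchListState<S> getUnionListState(String name) {
        return getListState(name, SketchMode.UNION);
    }

    Set<String> getRegisteredStateNames() {
        return registeredOperatorStates.keySet();
    }

    Set<String> getRegisteredBroadcastStateNames() {
        return registeredBroadcastStates.keySet();
    }

    // Shared path: create the state on first access, reuse it afterwards (mode check is an assumption).
    @SuppressWarnings("unchecked")
    private <S> SketchListState<S> getListState(String name, SketchMode mode) {
        SketchListState<?> existing = registeredOperatorStates.get(name);
        if (existing == null) {
            SketchListState<S> created = new SketchListState<>(mode);
            registeredOperatorStates.put(name, created);
            return created;
        }
        if (existing.mode != mode) {
            throw new IllegalStateException("State " + name + " was registered with mode " + existing.mode);
        }
        return (SketchListState<S>) existing;
    }
}
```

Routing both public methods through one private method keeps creation and lookup in a single place, so the mode is the only thing that differs between split-distributed and union list state.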

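SnapshotResult wraps a JobManager-owned and a task-local snapshot. Discarding it discards both, and its size is the JobManager-owned size. The simplified Java sketch below shows that wrapper pattern, with these caveats:

- SketchStateObject and SketchSnapshotResult are hypothetical names.
- Tolerating null snapshots and collecting a second discard failure as a suppressed exception are assumptions of this sketch.

```java
import java.io.Serializable;

// Hypothetical, simplified analogue of Flink's StateObject: serializable because state handles
// travel between JobManager and TaskManager.
interface SketchStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

// Hypothetical, simplified analogue of SnapshotResult<T>.
final class SketchSnapshotResult<T extends SketchStateObject> implements SketchStateObject {
    private static final long serialVersionUID = 1L;

    private final T jobManagerOwnedSnapshot;
    private final T taskLocalSnapshot; // null when there is no task-local copy

    SketchSnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    // Discards both copies; a failure in the first does not prevent discarding the second.
    @Override
    public void discardState() throws Exception {
        Exception failure = null;
        if (jobManagerOwnedSnapshot != null) {
            try {
                jobManagerOwnedSnapshot.discardState();
            } catch (Exception e) {
                failure = e;
            }
        }
        if (taskLocalSnapshot != null) {
            try {
                taskLocalSnapshot.discardState();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    // Only the JobManager-owned copy counts toward the reported state size.
    @Override
    public long getStateSize() {
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }
}
```

Only the JobManager-owned snapshot is counted in getStateSize. The task-local copy is an extra replica kept on the TaskManager, so counting it would overstate the checkpoint size.
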
doc