A Look at Flink's MemoryStateBackend


Preface

This article looks at Flink's MemoryStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    // ------------------------------------------------------------------------
    //  Checkpoint storage - the durable persistence of checkpoint data
    // ------------------------------------------------------------------------

    /**
     * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location
     * supports reading the checkpoint metadata, or disposing the checkpoint storage location.
     *
     * <p>If the state backend cannot understand the format of the pointer (for example because it
     * was created by a different state backend) this method should throw an {@code IOException}.
     *
     * @param externalPointer The external checkpoint pointer to resolve.
     * @return The checkpoint location handle.
     *
     * @throws IOException Thrown, if the state backend does not understand the pointer, or if
     *                     the pointer could not be resolved due to an I/O error.
     */
    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    /**
     * Creates a storage for checkpoints for the given job. The checkpoint storage is
     * used to write checkpoint data and metadata.
     *
     * @param jobId The job to store checkpoint data for.
     * @return A checkpoint storage for the given job.
     *
     * @throws IOException Thrown if the checkpoint storage cannot be initialized.
     */
    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // ------------------------------------------------------------------------
    //  Structure Backends 
    // ------------------------------------------------------------------------

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it. Uses default TTL time provider.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            TtlTimeProvider.DEFAULT
        );
    }

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider
    ) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            ttlTimeProvider,
            new UnregisteredMetricsGroup());
    }

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) throws Exception;
    
    /**
     * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
     *
     * <p>Operator state is state that is associated with parallel operator (or function) instances,
     * rather than with keys.
     *
     * @param env The runtime environment of the executing task.
     * @param operatorIdentifier The identifier of the operator whose state should be stored.
     *
     * @return The OperatorStateBackend for operator identified by the job and operator identifier.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed
  • Flink currently ships with three implementations: MemoryStateBackend, FsStateBackend and RocksDBStateBackend. If nothing is configured, MemoryStateBackend is the default. A global default can be set in flink-conf.yaml, and each job can override it through StreamExecutionEnvironment.setStateBackend.
  • MemoryStateBackend accepts a maximum state size in its constructor. The default is 5 MB; it can be increased, but not beyond the Akka frame size. FsStateBackend keeps the TaskManager's working state in memory but writes checkpoint state to a file system (for example HDFS). RocksDBStateBackend keeps the working state in RocksDB and writes checkpoint state to a file system.
  • The StateBackend interface declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend and createOperatorStateBackend methods and extends Serializable. Implementations of StateBackend are required to be thread-safe.
  • AbstractStateBackend is the abstract class that implements StateBackend directly. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend both extend AbstractFileStateBackend.
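
The lookup order described in the bullets above (per-job setting first, then the global default, then MemoryStateBackend) can be sketched roughly as follows. This is a simplified model and not Flink's loader code: the class and method names are hypothetical, and the key `state.backend` with the shortcut names `jobmanager`, `filesystem` and `rocksdb` is given as I recall it from the Flink configuration documentation, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the lookup order; hypothetical names, not Flink's actual loader. */
final class StateBackendResolutionSketch {

    /** Assumed flink-conf.yaml key that selects the global default backend. */
    static final String STATE_BACKEND_KEY = "state.backend";

    /**
     * @param jobLevelBackend backend chosen inside the job, or null if the job set none
     * @param clusterConfig   key/value pairs as they would appear in flink-conf.yaml
     * @return the shortcut name of the backend that would be used
     */
    static String resolve(String jobLevelBackend, Map<String, String> clusterConfig) {
        if (jobLevelBackend != null) {
            return jobLevelBackend;        // 1. the per-job setting wins
        }
        String configured = clusterConfig.get(STATE_BACKEND_KEY);
        if (configured != null) {
            return configured;             // 2. global default from the config file
        }
        return "jobmanager";               // 3. fallback, standing in for MemoryStateBackend
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolve(null, conf));       // prints "jobmanager"
        conf.put(STATE_BACKEND_KEY, "filesystem");
        System.out.println(resolve(null, conf));       // prints "filesystem"
        System.out.println(resolve("rocksdb", conf));  // prints "rocksdb"
    }
}
```

In a real job the first branch corresponds to calling StreamExecutionEnvironment.setStateBackend with a backend instance such as new MemoryStateBackend().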

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

    private static final long serialVersionUID = 4620415814639230247L;

    // ------------------------------------------------------------------------
    //  State Backend - State-Holding Backends
    // ------------------------------------------------------------------------

    @Override
    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) throws IOException;

    @Override
    public abstract OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception;
}
  • AbstractStateBackend declares that it implements the StateBackend and Serializable interfaces and redeclares createKeyedStateBackend and createOperatorStateBackend as abstract; it adds nothing else.

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    //  State Backend Properties
    // ------------------------------------------------------------------------

    /** The path where checkpoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseCheckpointPath;

    /** The path where savepoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseSavepointPath;

    //......

    // ------------------------------------------------------------------------
    //  Initialization and metadata storage
    // ------------------------------------------------------------------------

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Checks the validity of the path's scheme and path.
     *
     * @param path The path to check.
     * @return The URI as a Path.
     *
     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
     */
    private static Path validatePath(Path path) {
        final URI uri = path.toUri();
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        // some validity checks
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
                    "Please specify the file system scheme explicitly in the URI.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
                    "Please specify a directory path for the checkpoint data.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        return path;
    }

    @Nullable
    private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
        if (path != null) {
            return path;
        }
        else {
            String configValue = config.getString(option);
            try {
                return configValue == null ? null : new Path(configValue);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
                        " : " + configValue + " . Not a valid path.");
            }
        }
    }
}
  • AbstractFileStateBackend extends AbstractStateBackend and has two properties, baseCheckpointPath and baseSavepointPath, both of which may be null. A path must carry an explicit scheme such as hdfs:// or file:// and must not be the root directory. The resolveCheckpoint method resolves a checkpoint or savepoint pointer to its storage location by delegating to AbstractFsCheckpointStorage.resolveCheckpointPointer.
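
To make the path rules concrete, here is a stand-alone restatement of the checks in validatePath and of the fallback in parameterOrConfigured. It is only a sketch: it uses java.net.URI and plain strings in place of Flink's Path, Configuration and ConfigOption types, the class name is hypothetical, and the host name in the usage note is made up.

```java
import java.net.URI;

/** Stand-alone restatement of the checks; uses java.net.URI instead of Flink's Path. */
final class CheckpointPathSketch {

    /** Applies the same three checks as validatePath: scheme set, path set, not the root. */
    static URI validate(String location) {
        URI uri = URI.create(location);
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException(
                    "No scheme (hdfs://, file://, etc) in: " + location);
        }
        String path = uri.getPath();
        if (path == null) {
            throw new IllegalArgumentException("No path in: " + location);
        }
        if (path.isEmpty() || path.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    /** Same precedence as parameterOrConfigured: explicit argument, else configured value, else null. */
    static String parameterOrConfigured(String explicit, String configured) {
        return explicit != null ? explicit : configured;
    }
}
```

With these definitions, validate("hdfs://namenode:8020/flink/checkpoints") is accepted, while validate("/tmp/checkpoints") is rejected for the missing scheme and validate("file:///") is rejected as the root directory.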

MemoryStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryStateBackend.java

@PublicEvolving
public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    private static final long serialVersionUID = 4109305377809414635L;

    /** The default maximal size that the snapshotted memory state may have (5 MiBytes). */
    public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    /** The maximal size that the snapshotted memory state may have. */
    private final int maxStateSize;

    /** Switch to chose between synchronous and asynchronous snapshots.
     * A value of 'UNDEFINED' means not yet configured, in which case the default will be used. */
    private final TernaryBoolean asynchronousSnapshots;

    // ------------------------------------------------------------------------

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the default state size (5 MB).
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     */
    public MemoryStateBackend() {
        this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the default state size (5 MB). The state backend uses asynchronous snapshots
     * or synchronous snapshots as configured.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
     */
    public MemoryStateBackend(boolean asynchronousSnapshots) {
        this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.fromBoolean(asynchronousSnapshots));
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemoryStateBackend(int maxStateSize) {
        this(null, null, maxStateSize, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the given number of bytes and that uses asynchronous snashots as configured.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param maxStateSize The maximal size of the serialized state
     * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
     */
    public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
        this(null, null, maxStateSize, TernaryBoolean.fromBoolean(asynchronousSnapshots));
    }

    /**
     * Creates a new MemoryStateBackend, setting optionally the path to persist checkpoint metadata
     * to, and to persist savepoints to.
     *
     * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
     *                       the runtime configuration will be used.
     * @param savepointPath  The path to write savepoints to. If null, the value from
     *                       the runtime configuration will be used.
     */
    public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath) {
        this(checkpointPath, savepointPath, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint metadata
     * and savepoints to, as well as configuring state thresholds and asynchronous operations.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
     *                       the runtime configuration will be used.
     * @param savepointPath  The path to write savepoints to. If null, the value from
     *                       the runtime configuration will be used.
     * @param maxStateSize   The maximal size of the serialized state.
     * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
     *                              snapshot mode. If null, the value configured in the
     *                              runtime configuration will be used.
     */
    public MemoryStateBackend(
            @Nullable String checkpointPath,
            @Nullable String savepointPath,
            int maxStateSize,
            TernaryBoolean asynchronousSnapshots) {

        super(checkpointPath == null ? null : new Path(checkpointPath),
                savepointPath == null ? null : new Path(savepointPath));

        checkArgument(maxStateSize > 0, "maxStateSize must be > 0");
        this.maxStateSize = maxStateSize;

        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Private constructor that creates a re-configured copy of the state backend.
     *
     * @param original The state backend to re-configure
     * @param configuration The configuration
     */
    private MemoryStateBackend(MemoryStateBackend original, Configuration configuration) {
        super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

        this.maxStateSize = original.maxStateSize;

        // if asynchronous snapshots were configured, use that setting,
        // else check the configuration
        this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
                configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the maximum size that an individual state can have, as configured in the
     * constructor (by default {@value #DEFAULT_MAX_STATE_SIZE}).
     *
     * @return The maximum size that an individual state can have
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    /**
     * Gets whether the key/value data structures are asynchronously snapshotted.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
     */
    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
    }

    // ------------------------------------------------------------------------
    //  Reconfiguration
    // ------------------------------------------------------------------------

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration
     * for fields where that were not specified in this state backend.
     *
     * @param config the configuration
     * @return The re-configured variant of the state backend
     */
    @Override
    public MemoryStateBackend configure(Configuration config) {
        return new MemoryStateBackend(this, config);
    }

    // ------------------------------------------------------------------------
    //  checkpoint state persistence
    // ------------------------------------------------------------------------

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  state holding structures
    // ------------------------------------------------------------------------

    @Override
    public OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception {

        return new DefaultOperatorStateBackend(
                env.getUserClassLoader(),
                env.getExecutionConfig(),
                isUsingAsynchronousSnapshots());
    }

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) {

        TaskStateManager taskStateManager = env.getTaskStateManager();
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
            new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
        return new HeapKeyedStateBackend<>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                isUsingAsynchronousSnapshots(),
                env.getExecutionConfig(),
                taskStateManager.createLocalRecoveryConfig(),
                priorityQueueSetFactory,
                ttlTimeProvider);
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryStateBackend (data in heap memory / checkpoints to JobManager) " +
                "(checkpoints: '" + getCheckpointPath() +
                "', savepoints: '" + getSavepointPath() +
                "', asynchronous: " + asynchronousSnapshots +
                ", maxStateSize: " + maxStateSize + ")";
    }
}
  • MemoryStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface (the configure method). It keeps the TaskManager's working state on the JVM heap and sends checkpoint state to the JobManager, which also holds it on its JVM heap (for high availability, checkpoint metadata can additionally be persisted to a file system). MemoryStateBackend is intended for experimentation, such as local runs or jobs with very small state. For production, FsStateBackend is the usual choice: the TaskManager's working state stays in memory, but checkpoint state is written to the file system, which supports larger state.
  • MemoryStateBackend has a maxStateSize property (DEFAULT_MAX_STATE_SIZE, 5 MB by default). No individual state may exceed maxStateSize, all state of one task together may not exceed the RPC system's message limit (10 MB by default; it can be raised, but that is not recommended), and the combined state of all retained checkpoints must fit in the JobManager's JVM heap. If checkpointPath and savepointPath are not given when MemoryStateBackend is created, the global defaults are read from flink-conf.yaml. MemoryStateBackend also has an asynchronousSnapshots property of type TernaryBoolean (TRUE, FALSE, UNDEFINED), where UNDEFINED means it has not been configured and the default value is used.
  • MemoryStateBackend's createCheckpointStorage method creates a MemoryBackendCheckpointStorage, its createOperatorStateBackend method creates a DefaultOperatorStateBackend, and its createKeyedStateBackend method creates a HeapKeyedStateBackend.
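
Two of the settings above lend themselves to a small model: the three-valued asynchronousSnapshots flag and the maxStateSize limit. The sketch below is a stand-alone illustration under stated assumptions. Ternary is a hypothetical stand-in for TernaryBoolean, with resolveUndefined and getOrDefault behaving the way the listing above uses them, and checkSize is an invented helper. Where exactly Flink enforces the limit is not shown in the source quoted here.

```java
import java.io.IOException;

/** Illustrative stand-ins for the two settings; these are not Flink classes. */
final class MemoryBackendSettingsSketch {

    /** Three-valued flag in the spirit of TernaryBoolean. */
    enum Ternary {
        TRUE, FALSE, UNDEFINED;

        static Ternary fromBoolean(boolean value) {
            return value ? TRUE : FALSE;
        }

        /** Keeps an explicit TRUE/FALSE; replaces UNDEFINED with the supplied value. */
        Ternary resolveUndefined(boolean valueForUndefined) {
            return this == UNDEFINED ? fromBoolean(valueForUndefined) : this;
        }

        /** Returns the explicit value, or the default if nothing was configured. */
        boolean getOrDefault(boolean defaultValue) {
            return this == UNDEFINED ? defaultValue : this == TRUE;
        }
    }

    /** Same value as DEFAULT_MAX_STATE_SIZE in the listing: 5 MiB. */
    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    /** Hypothetical helper: rejects a serialized state that is larger than the limit. */
    static void checkSize(byte[] serializedState, int maxStateSize) throws IOException {
        if (serializedState.length > maxStateSize) {
            throw new IOException("State of " + serializedState.length
                    + " bytes exceeds maxStateSize of " + maxStateSize + " bytes");
        }
    }
}
```

This mirrors what the private copy constructor does: a backend created with new MemoryStateBackend() carries UNDEFINED and picks up the configured value when configure is called, whereas one created with an explicit boolean keeps that value regardless of the configuration.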

Summary

  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed. Flink currently ships with MemoryStateBackend, FsStateBackend and RocksDBStateBackend; if nothing is configured, MemoryStateBackend is the default. A global default can be set in flink-conf.yaml, and each job can override it through StreamExecutionEnvironment.setStateBackend.
  • The StateBackend interface declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend and createOperatorStateBackend methods and extends Serializable. Implementations are required to be thread-safe. AbstractStateBackend is the abstract class that implements StateBackend directly; AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend both extend AbstractFileStateBackend.
  • MemoryStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface (the configure method). It keeps the TaskManager's working state on the JVM heap and sends checkpoint state to the JobManager, which also holds it on its JVM heap. Its createCheckpointStorage method creates a MemoryBackendCheckpointStorage, its createOperatorStateBackend method creates a DefaultOperatorStateBackend, and its createKeyedStateBackend method creates a HeapKeyedStateBackend.

doc