Talking about Flink’s FsStateBackend


Preface

This article takes a look at Flink’s FsStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    // ------------------------------------------------------------------------
    //  Checkpoint storage - the durable persistence of checkpoint data
    // ------------------------------------------------------------------------

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // ------------------------------------------------------------------------
    //  Structure Backends 
    // ------------------------------------------------------------------------

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            TtlTimeProvider.DEFAULT
        );
    }

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider
    ) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            ttlTimeProvider,
            new UnregisteredMetricsGroup());
    }

    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) throws Exception;
    
    OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed.
  • The StateBackend interface declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend and createOperatorStateBackend methods, and it extends java.io.Serializable. Implementations of StateBackend are required to be thread-safe.
  • StateBackend is implemented directly by the abstract class AbstractStateBackend. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend both extend AbstractFileStateBackend.
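The three createKeyedStateBackend overloads form a telescoping chain: each default method fills in one more argument (first TtlTimeProvider.DEFAULT, then a new UnregisteredMetricsGroup) and delegates, so an implementation only has to provide the full-arity method. The sketch below reproduces that pattern; SimpleBackend, DescribingBackend and the String-based stand-ins for the TTL provider and metric group are hypothetical simplifications, not Flink types.

```java
// Hypothetical, simplified model of the overload chain in StateBackend.
// The types here are stand-ins, not Flink classes.
interface SimpleBackend {

    // Stand-ins for TtlTimeProvider.DEFAULT and new UnregisteredMetricsGroup()
    String DEFAULT_TTL_PROVIDER = "ttl:default";
    String UNREGISTERED_METRICS = "metrics:unregistered";

    // Shortest overload: supplies the default TTL time provider and delegates
    default String createKeyedBackend(String operatorId) {
        return createKeyedBackend(operatorId, DEFAULT_TTL_PROVIDER);
    }

    // Middle overload: supplies an unregistered metric group and delegates
    default String createKeyedBackend(String operatorId, String ttlProvider) {
        return createKeyedBackend(operatorId, ttlProvider, UNREGISTERED_METRICS);
    }

    // Full-arity method: the only one an implementation must provide
    String createKeyedBackend(String operatorId, String ttlProvider, String metricGroup);
}

// A concrete backend only implements the full-arity method
class DescribingBackend implements SimpleBackend {
    @Override
    public String createKeyedBackend(String operatorId, String ttlProvider, String metricGroup) {
        return operatorId + "|" + ttlProvider + "|" + metricGroup;
    }
}
```

Calling new DescribingBackend().createKeyedBackend("op") ends up in the three-argument method with both defaults filled in, which is how older callers of StateBackend keep working after new parameters are added.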

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

    private static final long serialVersionUID = 4620415814639230247L;

    // ------------------------------------------------------------------------
    //  State Backend - State-Holding Backends
    // ------------------------------------------------------------------------

    @Override
    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) throws IOException;

    @Override
    public abstract OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception;
}
  • AbstractStateBackend declares that it implements the StateBackend and Serializable interfaces, and it redeclares the createKeyedStateBackend and createOperatorStateBackend methods as abstract methods.
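Redeclaring the method as abstract also lets AbstractStateBackend tighten the contract: the full createKeyedStateBackend in StateBackend declares throws Exception, AbstractStateBackend narrows that to throws IOException, and FsStateBackend’s override declares no checked exception at all. Java allows each step because an overriding method may throw fewer or narrower checked exceptions. A minimal sketch with hypothetical names (Backend, AbstractBackendBase, HeapBackend, ThrowsNarrowingDemo):

```java
import java.io.IOException;

// Hypothetical stand-in for StateBackend: the broadest contract
interface Backend {
    String createKeyed(String operatorId) throws Exception;
}

// Like AbstractStateBackend: redeclares the method abstract with a narrower throws clause
abstract class AbstractBackendBase implements Backend {
    @Override
    public abstract String createKeyed(String operatorId) throws IOException;
}

// Like FsStateBackend: the concrete override throws no checked exception
class HeapBackend extends AbstractBackendBase {
    @Override
    public String createKeyed(String operatorId) {
        return "heap:" + operatorId;
    }
}

class ThrowsNarrowingDemo {
    // Callers holding the concrete type need no try/catch at all
    static String viaConcreteType() {
        return new HeapBackend().createKeyed("op-1");
    }

    // Callers holding the abstract base only have to handle IOException
    static String viaAbstractBase(AbstractBackendBase backend) {
        try {
            return backend.createKeyed("op-2");
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }
}
```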

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    //  State Backend Properties
    // ------------------------------------------------------------------------

    /** The path where checkpoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseCheckpointPath;

    /** The path where savepoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseSavepointPath;

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable URI baseCheckpointPath,
            @Nullable URI baseSavepointPath) {

        this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
                baseSavepointPath == null ? null : new Path(baseSavepointPath));
    }

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath) {

        this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
    }

    /**
     * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
     * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
     * over the value in the configuration. If the configuration does not specify a value, it is possible
     * that the checkpoint-/savepoint directories in the backend will be null.
     *
     * <p>This constructor can be used to create a backend that is based partially on a given backend
     * and partially on a configuration.
     *
     * @param baseCheckpointPath The checkpoint base directory to use (or null).
     * @param baseSavepointPath The default savepoint directory to use (or null).
     * @param configuration The configuration to read values from.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath,
            Configuration configuration) {

        this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
                parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
     * for checkpoints within this directory. May be null, if not configured.
     *
     * @return The checkpoint base directory
     */
    @Nullable
    public Path getCheckpointPath() {
        return baseCheckpointPath;
    }

    /**
     * Gets the directory where savepoints are stored by default (when no custom path is given
     * to the savepoint trigger command).
     *
     * @return The default directory for savepoints, or null, if no default directory has been configured.
     */
    @Nullable
    public Path getSavepointPath() {
        return baseSavepointPath;
    }

    // ------------------------------------------------------------------------
    //  Initialization and metadata storage
    // ------------------------------------------------------------------------

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Checks the validity of the path's scheme and path.
     *
     * @param path The path to check.
     * @return The URI as a Path.
     *
     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
     */
    private static Path validatePath(Path path) {
        final URI uri = path.toUri();
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        // some validity checks
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
                    "Please specify the file system scheme explicitly in the URI.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
                    "Please specify a directory path for the checkpoint data.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        return path;
    }

    @Nullable
    private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
        if (path != null) {
            return path;
        }
        else {
            String configValue = config.getString(option);
            try {
                return configValue == null ? null : new Path(configValue);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
                        " : " + configValue + " . Not a valid path.");
            }
        }
    }
}
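  • AbstractFileStateBackend extends AbstractStateBackend and holds two properties, baseCheckpointPath and baseSavepointPath, both of which may be null. A non-null path must carry an explicit file system scheme (such as hdfs:// or file://) and must not point to the root directory; validatePath enforces both rules. In the constructor that also takes a Configuration, a null argument falls back to the configured value of CheckpointingOptions.CHECKPOINTS_DIRECTORY or CheckpointingOptions.SAVEPOINT_DIRECTORY via parameterOrConfigured. The resolveCheckpoint method resolves the location of a checkpoint or savepoint from its external pointer by delegating to AbstractFsCheckpointStorage.resolveCheckpointPointer.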
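The two static helpers are easy to exercise in isolation. The sketch below re-implements their logic with plain java.net.URI and a Map<String, String> standing in for Flink’s Path and Configuration; the class name PathRules and the map-based configuration are assumptions for illustration, and IllegalArgumentException replaces Flink’s IllegalConfigurationException.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical re-implementation of AbstractFileStateBackend's validatePath and
// parameterOrConfigured, using java.net.URI instead of Flink's Path and a
// Map instead of Flink's Configuration.
final class PathRules {

    private PathRules() {}

    // Mirrors validatePath: require a scheme, a path, and a non-root directory
    static URI validatePath(URI uri) {
        String scheme = uri.getScheme();
        String pathPart = uri.getPath();
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null.");
        }
        if (pathPart.isEmpty() || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    // Mirrors parameterOrConfigured: an explicit parameter wins over the config value
    static URI parameterOrConfigured(URI parameter, Map<String, String> config, String key) {
        if (parameter != null) {
            return parameter;
        }
        String configValue = config.get(key);
        if (configValue == null) {
            return null; // neither parameter nor config value: the directory stays unset
        }
        try {
            return URI.create(configValue);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Cannot parse value for " + key + " : " + configValue + " . Not a valid path.", e);
        }
    }
}
```

Passing a directory without a scheme, such as /tmp/checkpoints, fails with the scheme error, which is why file:// has to be spelled out even for local directories.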

FsStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsStateBackend.java

@PublicEvolving
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    private static final long serialVersionUID = -8191916350224044011L;

    /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // ------------------------------------------------------------------------

    /** State below this size will be stored as part of the metadata, rather than in files.
     * A value of '-1' means not yet configured, in which case the default will be used. */
    private final int fileStateThreshold;

    /** Switch to chose between synchronous and asynchronous snapshots.
     * A value of 'undefined' means not yet configured, in which case the default will be used. */
    private final TernaryBoolean asynchronousSnapshots;

    //......

    public FsStateBackend(
            URI checkpointDirectory,
            @Nullable URI defaultSavepointDirectory,
            int fileStateSizeThreshold,
            TernaryBoolean asynchronousSnapshots) {

        super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);

        checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
        checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
                "The threshold for file state size must be in [-1, %s], where '-1' means to use " +
                        "the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);

        this.fileStateThreshold = fileStateSizeThreshold;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Private constructor that creates a re-configured copy of the state backend.
     *
     * @param original The state backend to re-configure
     * @param configuration The configuration
     */
    private FsStateBackend(FsStateBackend original, Configuration configuration) {
        super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

        // if asynchronous snapshots were configured, use that setting,
        // else check the configuration
        this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
                configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));

        final int sizeThreshold = original.fileStateThreshold >= 0 ?
                original.fileStateThreshold :
                configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);

        if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
            this.fileStateThreshold = sizeThreshold;
        }
        else {
            this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();

            // because this is the only place we (unlikely) ever log, we lazily
            // create the logger here
            LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
                    "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
        }
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     *
     * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
     */
    @Deprecated
    public Path getBasePath() {
        return getCheckpointPath();
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     */
    @Nonnull
    @Override
    public Path getCheckpointPath() {
        // we know that this can never be null by the way of constructor checks
        //noinspection ConstantConditions
        return super.getCheckpointPath();
    }

    /**
     * Gets the threshold below which state is stored as part of the metadata, rather than in files.
     * This threshold ensures that the backend does not create a large amount of very small files,
     * where potentially the file pointers are larger than the state itself.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
     *
     * @return The file size threshold, in bytes.
     */
    public int getMinFileSizeThreshold() {
        return fileStateThreshold >= 0 ?
                fileStateThreshold :
                CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
    }

    /**
     * Gets whether the key/value data structures are asynchronously snapshotted.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
     */
    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
    }

    // ------------------------------------------------------------------------
    //  Reconfiguration
    // ------------------------------------------------------------------------

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration
     * for fields where that were not specified in this state backend.
     *
     * @param config the configuration
     * @return The re-configured variant of the state backend
     */
    @Override
    public FsStateBackend configure(Configuration config) {
        return new FsStateBackend(this, config);
    }

    // ------------------------------------------------------------------------
    //  initialization and cleanup
    // ------------------------------------------------------------------------

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        checkNotNull(jobId, "jobId");
        return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId, getMinFileSizeThreshold());
    }

    // ------------------------------------------------------------------------
    //  state holding structures
    // ------------------------------------------------------------------------

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) {

        TaskStateManager taskStateManager = env.getTaskStateManager();
        LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
            new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

        return new HeapKeyedStateBackend<>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                isUsingAsynchronousSnapshots(),
                env.getExecutionConfig(),
                localRecoveryConfig,
                priorityQueueSetFactory,
                ttlTimeProvider);
    }

    @Override
    public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) {

        return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            isUsingAsynchronousSnapshots());
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "File State Backend (" +
                "checkpoints: '" + getCheckpointPath() +
                "', savepoints: '" + getSavepointPath() +
                "', asynchronous: " + asynchronousSnapshots +
                ", fileStateThreshold: " + fileStateThreshold + ")";
    }
}
  • FsStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface. Its public constructor takes checkpointDirectory, defaultSavepointDirectory, fileStateSizeThreshold and asynchronousSnapshots. It requires checkpointDirectory and asynchronousSnapshots to be non-null and fileStateSizeThreshold to lie in [-1, MAX_FILE_STATE_THRESHOLD] (1 MiB), where -1 means the value is taken from the deployment’s configuration.
  • The configure method calls the private constructor, which creates a re-configured copy of the current backend based on the given Configuration. An explicitly set asynchronousSnapshots value is kept; only an undefined one is resolved from CheckpointingOptions.ASYNC_SNAPSHOTS. If fileStateThreshold is negative (i.e. -1), the value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD is read from the configuration instead. The resulting value is used if it lies in [0, MAX_FILE_STATE_THRESHOLD]; otherwise a warning is logged and CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() is used.
  • The createCheckpointStorage method creates an FsCheckpointStorage, the createKeyedStateBackend method creates a HeapKeyedStateBackend, and the createOperatorStateBackend method creates a DefaultOperatorStateBackend.
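The threshold handling is split across two places: the public constructor rejects anything outside [-1, MAX_FILE_STATE_THRESHOLD], while the private re-configuring constructor resolves -1 from the configuration and falls back to the option’s default if the configured value is out of range. The sketch below isolates those rules. ThresholdRules is a hypothetical class; DEFAULT_THRESHOLD = 1024 and DEFAULT_ASYNC = true are assumed stand-ins for the defaults of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD and CheckpointingOptions.ASYNC_SNAPSHOTS (the article does not show them), a nullable Integer models "not set in the configuration", and a nullable Boolean models TernaryBoolean.

```java
// Hypothetical isolation of FsStateBackend's threshold and async-snapshot rules.
// DEFAULT_THRESHOLD and DEFAULT_ASYNC are assumed stand-ins for the option defaults.
final class ThresholdRules {

    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; // 1 MiB, as in FsStateBackend
    static final int DEFAULT_THRESHOLD = 1024;               // assumption
    static final boolean DEFAULT_ASYNC = true;               // assumption

    private ThresholdRules() {}

    // Public-constructor check: -1 means "take it from the configuration later"
    static int checkConstructorThreshold(int fileStateSizeThreshold) {
        if (fileStateSizeThreshold < -1 || fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException(
                    "The threshold for file state size must be in [-1, " + MAX_FILE_STATE_THRESHOLD + "]");
        }
        return fileStateSizeThreshold;
    }

    // configure(): an explicit (>= 0) value wins; otherwise read the config
    // (null = not set, so the option default applies); out-of-range values fall back to the default
    static int resolveThreshold(int original, Integer configured) {
        int sizeThreshold = original >= 0
                ? original
                : (configured != null ? configured : DEFAULT_THRESHOLD);
        if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
            return sizeThreshold;
        }
        return DEFAULT_THRESHOLD; // the real code also logs a warning here
    }

    // Models TernaryBoolean.resolveUndefined: an explicit setting wins over the configuration
    static boolean resolveAsync(Boolean original, Boolean configured) {
        if (original != null) {
            return original;
        }
        return configured != null ? configured : DEFAULT_ASYNC;
    }
}
```

Keeping -1 as a "not yet configured" marker in the constructor is what lets configure tell an explicit user choice apart from a value that should come from flink-conf.yaml.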

Summary

  • FsStateBackend extends AbstractFileStateBackend and implements the configure method of the ConfigurableStateBackend interface; its public constructor requires fileStateThreshold to lie in [-1, MAX_FILE_STATE_THRESHOLD].
  • FsStateBackend keeps the TaskManagers’ working state in memory and writes it to the configured file system when a checkpoint is taken, while the JobManager keeps the checkpoint metadata in memory. It uses asynchronous snapshots by default to avoid blocking the processing threads. To avoid writing too many small files, it has a fileStateThreshold below which state is stored in the metadata instead of in files.
  • The createCheckpointStorage method creates an FsCheckpointStorage, the createKeyedStateBackend method creates a HeapKeyedStateBackend, and the createOperatorStateBackend method creates a DefaultOperatorStateBackend.
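The small-file threshold comes down to a per-handle decision at checkpoint time. The sketch below is a hypothetical model, not Flink’s code (the article does not show where the decision is made): InlineOrFile, StateHandle and choose are invented names, and the strict "below the threshold" comparison follows the javadoc wording, so the exact boundary behavior in Flink is an assumption here.

```java
// Hypothetical model of the small-file threshold: small state is inlined into the
// checkpoint metadata, larger state is written to a file on the checkpoint file system.
final class InlineOrFile {

    enum Kind { INLINE_IN_METADATA, FILE }

    // Minimal stand-in for a state handle: where the bytes end up and how many there are
    static final class StateHandle {
        final Kind kind;
        final int sizeBytes;

        StateHandle(Kind kind, int sizeBytes) {
            this.kind = kind;
            this.sizeBytes = sizeBytes;
        }
    }

    private InlineOrFile() {}

    // "State below this size will be stored as part of the metadata, rather than in files."
    static StateHandle choose(byte[] serializedState, int fileStateThreshold) {
        Kind kind = serializedState.length < fileStateThreshold ? Kind.INLINE_IN_METADATA : Kind.FILE;
        return new StateHandle(kind, serializedState.length);
    }
}
```

With a threshold of 1024 bytes, a 100-byte state would be embedded in the metadata, while a 4 KiB state would get its own file; this avoids file pointers that are larger than the state they point to.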
