Talk about flink’s MemoryBackendCheckpointStorage


Preface

This article looks at Flink's MemoryBackendCheckpointStorage.

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {


    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
  • The CheckpointStorage interface defines the basic operations for durably storing checkpoint data and metadata streams. supportsHighlyAvailableStorage returns whether the backend supports highly available storage; hasDefaultSavepointLocation returns whether a default savepoint location is configured; resolveCheckpoint resolves an external checkpoint pointer into a CompletedCheckpointStorageLocation; initializeLocationForCheckpoint initializes the CheckpointStorageLocation for a given checkpointId; initializeLocationForSavepoint initializes the storage location of a savepoint from the checkpointId and an optional external location pointer; resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference into a CheckpointStreamFactory; and createTaskOwnedStateStream opens a stream for persisting checkpoint state that is owned by the task rather than by the master.

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer    The target location pointer for the savepoint.
     *                                   Must be a valid URI. Null, if not supplied.
     * @param checkpointId               The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to

        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory

        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);

                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //......
}
  • AbstractFsCheckpointStorage implements three of the CheckpointStorage methods: hasDefaultSavepointLocation, resolveCheckpoint and initializeLocationForSavepoint.
  • resolveCheckpoint delegates to resolveCheckpointPointer, which resolves both the checkpoint/savepoint path and the path of its metadata file, fetches their FileStatus, and then creates an FsCompletedCheckpointStorageLocation.
  • initializeLocationForSavepoint creates the CheckpointStorageLocation for a savepoint. If externalLocationPointer is given, it is used as the base directory; otherwise defaultSavepointDirectory is used, and if neither is set an IllegalArgumentException is thrown. It then makes up to 10 attempts to create a randomly named directory with the prefix savepoint-<first 6 characters of the jobId>- and passes the qualified path to the abstract createSavepointLocation method, which subclasses implement.
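To make the two behaviors above concrete, here is a minimal sketch against the local file system. It uses java.nio.file instead of Flink's FileSystem abstraction, and the class FsLocationSketch, its method names and the 12-character hex suffix are assumptions for illustration, not Flink APIs. It only mirrors the logic described: a pointer may name either the checkpoint directory or its _metadata file, and a savepoint goes under the explicit target if given, otherwise under the default directory, with up to 10 attempts at a random directory name.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical local-file-system sketch of two AbstractFsCheckpointStorage behaviors (not Flink code). */
class FsLocationSketch {

    static final String METADATA_FILE_NAME = "_metadata";

    /** Returns {checkpointDir, metadataFile} for a pointer naming either the directory or the file. */
    static Path[] resolvePointer(String pointer) throws IOException {
        Path path = Paths.get(pointer).toAbsolutePath();
        if (!Files.exists(path)) {
            throw new FileNotFoundException("Cannot find checkpoint or savepoint file/directory '" + pointer + "'");
        }
        if (Files.isDirectory(path)) {
            // The pointer names the directory: the metadata file must live inside it.
            Path metadata = path.resolve(METADATA_FILE_NAME);
            if (!Files.isRegularFile(metadata)) {
                throw new FileNotFoundException("Cannot find metadata file '" + METADATA_FILE_NAME + "' in '" + path + "'");
            }
            return new Path[] {path, metadata};
        }
        // The pointer names the metadata file itself: its parent is the checkpoint directory.
        return new Path[] {path.getParent(), path};
    }

    /** Picks the savepoint base directory and creates a uniquely named savepoint directory in it. */
    static Path createSavepointDir(Path externalLocation, Path defaultSavepointDir, String jobId) throws IOException {
        final Path base;
        if (externalLocation != null) {
            base = externalLocation;       // an explicit target wins
        } else if (defaultSavepointDir != null) {
            base = defaultSavepointDir;    // otherwise fall back to the configured default
        } else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        String prefix = "savepoint-" + jobId.substring(0, 6) + '-';
        IOException latest = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            Path candidate = base.resolve(prefix + randomHex(12));
            try {
                Files.createDirectories(base);
                // createDirectory fails if the name already exists, so a collision just costs one retry.
                return Files.createDirectory(candidate).toAbsolutePath();
            } catch (IOException e) {
                latest = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + base, latest);
    }

    /** Random lowercase hex string of the given length (assumed naming scheme). */
    private static String randomHex(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(Character.forDigit(ThreadLocalRandom.current().nextInt(16), 16));
        }
        return sb.toString();
    }
}
```

One deliberate difference: Files.createDirectory fails when the name already exists, so in this sketch a name collision simply consumes one retry, whereas the Flink code shown above calls fs.mkdirs and checks its return value.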

MemoryBackendCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java

/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {

    /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
    @Nullable
    private final Path checkpointsDirectory;

    /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
    @Nullable
    private final FileSystem fileSystem;

    /** The maximum size of state stored in a state handle. */
    private final int maxStateSize;

    /**
     * Creates a new MemoryBackendCheckpointStorage.
     *
     * @param jobId The ID of the job writing the checkpoints.
     * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
     *                                 in which case this storage does not support durable persistence.
     * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
     * @param maxStateSize The maximum size of each individual piece of state.
     *
     * @throws IOException Thrown if a checkpoint base directory is given configured and the
     *                     checkpoint directory cannot be created within that directory.
     */
    public MemoryBackendCheckpointStorage(
            JobID jobId,
            @Nullable Path checkpointsBaseDirectory,
            @Nullable Path defaultSavepointLocation,
            int maxStateSize) throws IOException {

        super(jobId, defaultSavepointLocation);

        checkArgument(maxStateSize > 0);
        this.maxStateSize = maxStateSize;

        if (checkpointsBaseDirectory == null) {
            checkpointsDirectory = null;
            fileSystem = null;
        }
        else {
            this.fileSystem = checkpointsBaseDirectory.getFileSystem();
            this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);

            fileSystem.mkdirs(checkpointsDirectory);
        }
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the size (in bytes) that a individual chunk of state may have at most.
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    // ------------------------------------------------------------------------
    //  Checkpoint Storage
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        if (checkpointsDirectory != null) {
            // configured for durable metadata
            // prepare all the paths needed for the checkpoints
            checkState(fileSystem != null);

            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            // create the checkpoint exclusive directory
            fileSystem.mkdirs(checkpointDir);

            return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
        }
        else {
            // no durable metadata - typical in IDE or test setup case
            return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
        }
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        // no matter where the checkpoint goes, we always return the storage location that stores
        // state inline with the state handles.
        return new MemCheckpointStreamFactory(maxStateSize);
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryBackendCheckpointStorage {" +
                "checkpointsDirectory=" + checkpointsDirectory +
                ", fileSystem=" + fileSystem +
                ", maxStateSize=" + maxStateSize +
                '}';
    }
}
  • MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage and implements the abstract createSavepointLocation method, returning a PersistentMetadataCheckpointStorageLocation.
  • It also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createTaskOwnedStateStream.
  • supportsHighlyAvailableStorage returns true only if checkpointsDirectory is configured. initializeLocationForCheckpoint also depends on checkpointsDirectory: if it is null, it returns a NonPersistentMetadataCheckpointStorageLocation; otherwise it creates the checkpoint's exclusive directory and returns a PersistentMetadataCheckpointStorageLocation. resolveCheckpointStorageLocation ignores the reference and always returns a MemCheckpointStreamFactory, so state is stored inline with the state handles, while createTaskOwnedStateStream returns a MemoryCheckpointOutputStream. All of these are created with maxStateSize, the maximum size of an individual piece of state.
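The streams handed out here keep state bytes in memory, and each is created with maxStateSize. The following is a hypothetical sketch of such a size-bounded in-memory stream, not Flink's MemoryCheckpointOutputStream; the class name BoundedMemoryStateStream and the choice to check the size when the bytes are retrieved are assumptions, and Flink's own class may enforce the limit differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical size-bounded in-memory state stream (illustration only, not Flink's class). */
class BoundedMemoryStateStream extends OutputStream {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;
    private boolean closed;

    BoundedMemoryStateStream(int maxStateSize) {
        if (maxStateSize <= 0) {
            throw new IllegalArgumentException("maxStateSize must be positive");
        }
        this.maxStateSize = maxStateSize;
    }

    @Override
    public void write(int b) throws IOException {
        ensureOpen();
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        buffer.write(b, off, len);
    }

    /** Closes the stream and returns the buffered state, failing if it exceeds maxStateSize. */
    byte[] closeAndGetBytes() throws IOException {
        ensureOpen();
        closed = true;
        int size = buffer.size();
        if (size > maxStateSize) {
            throw new IOException("Size of the state is larger than the maximum permitted memory-backed state. Size="
                    + size + ", maxSize=" + maxStateSize);
        }
        return buffer.toByteArray();
    }

    @Override
    public void close() {
        closed = true;
    }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream is already closed");
        }
    }
}
```

Buffering everything and checking once at the end keeps individual writes cheap; a stricter variant could reject a write as soon as the buffer passes the limit.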

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint and initializeLocationForSavepoint methods and declares the abstract method createSavepointLocation.
  • MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage and implements createSavepointLocation. It also implements the interface methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createTaskOwnedStateStream.
  • Although MemoryBackendCheckpointStorage keeps state in memory, checkpoint metadata can still be persisted: if checkpointsDirectory is configured (highly available storage), checkpoints use PersistentMetadataCheckpointStorageLocation, otherwise NonPersistentMetadataCheckpointStorageLocation. Savepoints, by contrast, always use PersistentMetadataCheckpointStorageLocation. In other words, checkpoints may or may not be backed by files, while savepoints are always written to files.
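The checkpoint/savepoint distinction above can be modeled in a few lines. This is a hypothetical sketch using java.nio.file; it assumes the directory layout <checkpointsBaseDirectory>/<jobId>/chk-<checkpointId>, which is what the CHECKPOINT_DIR_PREFIX constant and the getCheckpointDirectoryForJob and createCheckpointDirectory helpers suggest (their bodies are not shown in this article). The class, enum and method names are illustrative only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model of how MemoryBackendCheckpointStorage chooses a location (not Flink code). */
class MemoryStorageLayoutSketch {

    enum Kind { PERSISTENT_METADATA, NON_PERSISTENT_METADATA }

    static final class Location {
        final Kind kind;
        final Path directory; // null when metadata is not persisted

        Location(Kind kind, Path directory) {
            this.kind = kind;
            this.directory = directory;
        }
    }

    // Assumed layout: <checkpointsBaseDirectory>/<jobId>; null when no base directory is configured.
    private final Path checkpointsDirectory;

    MemoryStorageLayoutSketch(Path checkpointsBaseDirectory, String jobId) throws IOException {
        if (checkpointsBaseDirectory == null) {
            this.checkpointsDirectory = null;
        } else {
            this.checkpointsDirectory = checkpointsBaseDirectory.resolve(jobId);
            Files.createDirectories(checkpointsDirectory);
        }
    }

    boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    Location initializeLocationForCheckpoint(long checkpointId) throws IOException {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must be >= 0");
        }
        if (checkpointsDirectory != null) {
            // Durable metadata: create the checkpoint's exclusive "chk-<id>" directory.
            Path dir = checkpointsDirectory.resolve("chk-" + checkpointId);
            Files.createDirectories(dir);
            return new Location(Kind.PERSISTENT_METADATA, dir);
        }
        // No directory configured (typical for IDE or test runs): metadata is not persisted.
        return new Location(Kind.NON_PERSISTENT_METADATA, null);
    }

    /** Savepoints always get a directory, so their metadata is always persisted. */
    Location createSavepointLocation(Path savepointDirectory) {
        return new Location(Kind.PERSISTENT_METADATA, savepointDirectory);
    }
}
```

Only the metadata handling differs between the two checkpoint cases: in both, state still goes through in-memory streams capped by maxStateSize and is stored inline with the state handles.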
