A Look at Flink's FsCheckpointStorage

Preface

This article examines Flink's FsCheckpointStorage.

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {


    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams.
  • supportsHighlyAvailableStorage reports whether the storage backend supports highly available storage; hasDefaultSavepointLocation reports whether a default savepoint location is configured.
  • resolveCheckpoint resolves an external pointer to a checkpoint or savepoint and returns a CompletedCheckpointStorageLocation.
  • initializeLocationForCheckpoint initializes the CheckpointStorageLocation for the given checkpointId; initializeLocationForSavepoint does the same for a savepoint and additionally accepts an optional externalLocationPointer.
  • resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference and returns a CheckpointStreamFactory; createTaskOwnedStateStream opens a stream for persisting task-owned checkpoint state.

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer    The target location pointer for the savepoint.
     *                                   Must be a valid URI. Null, if not supplied.
     * @param checkpointId               The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to

        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory

        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);

                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //......
}
  • AbstractFsCheckpointStorage implements the hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods of the CheckpointStorage interface.
  • resolveCheckpoint delegates to resolveCheckpointPointer (not shown in the excerpt), which does two things: it resolves the checkpoint/savepoint path, and it resolves the path of the corresponding metadata file, obtains its FileStatus, and creates an FsCompletedCheckpointStorageLocation.
  • initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer as the base path if it is non-null, falls back to defaultSavepointDirectory otherwise, and throws an IllegalArgumentException if both are null. It then tries up to 10 times to create a directory whose name starts with "savepoint-" plus the first 6 characters of the jobId, qualifies the resulting path, and passes it to the abstract method createSavepointLocation, which subclasses implement.
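The base-path selection and directory naming in initializeLocationForSavepoint can be sketched without any Flink dependency. The following is only a minimal illustration that uses java.nio in place of Flink's FileSystem and Path; the class name SavepointDirSketch, its method names, and the UUID-based random suffix (standing in for FileUtils.getRandomFilename) are assumptions made for this example, not Flink code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

// Hypothetical stand-in for the savepoint logic quoted above; not part of Flink.
class SavepointDirSketch {

    /** The explicit pointer wins; otherwise the configured default is used. */
    public static Path chooseBase(String externalPointer, Path defaultDir) {
        if (externalPointer != null) {
            return Paths.get(externalPointer);
        } else if (defaultDir != null) {
            return defaultDir;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default location configured.");
    }

    /** Builds the "savepoint-<first 6 chars of job id>-" prefix used for the directory name. */
    public static String prefixFor(String jobId) {
        return "savepoint-" + jobId.substring(0, 6) + '-';
    }

    /** Tries up to 10 random names under the base and returns the first directory created. */
    public static Path createSavepointDir(Path base, String jobId) throws IOException {
        IOException latest = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // assumption: a UUID fragment replaces FileUtils.getRandomFilename
            String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            Path candidate = base.resolve(prefixFor(jobId) + suffix);
            try {
                Files.createDirectories(base);
                Files.createDirectory(candidate); // throws if the name is already taken
                return candidate.toAbsolutePath();
            } catch (IOException e) {
                latest = e; // remember the failure and retry with a new name
            }
        }
        throw new IOException("Failed to create savepoint directory at " + base, latest);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("sp-demo");
        Path dir = createSavepointDir(chooseBase(null, tmp), "a1b2c3d4e5f6");
        System.out.println(dir.getFileName());
    }
}
```

Unlike the Flink method, which checks the boolean result of fs.mkdirs, this sketch relies on Files.createDirectory throwing when a name is taken; the retry loop and the final IOException otherwise follow the same shape.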

FsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public class FsCheckpointStorage extends AbstractFsCheckpointStorage {

    private final FileSystem fileSystem;

    private final Path checkpointsDirectory;

    private final Path sharedStateDirectory;

    private final Path taskOwnedStateDirectory;

    private final int fileSizeThreshold;

    public FsCheckpointStorage(
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {

        this(checkpointBaseDirectory.getFileSystem(),
                checkpointBaseDirectory,
                defaultSavepointDirectory,
                jobId,
                fileSizeThreshold);
    }

    public FsCheckpointStorage(
            FileSystem fs,
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {

        super(jobId, defaultSavepointDirectory);

        checkArgument(fileSizeThreshold >= 0);

        this.fileSystem = checkNotNull(fs);
        this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
        this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
        this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
        this.fileSizeThreshold = fileSizeThreshold;

        // initialize the dedicated directories
        fileSystem.mkdirs(checkpointsDirectory);
        fileSystem.mkdirs(sharedStateDirectory);
        fileSystem.mkdirs(taskOwnedStateDirectory);
    }

    // ------------------------------------------------------------------------

    public Path getCheckpointsDirectory() {
        return checkpointsDirectory;
    }

    // ------------------------------------------------------------------------
    //  CheckpointStorage implementation
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return true;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        // prepare all the paths needed for the checkpoints
        final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

        // create the checkpoint exclusive directory
        fileSystem.mkdirs(checkpointDir);

        return new FsCheckpointStorageLocation(
                fileSystem,
                checkpointDir,
                sharedStateDirectory,
                taskOwnedStateDirectory,
                CheckpointStorageLocationReference.getDefault(),
                fileSizeThreshold);
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        if (reference.isDefaultReference()) {
            // default reference, construct the default location for that particular checkpoint
            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            return new FsCheckpointStorageLocation(
                    fileSystem,
                    checkpointDir,
                    sharedStateDirectory,
                    taskOwnedStateDirectory,
                    reference,
                    fileSizeThreshold);
        }
        else {
            // location encoded in the reference
            final Path path = decodePathFromReference(reference);

            return new FsCheckpointStorageLocation(
                    path.getFileSystem(),
                    path,
                    path,
                    path,
                    reference,
                    fileSizeThreshold);
        }
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new FsCheckpointStateOutputStream(
                taskOwnedStateDirectory,
                fileSystem,
                FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE,
                fileSizeThreshold);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        final CheckpointStorageLocationReference reference = encodePathAsReference(location);
        return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold);
    }
}
  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which encodes the savepoint path as a CheckpointStorageLocationReference and returns an FsCheckpointStorageLocation.
  • FsCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream.
  • supportsHighlyAvailableStorage simply returns true. initializeLocationForCheckpoint creates the checkpoint's exclusive directory and returns an FsCheckpointStorageLocation with the default reference. resolveCheckpointStorageLocation also returns an FsCheckpointStorageLocation: for the default reference it is built from the checkpoint directory plus the shared and task-owned directories, otherwise from the path decoded from the reference. createTaskOwnedStateStream returns an FsCheckpointStateOutputStream that writes to taskOwnedStateDirectory.
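The directory layout and the two branches of resolveCheckpointStorageLocation can be illustrated with plain strings. The sketch below is not Flink code: the class CheckpointLayoutSketch and its methods are hypothetical, and it assumes that the helpers elided from the excerpt (getCheckpointDirectoryForJob, createCheckpointDirectory, encodePathAsReference, decodePathFromReference) compose paths as <base>/<jobId>/chk-<id> and encode a reference as the magic number followed by the UTF-8 bytes of the path. Only the constants are taken from the quoted source.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of the checkpoint layout; not part of Flink.
class CheckpointLayoutSketch {

    // Constants as quoted from AbstractFsCheckpointStorage.
    static final String CHECKPOINT_DIR_PREFIX = "chk-";
    static final byte[] MAGIC = {0x05, 0x5F, 0x3F, 0x18};

    /** Assumed layout: <base>/<jobId> is the per-job checkpoints directory. */
    public static String jobDir(String base, String jobId) {
        return base + "/" + jobId;
    }

    /** Assumed layout: <jobDir>/chk-<id> holds the data exclusive to one checkpoint. */
    public static String checkpointDir(String jobDir, long checkpointId) {
        return jobDir + "/" + CHECKPOINT_DIR_PREFIX + checkpointId;
    }

    /** Assumed encoding: magic number followed by the UTF-8 bytes of the path. */
    public static byte[] encode(String path) {
        byte[] p = path.getBytes(StandardCharsets.UTF_8);
        byte[] out = Arrays.copyOf(MAGIC, MAGIC.length + p.length);
        System.arraycopy(p, 0, out, MAGIC.length, p.length);
        return out;
    }

    /** Reverses encode, rejecting input that does not start with the magic number. */
    public static String decode(byte[] ref) {
        if (ref.length <= MAGIC.length
                || !Arrays.equals(Arrays.copyOf(ref, MAGIC.length), MAGIC)) {
            throw new IllegalArgumentException("Reference has the wrong magic number");
        }
        return new String(ref, MAGIC.length, ref.length - MAGIC.length, StandardCharsets.UTF_8);
    }

    /** Two branches as in resolveCheckpointStorageLocation; null models the default reference. */
    public static String resolve(String jobDir, long checkpointId, byte[] refOrNull) {
        return refOrNull == null ? checkpointDir(jobDir, checkpointId) : decode(refOrNull);
    }

    public static void main(String[] args) {
        String job = jobDir("/flink/checkpoints", "job1");
        System.out.println(resolve(job, 7, null));
        System.out.println(resolve(job, 7, encode("/flink/savepoints/savepoint-abc")));
    }
}
```

A regular checkpoint uses the default reference, so every task can derive the chk- directory from the checkpoint ID alone; a savepoint has no such derivable location, so its path has to travel inside the reference.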

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods and defines the abstract method createSavepointLocation.
  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements createSavepointLocation. It also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream.
  • In FsCheckpointStorage, supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint and resolveCheckpointStorageLocation both create an FsCheckpointStorageLocation; createTaskOwnedStateStream creates an FsCheckpointStateOutputStream.
