A Look at Flink's FsCheckpointStreamFactory


Preface

This article takes a look at Flink's FsCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
  • CheckpointStreamFactory is a factory for checkpoint output streams, which are the streams used to persist checkpoint data. It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods: closeAndGetHandle and close.
  • CheckpointStreamFactory has two implementation classes whose names end in Factory:
    • MemCheckpointStreamFactory, which has two subclasses: NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation.
    • FsCheckpointStreamFactory, which has one subclass: FsCheckpointStorageLocation.
  • The CheckpointStorageLocation interface extends CheckpointStreamFactory. It has three implementation classes: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation and FsCheckpointStorageLocation.
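Below is a minimal in-memory sketch that shows how a factory relates to its stream type. It loosely follows the idea behind MemCheckpointStreamFactory. Every name in it (SketchScope, ByteHandle, SketchStateOutputStream, SketchStreamFactory, InMemoryStreamFactory) is a hypothetical simplification, not Flink API. The real scope enum and handle types are richer than this.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for CheckpointedStateScope. */
enum SketchScope { EXCLUSIVE, SHARED }

/** Hypothetical stand-in for StreamStateHandle: here it simply holds the persisted bytes. */
final class ByteHandle {
    final byte[] data;
    ByteHandle(byte[] data) { this.data = data; }
}

/** Mirrors CheckpointStateOutputStream: an OutputStream that yields a handle when closed. */
abstract class SketchStateOutputStream extends OutputStream {
    /** Returns null if nothing was written, like the @Nullable contract in Flink. */
    abstract ByteHandle closeAndGetHandle() throws IOException;
}

/** Mirrors CheckpointStreamFactory: a single method that opens a new state stream. */
interface SketchStreamFactory {
    SketchStateOutputStream createStream(SketchScope scope) throws IOException;
}

/** A purely in-memory factory; it has no directories, so the scope is ignored. */
final class InMemoryStreamFactory implements SketchStreamFactory {
    @Override
    public SketchStateOutputStream createStream(SketchScope scope) {
        return new SketchStateOutputStream() {
            private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            private boolean closed;

            @Override
            public void write(int b) throws IOException {
                if (closed) {
                    throw new IOException("closed");
                }
                buffer.write(b);
            }

            @Override
            ByteHandle closeAndGetHandle() throws IOException {
                if (closed) {
                    throw new IOException("already closed");
                }
                closed = true;
                return buffer.size() == 0 ? null : new ByteHandle(buffer.toByteArray());
            }

            @Override
            public void close() {
                closed = true; // discard: nothing to clean up for in-memory data
            }
        };
    }
}
```

A caller opens one stream per piece of state and writes to it like any OutputStream. It then keeps the returned handle. A stream that never received data yields null.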

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
  • FSDataOutputStream extends Java's OutputStream and declares the abstract methods getPos, flush, sync and close.
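The abstract methods above form a small contract:

- getPos reports how many bytes have been written.
- flush pushes buffered data onward.
- sync also asks for the data to be made durable.

The class below is a rough, hypothetical illustration of that contract. PositionedOutputStream and MemoryDataOutputStream are invented names. Because the class has no storage device, sync simply delegates to flush.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical mirror of FSDataOutputStream's abstract contract (not the Flink class). */
abstract class PositionedOutputStream extends OutputStream {
    /** Current position, i.e. the number of bytes written so far. */
    public abstract long getPos() throws IOException;

    @Override
    public abstract void flush() throws IOException;

    /** Flush and, on a real file system, persist the data to the storage device. */
    public abstract void sync() throws IOException;

    @Override
    public abstract void close() throws IOException;
}

/** In-memory implementation: there is no device, so sync() just flushes. */
final class MemoryDataOutputStream extends PositionedOutputStream {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) throws IOException {
        ensureOpen();
        bytes.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        bytes.write(b, off, len);
    }

    @Override
    public long getPos() { return bytes.size(); }

    @Override
    public void flush() { /* nothing is buffered beyond the byte array */ }

    @Override
    public void sync() { flush(); }

    @Override
    public void close() { closed = true; }

    byte[] contents() { return bytes.toByteArray(); }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
    }
}
```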

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is usually created and initialized by CheckpointStorage, and it provides methods for data persistence, metadata persistence and lifecycle/cleanup. It defines three methods:
    • createMetadataOutputStream creates a CheckpointMetadataOutputStream.
    • disposeOnFailure disposes of the checkpoint location when the checkpoint fails.
    • getLocationReference returns the CheckpointStorageLocationReference.
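The lifecycle described above can be imitated on local disk with java.nio.file. This sketch is hypothetical. LocalCheckpointLocation is not a Flink class, and the "_metadata" file name is assumed for illustration. In the sketch:

- The metadata stream refuses to overwrite an existing file.
- disposeOnFailure removes the whole checkpoint directory, deleting children before their parents.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical local-disk analogue of a CheckpointStorageLocation (not Flink API). */
final class LocalCheckpointLocation {
    private final Path checkpointDir;
    private final String reference;

    LocalCheckpointLocation(Path checkpointDir, String reference) throws IOException {
        this.checkpointDir = Files.createDirectories(checkpointDir);
        this.reference = reference;
    }

    /** Like createMetadataOutputStream(): fails if the metadata file already exists. */
    OutputStream createMetadataOutputStream() throws IOException {
        // "_metadata" is an assumed file name for this sketch
        return Files.newOutputStream(checkpointDir.resolve("_metadata"), StandardOpenOption.CREATE_NEW);
    }

    /** Like disposeOnFailure(): a failed checkpoint's directory is dropped as a whole. */
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p); // children sort after their parents, so reverse order deletes them first
        }
    }

    String getLocationReference() { return reference; }

    Path checkpointDir() { return checkpointDir; }
}
```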

FsCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java

public class FsCheckpointStreamFactory implements CheckpointStreamFactory {

    private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);

    /** Maximum size of state that is stored with the metadata, rather than in files. */
    public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    /** Default size for the write buffer. */
    public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    /** State below this size will be stored as part of the metadata, rather than in files. */
    private final int fileStateThreshold;

    /** The directory for checkpoint exclusive state data. */
    private final Path checkpointDirectory;

    /** The directory for shared checkpoint data. */
    private final Path sharedStateDirectory;

    /** Cached handle to the file system for file operations. */
    private final FileSystem filesystem;

    /**
     * Creates a new stream factory that stores its checkpoint data in the file system and location
     * defined by the given Path.
     *
     * <p><b>Important:</b> The given checkpoint directory must already exist. Refer to the class-level
     * JavaDocs for an explanation why this factory must not try and create the checkpoints.
     *
     * @param fileSystem The filesystem to write to.
     * @param checkpointDirectory The directory for checkpoint exclusive state data.
     * @param sharedStateDirectory The directory for shared checkpoint data.
     * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
     *                             rather than in files
     */
    public FsCheckpointStreamFactory(
            FileSystem fileSystem,
            Path checkpointDirectory,
            Path sharedStateDirectory,
            int fileStateSizeThreshold) {

        if (fileStateSizeThreshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
                MAX_FILE_STATE_THRESHOLD);
        }

        this.filesystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDirectory);
        this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
        this.fileStateThreshold = fileStateSizeThreshold;
    }

    // ------------------------------------------------------------------------

    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
        Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
        int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);

        return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "File Stream Factory @ " + checkpointDirectory;
    }

    //......
}
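  • FsCheckpointStreamFactory implements the CheckpointStreamFactory interface. Its createCheckpointStateOutputStream method returns an FsCheckpointStateOutputStream with the following settings:
    • The target directory is checkpointDirectory for EXCLUSIVE state and sharedStateDirectory for all other state.
    • The buffer size is the larger of DEFAULT_WRITE_BUFFER_SIZE (4096) and fileStateThreshold.
  • FsCheckpointStreamFactory has a subclass, FsCheckpointStorageLocation, which implements the CheckpointStorageLocation interface.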
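The arithmetic in the constructor and in createCheckpointStateOutputStream is easy to check in isolation. The helper below copies the two constants from the source. FsFactoryMath is a hypothetical name.

- A threshold of 1024 bytes yields a 4096-byte buffer.
- The maximum threshold of 1 MiB yields a 1 MiB buffer.

Taking the maximum guarantees that bufferSize is never smaller than the threshold, which FsCheckpointStateOutputStream's constructor requires.

```java
/** Hypothetical helper reproducing the sizing and routing rules of FsCheckpointStreamFactory. */
final class FsFactoryMath {
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    enum Scope { EXCLUSIVE, SHARED }

    /** Same checks as the constructor: 0 <= threshold <= 1 MiB. */
    static int validateThreshold(int threshold) {
        if (threshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (threshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException("The threshold cannot be larger than " + MAX_FILE_STATE_THRESHOLD);
        }
        return threshold;
    }

    /** The buffer must be able to hold a whole "small" state, so it is never below the threshold. */
    static int bufferSize(int threshold) {
        return Math.max(DEFAULT_WRITE_BUFFER_SIZE, validateThreshold(threshold));
    }

    /** EXCLUSIVE state goes to the checkpoint directory, everything else to the shared directory. */
    static String targetDirectory(Scope scope, String checkpointDir, String sharedDir) {
        return scope == Scope.EXCLUSIVE ? checkpointDir : sharedDir;
    }
}
```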

FsCheckpointStateOutputStream

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java

    /**
     * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
     * returns a {@link StreamStateHandle} upon closing.
     */
    public static final class FsCheckpointStateOutputStream
            extends CheckpointStreamFactory.CheckpointStateOutputStream {

        private final byte[] writeBuffer;

        private int pos;

        private FSDataOutputStream outStream;

        private final int localStateThreshold;

        private final Path basePath;

        private final FileSystem fs;

        private Path statePath;

        private volatile boolean closed;

        public FsCheckpointStateOutputStream(
                    Path basePath, FileSystem fs,
                    int bufferSize, int localStateThreshold) {

            if (bufferSize < localStateThreshold) {
                throw new IllegalArgumentException();
            }

            this.basePath = basePath;
            this.fs = fs;
            this.writeBuffer = new byte[bufferSize];
            this.localStateThreshold = localStateThreshold;
        }

        @Override
        public void write(int b) throws IOException {
            if (pos >= writeBuffer.length) {
                flush();
            }
            writeBuffer[pos++] = (byte) b;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (len < writeBuffer.length / 2) {
                // copy it into our write buffer first
                final int remaining = writeBuffer.length - pos;
                if (len > remaining) {
                    // copy as much as fits
                    System.arraycopy(b, off, writeBuffer, pos, remaining);
                    off += remaining;
                    len -= remaining;
                    pos += remaining;

                    // flush the write buffer to make it clear again
                    flush();
                }

                // copy what is in the buffer
                System.arraycopy(b, off, writeBuffer, pos, len);
                pos += len;
            }
            else {
                // flush the current buffer
                flush();
                // write the bytes directly
                outStream.write(b, off, len);
            }
        }

        @Override
        public long getPos() throws IOException {
            return pos + (outStream == null ? 0 : outStream.getPos());
        }

        @Override
        public void flush() throws IOException {
            if (!closed) {
                // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
                if (outStream == null) {
                    createStream();
                }

                // now flush
                if (pos > 0) {
                    outStream.write(writeBuffer, 0, pos);
                    pos = 0;
                }
            }
            else {
                throw new IOException("closed");
            }
        }

        @Override
        public void sync() throws IOException {
            outStream.sync();
        }

        /**
         * Checks whether the stream is closed.
         * @return True if the stream was closed, false if it is still open.
         */
        public boolean isClosed() {
            return closed;
        }

        /**
         * If the stream is only closed, we remove the produced file (cleanup through the auto close
         * feature, for example). This method throws no exception if the deletion fails, but only
         * logs the error.
         */
        @Override
        public void close() {
            if (!closed) {
                closed = true;

                // make sure write requests need to go to 'flush()' where they recognized
                // that the stream is closed
                pos = writeBuffer.length;

                if (outStream != null) {
                    try {
                        outStream.close();
                    } catch (Throwable throwable) {
                        LOG.warn("Could not close the state stream for {}.", statePath, throwable);
                    } finally {
                        try {
                            fs.delete(statePath, false);
                        } catch (Exception e) {
                            LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
                        }
                    }
                }
            }
        }

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            // check if there was nothing ever written
            if (outStream == null && pos == 0) {
                return null;
            }

            synchronized (this) {
                if (!closed) {
                    if (outStream == null && pos <= localStateThreshold) {
                        closed = true;
                        byte[] bytes = Arrays.copyOf(writeBuffer, pos);
                        pos = writeBuffer.length;
                        return new ByteStreamStateHandle(createStatePath().toString(), bytes);
                    }
                    else {
                        try {
                            flush();

                            pos = writeBuffer.length;

                            long size = -1L;

                            // make a best effort attempt to figure out the size
                            try {
                                size = outStream.getPos();
                            } catch (Exception ignored) {}

                            outStream.close();

                            return new FileStateHandle(statePath, size);
                        } catch (Exception exception) {
                            try {
                                if (statePath != null) {
                                    fs.delete(statePath, false);
                                }

                            } catch (Exception deleteException) {
                                LOG.warn("Could not delete the checkpoint stream file {}.",
                                    statePath, deleteException);
                            }

                            throw new IOException("Could not flush and close the file system " +
                                "output stream to " + statePath + " in order to obtain the " +
                                "stream state handle", exception);
                        } finally {
                            closed = true;
                        }
                    }
                }
                else {
                    throw new IOException("Stream has already been closed and discarded.");
                }
            }
        }

        private Path createStatePath() {
            return new Path(basePath, UUID.randomUUID().toString());
        }

        private void createStream() throws IOException {
            Exception latestException = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
                            fs, createStatePath(), WriteMode.NO_OVERWRITE);
                    this.outStream = streamAndPath.stream();
                    this.statePath = streamAndPath.path();
                    return;
                }
                catch (Exception e) {
                    latestException = e;
                }
            }

            throw new IOException("Could not open output stream for state backend", latestException);
        }
    }
  • FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream. Its constructor takes basePath, fs, bufferSize and localStateThreshold, and it rejects a bufferSize smaller than localStateThreshold.
  • bufferSize sets the size of writeBuffer.
    • write(int b) flushes first whenever pos has reached the end of the buffer (pos >= writeBuffer.length).
    • write(byte[] b, int off, int len) with len at least writeBuffer.length / 2 flushes the buffer and writes the bytes directly to outStream.
    • With a smaller len, write(byte[] b, int off, int len) copies the bytes into writeBuffer. If len exceeds the remaining space, it first copies as much as fits, flushes, and then copies the rest.
    • The first flush lazily creates outStream via createStream. createStream makes up to 10 attempts to open a new file named with a random UUID under basePath.
  • closeAndGetHandle returns null if nothing was ever written.
    • If nothing has been flushed yet (outStream is null) and pos is less than or equal to localStateThreshold, it returns a ByteStreamStateHandle that keeps the bytes in memory.
    • Otherwise it flushes, closes the file and returns a FileStateHandle.
    • close(), by contrast, discards the stream and deletes any file that was already created.
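The buffering rules and the threshold decision can be exercised without a file system. The class below is a hypothetical re-creation of FsCheckpointStateOutputStream's logic. BufferedStateStream and its Handle class are invented names.

- A ByteArrayOutputStream stands in for the file that createStream would open.
- A boolean flag stands in for the choice between ByteStreamStateHandle and FileStateHandle.
- Retries, entropy injection and file deletion are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

/** Hypothetical in-memory re-creation of FsCheckpointStateOutputStream's buffering logic. */
final class BufferedStateStream extends OutputStream {

    /** Stand-in for ByteStreamStateHandle (inline == true) or FileStateHandle (inline == false). */
    static final class Handle {
        final boolean inline;
        final long size;
        final byte[] inlineBytes;

        Handle(boolean inline, long size, byte[] inlineBytes) {
            this.inline = inline;
            this.size = size;
            this.inlineBytes = inlineBytes;
        }
    }

    private final byte[] writeBuffer;
    private final int localStateThreshold;
    private int pos;
    private ByteArrayOutputStream file; // null until the first flush, like outStream
    private boolean closed;

    BufferedStateStream(int bufferSize, int localStateThreshold) {
        if (bufferSize < localStateThreshold) {
            throw new IllegalArgumentException();
        }
        this.writeBuffer = new byte[bufferSize];
        this.localStateThreshold = localStateThreshold;
    }

    @Override
    public void write(int b) throws IOException {
        if (pos >= writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len < writeBuffer.length / 2) {
            int remaining = writeBuffer.length - pos;
            if (len > remaining) { // fill the buffer completely, then flush it
                System.arraycopy(b, off, writeBuffer, pos, remaining);
                off += remaining;
                len -= remaining;
                pos += remaining;
                flush();
            }
            System.arraycopy(b, off, writeBuffer, pos, len);
            pos += len;
        } else { // large write: flush, then bypass the buffer
            flush();
            file.write(b, off, len);
        }
    }

    @Override
    public void flush() throws IOException {
        if (closed) {
            throw new IOException("closed");
        }
        if (file == null) {
            file = new ByteArrayOutputStream(); // plays the role of createStream()
        }
        if (pos > 0) {
            file.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }

    long getPos() { return pos + (file == null ? 0 : file.size()); }

    Handle closeAndGetHandle() throws IOException {
        if (file == null && pos == 0) {
            return null; // nothing was ever written
        }
        if (closed) {
            throw new IOException("Stream has already been closed and discarded.");
        }
        if (file == null && pos <= localStateThreshold) { // small state: keep it inline
            byte[] bytes = Arrays.copyOf(writeBuffer, pos);
            closed = true;
            pos = writeBuffer.length; // later writes reach flush() and fail, as in the original
            return new Handle(true, bytes.length, bytes);
        }
        flush(); // large state: finish the "file"
        closed = true;
        pos = writeBuffer.length;
        return new Handle(false, file.size(), null);
    }
}
```

The examples below assume a 4096-byte buffer and a 1024-byte threshold:

- Writing 100 bytes produces an inline handle.
- Writing 1500 bytes stays in the buffer but exceeds the threshold, so it still ends up in the file.
- Three 2000-byte writes overflow the buffer once, so getPos counts 4096 flushed bytes plus 1904 buffered ones.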

FsCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java

/**
 * A storage location for checkpoints on a file system.
 */
public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path sharedStateDirectory;

    private final Path taskOwnedStateDirectory;

    private final Path metadataFilePath;

    private final CheckpointStorageLocationReference reference;

    private final int fileStateSizeThreshold;

    public FsCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            Path sharedStateDir,
            Path taskOwnedStateDir,
            CheckpointStorageLocationReference reference,
            int fileStateSizeThreshold) {

        super(fileSystem, checkpointDir, sharedStateDir, fileStateSizeThreshold);

        checkArgument(fileStateSizeThreshold >= 0);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.sharedStateDirectory = checkNotNull(sharedStateDir);
        this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
        this.reference = checkNotNull(reference);

        // the metadata file should not have entropy in its path
        Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);

        this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
        this.fileStateSizeThreshold = fileStateSizeThreshold;
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    public Path getCheckpointDirectory() {
        return checkpointDirectory;
    }

    public Path getSharedStateDirectory() {
        return sharedStateDirectory;
    }

    public Path getTaskOwnedStateDirectory() {
        return taskOwnedStateDirectory;
    }

    public Path getMetadataFilePath() {
        return metadataFilePath;
    }

    // ------------------------------------------------------------------------
    //  checkpoint metadata
    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return reference;
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "FsCheckpointStorageLocation {" +
                "fileSystem=" + fileSystem +
                ", checkpointDirectory=" + checkpointDirectory +
                ", sharedStateDirectory=" + sharedStateDirectory +
                ", taskOwnedStateDirectory=" + taskOwnedStateDirectory +
                ", metadataFilePath=" + metadataFilePath +
                ", reference=" + reference +
                ", fileStateSizeThreshold=" + fileStateSizeThreshold +
                '}';
    }

    @VisibleForTesting
    FileSystem getFileSystem() {
        return fileSystem;
    }
}
  • FsCheckpointStorageLocation implements the createMetadataOutputStream, disposeOnFailure and getLocationReference methods of the CheckpointStorageLocation interface:
    • createMetadataOutputStream creates an FsCheckpointMetadataOutputStream that writes to metadataFilePath. That path is the checkpoint directory with any entropy marker removed, plus AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    • disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true), which deletes the whole checkpoint directory recursively.
    • getLocationReference returns the CheckpointStorageLocationReference.
  • Because FsCheckpointStorageLocation extends FsCheckpointStreamFactory, it also inherits the createCheckpointStateOutputStream method.
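The constructor derives metadataFilePath from the checkpoint directory with the entropy marker removed. As a result, the metadata file lands at a predictable path. Data files created through the inherited stream factory go through EntropyInjector.createEntropyAware instead, so their paths can receive random characters in place of the marker.

The sketch below is simplified and hypothetical, and it works on plain, scheme-less path strings. It assumes the following, none of which describes EntropyInjector's exact behavior:

- the marker "_entropy_";
- the "_metadata" file name;
- the collapsing of double slashes.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of entropy handling for checkpoint paths (not Flink's EntropyInjector). */
final class EntropyPathSketch {
    static final String MARKER = "_entropy_";             // assumed marker string
    static final String METADATA_FILE_NAME = "_metadata"; // assumed metadata file name
    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";

    /** Data files: replace the marker with random characters to spread objects across prefixes. */
    static String injectEntropy(String path, int length) {
        return resolve(path, randomChars(length));
    }

    /** Metadata file: drop the marker so the path is predictable, then append the file name. */
    static String metadataFilePath(String checkpointDir) {
        return resolve(checkpointDir, "") + "/" + METADATA_FILE_NAME;
    }

    private static String resolve(String path, String replacement) {
        int idx = path.indexOf(MARKER);
        if (idx < 0) {
            return path;
        }
        String resolved = path.substring(0, idx) + replacement + path.substring(idx + MARKER.length());
        return resolved.replaceAll("/{2,}", "/"); // collapse the "//" left by an empty replacement
    }

    private static String randomChars(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(ThreadLocalRandom.current().nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}
```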

FsCheckpointMetadataOutputStream

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java

/**
 * A {@link CheckpointMetadataOutputStream} that writes a specified file and directory, and
 * returns a {@link FsCompletedCheckpointStorageLocation} upon closing.
 */
public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream {

    private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointMetadataOutputStream.class);

    // ------------------------------------------------------------------------

    private final FSDataOutputStream out;

    private final Path metadataFilePath;

    private final Path exclusiveCheckpointDir;

    private final FileSystem fileSystem;

    private volatile boolean closed;

    public FsCheckpointMetadataOutputStream(
            FileSystem fileSystem,
            Path metadataFilePath,
            Path exclusiveCheckpointDir) throws IOException {

        this.fileSystem = checkNotNull(fileSystem);
        this.metadataFilePath = checkNotNull(metadataFilePath);
        this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);

        this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
    }

    // ------------------------------------------------------------------------
    //  I/O
    // ------------------------------------------------------------------------

    @Override
    public final void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    @Override
    public long getPos() throws IOException {
        return out.getPos();
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void sync() throws IOException {
        out.sync();
    }

    // ------------------------------------------------------------------------
    //  Closing
    // ------------------------------------------------------------------------

    public boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;

            try {
                out.close();
                fileSystem.delete(metadataFilePath, false);
            }
            catch (Throwable t) {
                LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
            }
        }
    }

    @Override
    public FsCompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
        synchronized (this) {
            if (!closed) {
                try {
                    // make a best effort attempt to figure out the size
                    long size = 0;
                    try {
                        size = out.getPos();
                    } catch (Exception ignored) {}

                    out.close();

                    FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);

                    return new FsCompletedCheckpointStorageLocation(
                            fileSystem, exclusiveCheckpointDir, metaDataHandle,
                            metaDataHandle.getFilePath().getParent().toString());
                }
                catch (Exception e) {
                    try {
                        fileSystem.delete(metadataFilePath, false);
                    }
                    catch (Exception deleteException) {
                        LOG.warn("Could not delete the checkpoint stream file {}.", metadataFilePath, deleteException);
                    }

                    throw new IOException("Could not flush and close the file system " +
                            "output stream to " + metadataFilePath + " in order to obtain the " +
                            "stream state handle", e);
                }
                finally {
                    closed = true;
                }
            }
            else {
                throw new IOException("Stream has already been closed and discarded.");
            }
        }
    }
}
  • FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream, which in turn extends FSDataOutputStream.
    • Its closeAndFinalizeCheckpoint method closes the stream and returns an FsCompletedCheckpointStorageLocation.
    • close(), by contrast, discards the stream and deletes the metadata file.
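The two ways of ending an FsCheckpointMetadataOutputStream differ mainly in what happens to the file:

- close() treats the stream as abandoned and deletes the metadata file.
- closeAndFinalizeCheckpoint keeps the file and reports its size. It fails if the stream was already closed.

The hypothetical MetadataStreamSketch below reproduces that distinction with local files. It returns the byte count instead of an FsCompletedCheckpointStorageLocation, and it omits logging.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical local-file analogue of FsCheckpointMetadataOutputStream's close semantics. */
final class MetadataStreamSketch extends OutputStream {
    private final Path metadataFile;
    private final OutputStream out;
    private long written;
    private boolean closed;

    MetadataStreamSketch(Path metadataFile) throws IOException {
        this.metadataFile = metadataFile;
        // CREATE_NEW plays the role of WriteMode.NO_OVERWRITE
        this.out = Files.newOutputStream(metadataFile, StandardOpenOption.CREATE_NEW);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        written++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        written += len;
    }

    /** Discard path: close the stream and delete the partially written file. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            out.close();
            Files.deleteIfExists(metadataFile);
        } catch (IOException e) {
            // the Flink class logs a warning here; this sketch ignores the failure
        }
    }

    /** Success path: keep the file and return its size; fails after close() or a second call. */
    synchronized long closeAndFinalize() throws IOException {
        if (closed) {
            throw new IOException("Stream has already been closed and discarded.");
        }
        try {
            out.close();
            return written; // stands in for the size stored in the FileStateHandle
        } catch (IOException e) {
            try {
                Files.deleteIfExists(metadataFile);
            } catch (IOException deleteException) {
                e.addSuppressed(deleteException);
            }
            throw e;
        } finally {
            closed = true;
        }
    }
}
```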

Summary

  • FsCheckpointStorage's initializeLocationForCheckpoint, resolveCheckpointStorageLocation and createSavepointLocation methods create an FsCheckpointStorageLocation. Its createTaskOwnedStateStream method creates an FsCheckpointStateOutputStream.
  • FsCheckpointStorageLocation extends FsCheckpointStreamFactory and implements the createMetadataOutputStream, disposeOnFailure and getLocationReference methods of the CheckpointStorageLocation interface:
    • createMetadataOutputStream creates an FsCheckpointMetadataOutputStream. That class extends CheckpointMetadataOutputStream, itself a subclass of FSDataOutputStream, and its closeAndFinalizeCheckpoint method returns an FsCompletedCheckpointStorageLocation.
    • disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true) to delete the checkpoint directory.
    • getLocationReference returns the CheckpointStorageLocationReference.
  • FsCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns an FsCheckpointStateOutputStream.
    • FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream, and its constructor takes basePath, fs, bufferSize and localStateThreshold.
    • Its closeAndGetHandle method returns a ByteStreamStateHandle when nothing has been flushed yet and pos is less than or equal to localStateThreshold, and a FileStateHandle otherwise.
