A Look at Flink's MemCheckpointStreamFactory


Preface

This article takes a look at Flink's MemCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
  • CheckpointStreamFactory is a factory for checkpoint output streams (the streams used to persist checkpoint data). It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods: closeAndGetHandle and close.
  • CheckpointStreamFactory has two factory implementations: MemCheckpointStreamFactory (which has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (which has one subclass, FsCheckpointStorageLocation).
  • The CheckpointStorageLocation interface extends the CheckpointStreamFactory interface and has three implementation classes: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation and FsCheckpointStorageLocation.
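The inheritance relationships in the bullets above can be summarized as a compilable skeleton. This is a simplified, hypothetical model, not Flink's source: the nested types only borrow Flink's names, all methods are omitted, and the helper storageLocations is invented for illustration. It shows that each storage location class is at the same time a stream factory.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical skeleton of the Flink 1.7 types named above.
// These are NOT Flink's classes: all methods are omitted because only the
// inheritance relationships are being illustrated.
class HierarchySketch {

    interface CheckpointStreamFactory { }

    // A storage location is itself a stream factory.
    interface CheckpointStorageLocation extends CheckpointStreamFactory { }

    static class MemCheckpointStreamFactory implements CheckpointStreamFactory { }

    static class FsCheckpointStreamFactory implements CheckpointStreamFactory { }

    static class NonPersistentMetadataCheckpointStorageLocation
            extends MemCheckpointStreamFactory implements CheckpointStorageLocation { }

    static class PersistentMetadataCheckpointStorageLocation
            extends MemCheckpointStreamFactory implements CheckpointStorageLocation { }

    static class FsCheckpointStorageLocation
            extends FsCheckpointStreamFactory implements CheckpointStorageLocation { }

    /** Hypothetical helper: simple class names of the factories that are also storage locations. */
    static List<String> storageLocations(List<? extends CheckpointStreamFactory> factories) {
        List<String> names = new ArrayList<>();
        for (CheckpointStreamFactory factory : factories) {
            if (factory instanceof CheckpointStorageLocation) {
                names.add(factory.getClass().getSimpleName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<CheckpointStreamFactory> all = List.of(
                new MemCheckpointStreamFactory(),
                new FsCheckpointStreamFactory(),
                new NonPersistentMetadataCheckpointStorageLocation(),
                new PersistentMetadataCheckpointStorageLocation(),
                new FsCheckpointStorageLocation());
        // Prints the three CheckpointStorageLocation implementations.
        System.out.println(storageLocations(all));
    }
}
```

Because the location classes extend a concrete factory, they inherit createCheckpointStateOutputStream and only need to add the metadata and cleanup methods of CheckpointStorageLocation.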

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
  • FSDataOutputStream extends Java's OutputStream and declares several abstract methods: getPos, flush, sync and close.
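A minimal sketch of what these abstract methods mean for an in-memory stream, assuming a hypothetical stand-in class PositionedOutputStream that mirrors FSDataOutputStream's declarations (it is not Flink's class). For a byte-array-backed stream, getPos is simply the number of bytes written, and sync has nothing to make durable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class PositionedStreamSketch {

    /** Hypothetical stand-in mirroring the abstract methods FSDataOutputStream declares. */
    abstract static class PositionedOutputStream extends OutputStream {
        /** Current position in the stream, i.e. bytes written so far. */
        public abstract long getPos() throws IOException;

        /** Flushes client-side buffers (re-declared abstract, as FSDataOutputStream does). */
        @Override
        public abstract void flush() throws IOException;

        /** Makes written data durable. */
        public abstract void sync() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }

    /** In-memory implementation: the position equals the buffer size. */
    static class InMemoryPositionedStream extends PositionedOutputStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override public void write(int b) { buffer.write(b); }
        @Override public void write(byte[] b, int off, int len) { buffer.write(b, off, len); }
        @Override public long getPos() { return buffer.size(); }
        @Override public void flush() { }   // nothing buffered outside the byte array
        @Override public void sync() { }    // memory has no durable medium to sync to
        @Override public void close() { }

        byte[] toByteArray() { return buffer.toByteArray(); }
    }

    public static void main(String[] args) throws IOException {
        InMemoryPositionedStream out = new InMemoryPositionedStream();
        out.write(new byte[] {1, 2, 3});
        out.write(4);
        System.out.println(out.getPos()); // 4
    }
}
```

The empty sync body matches what MemoryCheckpointOutputStream does later in this article, since a heap buffer cannot be made durable.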

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. Its instances are usually created and initialized by CheckpointStorage and provide data persistence, metadata persistence and lifecycle/cleanup methods. It defines createMetadataOutputStream, which creates a CheckpointMetadataOutputStream; disposeOnFailure, which disposes of the checkpoint location when the checkpoint fails; and getLocationReference, which returns a CheckpointStorageLocationReference.
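The lifecycle described above can be sketched with a trimmed-down, hypothetical interface. StorageLocation, createStateStream (standing in for createCheckpointStateOutputStream), RecordingLocation and runCheckpoint are all invented for illustration; Flink's real checkpoint coordination is considerably more involved. The point is the order of calls: state streams first, then metadata, and disposeOnFailure if anything goes wrong.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

class LocationLifecycleSketch {

    /** Hypothetical, trimmed-down analogue of CheckpointStorageLocation. */
    interface StorageLocation {
        OutputStream createStateStream() throws IOException;          // stands in for createCheckpointStateOutputStream
        OutputStream createMetadataOutputStream() throws IOException;
        void disposeOnFailure() throws IOException;
        String getLocationReference();
    }

    /** Hypothetical location that records which lifecycle methods were called. */
    static class RecordingLocation implements StorageLocation {
        final List<String> events = new ArrayList<>();

        public OutputStream createStateStream() { events.add("state"); return new ByteArrayOutputStream(); }
        public OutputStream createMetadataOutputStream() { events.add("metadata"); return new ByteArrayOutputStream(); }
        public void disposeOnFailure() { events.add("dispose"); }
        public String getLocationReference() { return "default"; }
    }

    /** Hypothetical driver: persist state, then metadata; clean up the location on failure. */
    static boolean runCheckpoint(StorageLocation location, byte[] state, boolean failMetadata) throws IOException {
        try {
            try (OutputStream out = location.createStateStream()) {
                out.write(state);
            }
            try (OutputStream meta = location.createMetadataOutputStream()) {
                if (failMetadata) {
                    throw new IOException("simulated metadata failure");
                }
                meta.write(state.length);
            }
            return true;
        } catch (IOException e) {
            location.disposeOnFailure();   // drop whatever this checkpoint wrote
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingLocation ok = new RecordingLocation();
        System.out.println(runCheckpoint(ok, new byte[] {1, 2}, false) + " " + ok.events);
        RecordingLocation failed = new RecordingLocation();
        System.out.println(runCheckpoint(failed, new byte[] {1, 2}, true) + " " + failed.events);
    }
}
```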

MemCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java

/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {

    /** The maximal size that the snapshotted memory state may have */
    private final int maxStateSize;

    /**
     * Creates a new in-memory stream factory that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemCheckpointStreamFactory(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            CheckpointedStateScope scope) throws IOException
    {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    public String toString() {
        return "In-Memory Stream Factory";
    }

    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                    "Size of the state is larger than the maximum permitted memory-backed state. Size="
                            + size + " , maxSize=" + maxSize
                            + " . Consider using a different state backend, like the File System State backend.");
        }
    }



    /**
     * A {@code CheckpointStateOutputStream} that writes into a byte array.
     */
    public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private final int maxSize;

        private AtomicBoolean closed;

        boolean isEmpty = true;

        public MemoryCheckpointOutputStream(int maxSize) {
            this.maxSize = maxSize;
            this.closed = new AtomicBoolean(false);
        }

        @Override
        public void write(int b) throws IOException {
            os.write(b);
            isEmpty = false;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
            isEmpty = false;
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public void sync() throws IOException { }

        // --------------------------------------------------------------------

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                closeInternal();
            }
        }

        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (isEmpty) {
                return null;
            }
            return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        public boolean isClosed() {
            return closed.get();
        }

        /**
         * Closes the stream and returns the byte array containing the stream's data.
         * @return The byte array containing the stream's data.
         * @throws IOException Thrown if the size of the data exceeds the maximal
         */
        public byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);
                byte[] bytes = os.toByteArray();
                closeInternal();
                return bytes;
            } else {
                throw new IOException("stream has already been closed");
            }
        }

        private void closeInternal() {
            os.reset();
        }
    }
}
  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream.
  • MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers data in a ByteArrayOutputStreamWithPos. closeAndGetHandle returns null if nothing was written; otherwise it calls closeAndGetBytes, which checks whether the buffered size exceeds maxSize and throws an IOException if it does.
  • MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation, both of which also implement the CheckpointStorageLocation interface.
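To make the size check concrete, here is a simplified, self-contained re-implementation of MemoryCheckpointOutputStream's close logic. It is a sketch, not Flink's class: MemoryStream and closeAndGetHandleBytes are hypothetical names, a plain ByteArrayOutputStream replaces ByteArrayOutputStreamWithPos, and raw bytes are returned instead of a ByteStreamStateHandle. Note that the limit is enforced only at close time, so writing oversized state succeeds until the handle is requested.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

class MemStreamSketch {

    /** Same comparison as MemCheckpointStreamFactory.checkSize. */
    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException("Size of the state is larger than the maximum permitted memory-backed state. Size="
                    + size + " , maxSize=" + maxSize);
        }
    }

    /** Hypothetical simplified stand-in for MemoryCheckpointOutputStream. */
    static class MemoryStream extends OutputStream {
        private final ByteArrayOutputStream os = new ByteArrayOutputStream();
        private final int maxSize;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private boolean isEmpty = true;

        MemoryStream(int maxSize) { this.maxSize = maxSize; }

        @Override public void write(int b) { os.write(b); isEmpty = false; }
        @Override public void write(byte[] b, int off, int len) { os.write(b, off, len); isEmpty = false; }

        /** Mirrors closeAndGetHandle: null for an empty stream, otherwise the size-checked bytes. */
        byte[] closeAndGetHandleBytes() throws IOException {
            if (isEmpty) {
                return null;
            }
            return closeAndGetBytes();
        }

        byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);   // the limit is enforced only here
                byte[] bytes = os.toByteArray();
                os.reset();
                return bytes;
            }
            throw new IOException("stream has already been closed");
        }

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                os.reset();   // discard buffered data without producing a handle
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MemoryStream small = new MemoryStream(4);
        small.write(new byte[] {1, 2, 3});
        System.out.println(small.closeAndGetBytes().length); // 3

        MemoryStream big = new MemoryStream(2);
        big.write(new byte[] {1, 2, 3});
        try {
            big.closeAndGetBytes();
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        System.out.println(new MemoryStream(4).closeAndGetHandleBytes()); // null
    }
}
```

The AtomicBoolean makes close and closeAndGetBytes mutually exclusive: whichever wins the compareAndSet decides whether the buffer is discarded or turned into a handle.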

NonPersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    /** The external pointer returned for checkpoints that are not externally addressable. */
    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
        super(maxStateSize);
    }

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new MetadataOutputStream();
    }

    @Override
    public void disposeOnFailure() {}

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }

    // ------------------------------------------------------------------------
    //  CompletedCheckpointStorageLocation
    // ------------------------------------------------------------------------

    /**
     * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
     * metadata in an internal byte array.
     */
    private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

        private static final long serialVersionUID = 1L;

        private final ByteStreamStateHandle metaDataHandle;

        NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
            this.metaDataHandle = metaDataHandle;
        }

        @Override
        public String getExternalPointer() {
            return EXTERNAL_POINTER;
        }

        @Override
        public StreamStateHandle getMetadataHandle() {
            return metaDataHandle;
        }

        @Override
        public void disposeStorageLocation() {}
    }

    // ------------------------------------------------------------------------
    //  CheckpointMetadataOutputStream
    // ------------------------------------------------------------------------

    private static class MetadataOutputStream extends CheckpointMetadataOutputStream {

        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            os.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            os.flush();
        }

        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }

        @Override
        public void sync() throws IOException { }

        @Override
        public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
            synchronized (this) {
                if (!closed) {
                    closed = true;

                    byte[] bytes = os.toByteArray();
                    ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
                    return new NonPersistentCompletedCheckpointStorageLocation(handle);
                } else {
                    throw new IOException("Already closed");
                }
            }
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();
            }
        }
    }
}
  • When checkpointsDirectory is not configured for MemoryBackendCheckpointStorage, a NonPersistentMetadataCheckpointStorageLocation is created; its createMetadataOutputStream method creates a MetadataOutputStream.
  • MetadataOutputStream extends CheckpointMetadataOutputStream and buffers data in a ByteArrayOutputStreamWithPos; its closeAndFinalizeCheckpoint method returns a NonPersistentCompletedCheckpointStorageLocation.
  • NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface; its getMetadataHandle method returns a ByteStreamStateHandle, and its getExternalPointer method returns the placeholder EXTERNAL_POINTER, since the checkpoint is not externally addressable.
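A minimal sketch of the close-once behavior of MetadataOutputStream, under the assumption that the metadata handle can be modeled as a name plus a byte array. MetadataStream and CompletedLocation are hypothetical stand-ins for MetadataOutputStream and NonPersistentCompletedCheckpointStorageLocation; the synchronized method plays the role of the synchronized (this) block in the original.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

class NonPersistentMetadataSketch {

    static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    /** Hypothetical stand-in for NonPersistentCompletedCheckpointStorageLocation. */
    static final class CompletedLocation {
        final String handleName;   // plays the role of the ByteStreamStateHandle name
        final byte[] metadata;     // metadata lives only in memory

        CompletedLocation(String handleName, byte[] metadata) {
            this.handleName = handleName;
            this.metadata = metadata;
        }

        String getExternalPointer() {
            return EXTERNAL_POINTER;   // nothing on disk to point at
        }
    }

    /** Hypothetical stand-in for MetadataOutputStream. */
    static final class MetadataStream extends OutputStream {
        private final ByteArrayOutputStream os = new ByteArrayOutputStream();
        private boolean closed;

        @Override public void write(int b) { os.write(b); }
        @Override public void write(byte[] b, int off, int len) { os.write(b, off, len); }

        /** Can succeed only once; a second call fails like the original. */
        synchronized CompletedLocation closeAndFinalizeCheckpoint() throws IOException {
            if (closed) {
                throw new IOException("Already closed");
            }
            closed = true;
            return new CompletedLocation(UUID.randomUUID().toString(), os.toByteArray());
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();   // abandon the metadata
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MetadataStream stream = new MetadataStream();
        stream.write(new byte[] {42, 7});
        CompletedLocation location = stream.closeAndFinalizeCheckpoint();
        System.out.println(location.getExternalPointer() + " " + location.metadata.length);
        try {
            stream.closeAndFinalizeCheckpoint();
        } catch (IOException e) {
            System.out.println(e.getMessage()); // Already closed
        }
    }
}
```

Because the completed location keeps the metadata bytes in memory, it is lost when the JobManager process goes away, which is why this variant is intended for setups without a configured checkpoint directory.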

PersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {

    private final FileSystem fileSystem;

    private final Path checkpointDirectory;

    private final Path metadataFilePath;

    /**
     * Creates a checkpoint storage persists metadata to a file system and stores state
     * in line in state handles with the metadata.
     *
     * @param fileSystem The file system to which the metadata will be written.
     * @param checkpointDir The directory where the checkpoint metadata will be written.
     */
    public PersistentMetadataCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            int maxStateSize) {

        super(maxStateSize);

        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
    }

    // ------------------------------------------------------------------------

    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }

    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }

    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
}
  • When checkpointsDirectory is configured for MemoryBackendCheckpointStorage, a PersistentMetadataCheckpointStorageLocation is created. Its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream, whose constructor takes three parameters: fileSystem, metadataFilePath and exclusiveCheckpointDir. The fileSystem is used to create an FSDataOutputStream for metadataFilePath, while exclusiveCheckpointDir is used when returning an FsCompletedCheckpointStorageLocation. Its disposeOnFailure method recursively deletes the whole checkpoint directory.
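The file-system side of PersistentMetadataCheckpointStorageLocation can be sketched with java.nio.file in place of Flink's FileSystem abstraction. This is an assumption-laden illustration: PersistentLocation is a hypothetical class, and the metadata file name "_metadata" is assumed to match AbstractFsCheckpointStorage.METADATA_FILE_NAME. It shows the two behaviors visible in the source: metadata is written to a file inside the checkpoint directory, and disposeOnFailure removes the directory recursively.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PersistentMetadataSketch {

    // Assumed value of AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";

    /** Hypothetical stand-in for PersistentMetadataCheckpointStorageLocation. */
    static final class PersistentLocation {
        private final Path checkpointDirectory;
        private final Path metadataFilePath;

        PersistentLocation(Path checkpointDir) {
            this.checkpointDirectory = Objects.requireNonNull(checkpointDir);
            this.metadataFilePath = checkpointDir.resolve(METADATA_FILE_NAME);
        }

        /** Metadata goes to a file; operator state itself still travels in memory. */
        OutputStream createMetadataOutputStream() throws IOException {
            Files.createDirectories(checkpointDirectory);
            return Files.newOutputStream(metadataFilePath);
        }

        /** Like fileSystem.delete(checkpointDirectory, true): drop the directory as a whole. */
        void disposeOnFailure() throws IOException {
            if (!Files.exists(checkpointDirectory)) {
                return;
            }
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(checkpointDirectory)) {
                // Reverse order puts children before their parent directories.
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
        }

        Path metadataFilePath() {
            return metadataFilePath;
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("ckpt");
        Path dir = base.resolve("chk-1");
        PersistentLocation location = new PersistentLocation(dir);
        try (OutputStream out = location.createMetadataOutputStream()) {
            out.write(new byte[] {1, 2, 3});
        }
        System.out.println(Files.size(location.metadataFilePath())); // 3
        location.disposeOnFailure();
        System.out.println(Files.exists(dir)); // false
        Files.delete(base);
    }
}
```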

Summary

  • When checkpointsDirectory is not configured for MemoryBackendCheckpointStorage, a NonPersistentMetadataCheckpointStorageLocation is created; when checkpointsDirectory is configured, a PersistentMetadataCheckpointStorageLocation is created.
  • NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend the MemCheckpointStreamFactory class and implement the CheckpointStorageLocation interface. The CheckpointMetadataOutputStream returned by their createMetadataOutputStream methods is a MetadataOutputStream and an FsCheckpointMetadataOutputStream, respectively.
  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream. CheckpointStorageLocation extends the CheckpointStreamFactory interface; its instances are usually created and initialized by CheckpointStorage and provide data persistence, metadata persistence and lifecycle/cleanup methods.
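The selection rule in the first summary point can be written down as a small decision function. This is a hypothetical sketch, not MemoryBackendCheckpointStorage's code: Choice and initializeLocationForCheckpoint here only report which location class would be created, and the per-checkpoint sub-directory name "chk-<id>" is an assumption about Flink's naming convention.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class LocationSelectionSketch {

    /** Hypothetical result: which location class would be created, and where (if anywhere). */
    static final class Choice {
        final String locationClass;
        final Path checkpointDir;   // null when metadata is not persisted

        Choice(String locationClass, Path checkpointDir) {
            this.locationClass = locationClass;
            this.checkpointDir = checkpointDir;
        }

        @Override
        public String toString() {
            return locationClass + (checkpointDir == null ? "" : " @ " + checkpointDir);
        }
    }

    static Choice initializeLocationForCheckpoint(Path checkpointsDirectory, long checkpointId) {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpointId must be >= 0");
        }
        if (checkpointsDirectory != null) {
            // Durable metadata: one exclusive sub-directory per checkpoint ("chk-<id>" is assumed).
            return new Choice("PersistentMetadataCheckpointStorageLocation",
                    checkpointsDirectory.resolve("chk-" + checkpointId));
        }
        // No directory configured: metadata stays in memory (typical for IDE or test setups).
        return new Choice("NonPersistentMetadataCheckpointStorageLocation", null);
    }

    public static void main(String[] args) {
        System.out.println(initializeLocationForCheckpoint(null, 1));
        System.out.println(initializeLocationForCheckpoint(Paths.get("/tmp/checkpoints"), 1));
    }
}
```

In both branches the state itself is written through the inherited MemCheckpointStreamFactory streams; the configured directory only changes where the checkpoint metadata ends up.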
