A Look at Flink’s BlobService


Preface

This article takes a look at Flink’s BlobService.

BlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java

/**
 * A simple store and retrieve binary large objects (BLOBs).
 */
public interface BlobService extends Closeable {

    /**
     * Returns a BLOB service for accessing permanent BLOBs.
     *
     * @return BLOB service
     */
    PermanentBlobService getPermanentBlobService();

    /**
     * Returns a BLOB service for accessing transient BLOBs.
     *
     * @return BLOB service
     */
    TransientBlobService getTransientBlobService();

    /**
     * Returns the port of the BLOB server that this BLOB service is working with.
     *
     * @return the port the blob server.
     */
    int getPort();
}
  • BlobService defines getPermanentBlobService, which returns a PermanentBlobService, and getTransientBlobService, which returns a TransientBlobService; getPort returns the port of the BLOB server the service works with.
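
To make the shape of this interface concrete, here is a minimal sketch under simplified assumptions. The nested PermanentBlobService and TransientBlobService are empty stand-ins rather than Flink's real interfaces, SimpleBlobService is a hypothetical facade that only hands out the two sub-services it was given and closes both on close(), and the port number is arbitrary. This loosely resembles how Flink's BlobCacheService combines a permanent and a transient BLOB cache, but it is not Flink's code.

```java
import java.io.Closeable;
import java.io.IOException;

public class BlobServiceSketch {

    // Empty stand-ins; Flink's real interfaces declare getFile/putTransient/... methods.
    public interface PermanentBlobService extends Closeable {}

    public interface TransientBlobService extends Closeable {}

    /** Same shape as Flink's BlobService: two sub-services plus the server port. */
    public interface BlobService extends Closeable {
        PermanentBlobService getPermanentBlobService();

        TransientBlobService getTransientBlobService();

        int getPort();
    }

    /** Hypothetical facade that hands out the sub-services it was constructed with. */
    public static final class SimpleBlobService implements BlobService {
        private final PermanentBlobService permanent;
        private final TransientBlobService transientService;
        private final int port;

        public SimpleBlobService(PermanentBlobService permanent, TransientBlobService transientService, int port) {
            this.permanent = permanent;
            this.transientService = transientService;
            this.port = port;
        }

        @Override
        public PermanentBlobService getPermanentBlobService() {
            return permanent;
        }

        @Override
        public TransientBlobService getTransientBlobService() {
            return transientService;
        }

        @Override
        public int getPort() {
            return port;
        }

        @Override
        public void close() throws IOException {
            try {
                permanent.close();
            } finally {
                // Close the transient service even if closing the permanent one failed.
                transientService.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] closed = {0};
        PermanentBlobService permanent = () -> closed[0]++;
        TransientBlobService transientService = () -> closed[0]++;
        try (BlobService service = new SimpleBlobService(permanent, transientService, 6124)) {
            System.out.println(service.getPort()); // 6124
        }
        System.out.println(closed[0]); // 2
    }
}
```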

PermanentBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java

/**
 * A service to retrieve permanent binary large objects (BLOBs).
 *
 * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
 * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
 * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
 */
public interface PermanentBlobService extends Closeable {

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         BLOB key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         if the BLOB does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, PermanentBlobKey key) throws IOException;

}
  • PermanentBlobService provides the getFile method, which returns the local File for a given JobID and PermanentBlobKey, and throws FileNotFoundException if the BLOB does not exist.
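
As an illustration of what "a local copy of the file associated with the provided job ID and BLOB key" can look like, the sketch below resolves a path of the form storageDir/job_<jobId>/blob_<key> and throws FileNotFoundException when nothing is there. The directory layout is an assumption modelled on Flink's BlobUtils and should be checked against the version you use; PermanentBlobLookupSketch is a hypothetical name, and the IDs are plain strings instead of JobID and PermanentBlobKey. A real cache would typically try to download a missing BLOB (from the BlobServer or the HA store) before giving up.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PermanentBlobLookupSketch {

    // Assumed prefixes, modelled on Flink's BlobUtils; not guaranteed for every version.
    static final String JOB_DIR_PREFIX = "job_";
    static final String BLOB_FILE_PREFIX = "blob_";

    private final Path storageDir;

    public PermanentBlobLookupSketch(Path storageDir) {
        this.storageDir = storageDir;
    }

    /** Computes where the BLOB for (jobId, key) would live in local storage. */
    public Path storageLocation(String jobId, String key) {
        return storageDir.resolve(JOB_DIR_PREFIX + jobId).resolve(BLOB_FILE_PREFIX + key);
    }

    /** Mirrors the getFile contract: return the local file or throw FileNotFoundException. */
    public File getFile(String jobId, String key) throws IOException {
        Path location = storageLocation(jobId, key);
        if (!Files.exists(location)) {
            // A real cache would try to fetch the BLOB remotely here instead of failing.
            throw new FileNotFoundException("BLOB not found: " + location);
        }
        return location.toFile();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blobStore");
        PermanentBlobLookupSketch lookup = new PermanentBlobLookupSketch(dir);
        Path target = lookup.storageLocation("job1", "p-abc");
        Files.createDirectories(target.getParent());
        Files.write(target, new byte[] {1, 2, 3});
        System.out.println(lookup.getFile("job1", "p-abc").length()); // 3
    }
}
```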

TransientBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java

/**
 * A service to retrieve transient binary large objects (BLOBs) which are deleted on the
 * {@link BlobServer} when they are retrieved.
 *
 * <p>These may include per-job BLOBs like files in the {@link
 * org.apache.flink.api.common.cache.DistributedCache}, for example.
 *
 * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
 * {@link PermanentBlobService}.
 *
 * <p>TODO: change API to not rely on local files but return {@link InputStream} objects
 */
public interface TransientBlobService extends Closeable {

    // --------------------------------------------------------------------------------------------
    //  GET
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the path to a local copy of the (job-unrelated) file associated with the provided
     * blob key.
     *
     * @param key
     *         blob key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         when the path does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(TransientBlobKey key) throws IOException;

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         blob key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         when the path does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, TransientBlobKey key) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  PUT
    // --------------------------------------------------------------------------------------------

    /**
     * Uploads the (job-unrelated) data of the given byte array to the BLOB server.
     *
     * @param value
     *         the buffer to upload
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(byte[] value) throws IOException;

    /**
     * Uploads the data of the given byte array for the given job to the BLOB server.
     *
     * @param jobId
     *         the ID of the job the BLOB belongs to
     * @param value
     *         the buffer to upload
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;

    /**
     * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
     *
     * @param inputStream
     *         the input stream to read the data from
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream or uploading the
     *         data to the BLOB server
     */
    TransientBlobKey putTransient(InputStream inputStream) throws IOException;

    /**
     * Uploads the data from the given input stream for the given job to the BLOB server.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param inputStream
     *         the input stream to read the data from
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream or uploading the
     *         data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  DELETE
    // --------------------------------------------------------------------------------------------

    /**
     * Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
     *
     * @param key
     *         associated with the file to be deleted
     *
     * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
     *          <tt>false</tt> otherwise
     */
    boolean deleteFromCache(TransientBlobKey key);

    /**
     * Deletes the file associated with the provided job ID and blob key from the local cache.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         associated with the file to be deleted
     *
     * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
     *          <tt>false</tt> otherwise
     */
    boolean deleteFromCache(JobID jobId, TransientBlobKey key);

}
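  • TransientBlobService is used to retrieve transient binary large objects (BLOBs), which are deleted on the BlobServer once they have been retrieved and are not highly available. It provides getFile, putTransient and deleteFromCache, each in a job-unrelated variant and a variant that takes a JobID.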
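
The put/get/delete contract can be illustrated with a small in-memory model. In this sketch (hypothetical class TransientBlobSketch, String keys instead of TransientBlobKey), one map stands in for the BlobServer and another for the local cache. putTransient derives a content-addressed key from the SHA-1 digest of the data; SHA-1 is an assumption here, chosen because its 20-byte output matches BlobKey.SIZE. getFile removes the BLOB from the "server" the first time it is fetched, and deleteFromCache only touches the local copy. The JobID overloads and the random component of real keys are left out, so identical content always maps to the same key here.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

public class TransientBlobSketch {

    private final Map<String, byte[]> server = new HashMap<>();     // stands in for the BlobServer
    private final Map<String, byte[]> localCache = new HashMap<>(); // stands in for the local cache

    /** Hex-encoded SHA-1 digest: 20 bytes, the same length as BlobKey.SIZE. */
    public static String hashKey(byte[] value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1").digest(value);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public String putTransient(byte[] value) {
        String key = hashKey(value);
        server.put(key, value.clone());
        return key;
    }

    public String putTransient(InputStream inputStream) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = inputStream.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return putTransient(buffer.toByteArray());
    }

    /** Serves the local copy; otherwise fetches from the "server", which deletes it on retrieval. */
    public byte[] getFile(String key) throws IOException {
        byte[] cached = localCache.get(key);
        if (cached != null) {
            return cached;
        }
        byte[] fetched = server.remove(key);
        if (fetched == null) {
            throw new FileNotFoundException("No BLOB for key " + key);
        }
        localCache.put(key, fetched);
        return fetched;
    }

    /** True if the BLOB was deleted or was not cached at all, matching the interface contract. */
    public boolean deleteFromCache(String key) {
        localCache.remove(key);
        return true;
    }

    public boolean isOnServer(String key) {
        return server.containsKey(key);
    }

    public static void main(String[] args) throws IOException {
        TransientBlobSketch blobs = new TransientBlobSketch();
        String key = blobs.putTransient("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(key.length());                                           // 40
        System.out.println(new String(blobs.getFile(key), StandardCharsets.UTF_8)); // hello
        System.out.println(blobs.isOnServer(key));                                  // false
        System.out.println(blobs.deleteFromCache(key));                             // true
    }
}
```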

BlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java

/**
 * A BLOB key uniquely identifies a BLOB.
 */
public abstract class BlobKey implements Serializable, Comparable<BlobKey> {

    private static final long serialVersionUID = 3847117712521785209L;

    /** Size of the internal BLOB key in bytes. */
    public static final int SIZE = 20;

    /** The byte buffer storing the actual key data. */
    private final byte[] key;

    /**
     * (Internal) BLOB type - to be reflected by the inheriting sub-class.
     */
    private final BlobType type;

    /**
     * BLOB type, i.e. permanent or transient.
     */
    enum BlobType {
        /**
         * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
         * available.
         */
        PERMANENT_BLOB,
        /**
         * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
         * highly available.
         */
        TRANSIENT_BLOB
    }

    /**
     * Random component of the key.
     */
    private final AbstractID random;

    /**
     * Constructs a new BLOB key.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     */
    protected BlobKey(BlobType type) {
        this.type = checkNotNull(type);
        this.key = new byte[SIZE];
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *        the actual key data
     */
    protected BlobKey(BlobType type, byte[] key) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }

        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *        the actual key data
     * @param random
     *        the random component of the key
     */
    protected BlobKey(BlobType type, byte[] key, byte[] random) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }

        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID(random);
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     *
     * @return BlobKey subclass
     */
    @VisibleForTesting
    static BlobKey createKey(BlobType type) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey();
        } else {
            return new TransientBlobKey();
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *        the actual key data
     *
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key);
        } else {
            return new TransientBlobKey(key);
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *        the actual key data
     * @param random
     *        the random component of the key
     *
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key, random);
        } else {
            return new TransientBlobKey(key, random);
        }
    }

    /**
     * Returns the hash component of this key.
     *
     * @return a 20-byte hash of the contents the key refers to
     */
    @VisibleForTesting
    public byte[] getHash() {
        return key;
    }

    /**
     * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
     *
     * @return BLOB type, i.e. permanent or transient
     */
    BlobType getType() {
        return type;
    }

    /**
     * Adds the BLOB key to the given {@link MessageDigest}.
     *
     * @param md
     *        the message digest to add the BLOB key to
     */
    public void addToMessageDigest(MessageDigest md) {
        md.update(this.key);
    }

    @Override
    public boolean equals(final Object obj) {

        if (!(obj instanceof BlobKey)) {
            return false;
        }

        final BlobKey bk = (BlobKey) obj;

        return Arrays.equals(this.key, bk.key) &&
            this.type == bk.type &&
            this.random.equals(bk.random);
    }

    @Override
    public int hashCode() {
        int result = Arrays.hashCode(this.key);
        result = 37 * result + this.type.hashCode();
        result = 37 * result + this.random.hashCode();
        return result;
    }

    @Override
    public String toString() {
        final String typeString;
        switch (this.type) {
            case TRANSIENT_BLOB:
                typeString = "t-";
                break;
            case PERMANENT_BLOB:
                typeString = "p-";
                break;
            default:
                // this actually never happens!
                throw new IllegalStateException("Invalid BLOB type");
        }
        return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
    }

    @Override
    public int compareTo(BlobKey o) {
        // compare the hashes first
        final byte[] aarr = this.key;
        final byte[] barr = o.key;
        final int len = Math.min(aarr.length, barr.length);

        for (int i = 0; i < len; ++i) {
            final int a = (aarr[i] & 0xff);
            final int b = (barr[i] & 0xff);
            if (a != b) {
                return a - b;
            }
        }

        if (aarr.length == barr.length) {
            // same hash contents - compare the BLOB types
            int typeCompare = this.type.compareTo(o.type);
            if (typeCompare == 0) {
                // same type - compare random components
                return this.random.compareTo(o.random);
            } else {
                return typeCompare;
            }
        } else {
            return aarr.length - barr.length;
        }
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Auxiliary method to read a BLOB key from an input stream.
     *
     * @param inputStream
     *        the input stream to read the BLOB key from
     * @return the read BLOB key
     * @throws IOException
     *         throw if an I/O error occurs while reading from the input stream
     */
    static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

        final byte[] key = new byte[BlobKey.SIZE];
        final byte[] random = new byte[AbstractID.SIZE];

        int bytesRead = 0;
        // read key
        while (bytesRead < key.length) {
            final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        // read BLOB type
        final BlobType blobType;
        {
            final int read = inputStream.read();
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB type");
            } else if (read == TRANSIENT_BLOB.ordinal()) {
                blobType = TRANSIENT_BLOB;
            } else if (read == PERMANENT_BLOB.ordinal()) {
                blobType = PERMANENT_BLOB;
            } else {
                throw new IOException("Invalid data received for the BLOB type: " + read);
            }
        }

        // read random component
        bytesRead = 0;
        while (bytesRead < AbstractID.SIZE) {
            final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        return createKey(blobType, key, random);
    }

    /**
     * Auxiliary method to write this BLOB key to an output stream.
     *
     * @param outputStream
     *        the output stream to write the BLOB key to
     * @throws IOException
     *         thrown if an I/O error occurs while writing the BLOB key
     */
    void writeToOutputStream(final OutputStream outputStream) throws IOException {
        outputStream.write(this.key);
        outputStream.write(this.type.ordinal());
        outputStream.write(this.random.getBytes());
    }
}
  • BlobKey is an abstract class with three fields: key (the 20-byte hash of the BLOB contents), type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB) and random (an AbstractID). Its static createKey methods create the appropriate subclass according to the BlobType; readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes a BlobKey to an OutputStream. It has two subclasses, PermanentBlobKey and TransientBlobKey.
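
writeToOutputStream and readFromInputStream define a fixed 37-byte wire format: the 20-byte hash, one byte holding the BlobType ordinal, and the 16-byte AbstractID. The following self-contained sketch reproduces that layout with simplified stand-in types (Key and BlobType here are not Flink's classes). It uses DataInputStream.readFully in place of BlobKey's manual read loops; both throw EOFException on truncated input.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;

public class BlobKeyWireFormatSketch {

    public static final int KEY_SIZE = 20;    // mirrors BlobKey.SIZE
    public static final int RANDOM_SIZE = 16; // mirrors AbstractID.SIZE

    // Declaration order matters: the ordinal is what goes on the wire.
    public enum BlobType { PERMANENT_BLOB, TRANSIENT_BLOB }

    /** Simplified stand-in for BlobKey: hash, type and random component. */
    public static final class Key {
        final byte[] hash;
        final BlobType type;
        final byte[] random;

        public Key(byte[] hash, BlobType type, byte[] random) {
            if (hash.length != KEY_SIZE || random.length != RANDOM_SIZE) {
                throw new IllegalArgumentException("unexpected key or random size");
            }
            this.hash = hash.clone();
            this.type = type;
            this.random = random.clone();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return Arrays.equals(hash, k.hash) && type == k.type && Arrays.equals(random, k.random);
        }

        @Override
        public int hashCode() {
            return 31 * (31 * Arrays.hashCode(hash) + type.hashCode()) + Arrays.hashCode(random);
        }
    }

    /** Same order as BlobKey.writeToOutputStream: hash, type ordinal (1 byte), random. */
    public static void write(Key key, OutputStream out) throws IOException {
        out.write(key.hash);
        out.write(key.type.ordinal());
        out.write(key.random);
    }

    /** Inverse of write; EOFException on truncated input, IOException on an unknown type byte. */
    public static Key read(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        byte[] hash = new byte[KEY_SIZE];
        din.readFully(hash);
        int typeByte = din.read();
        if (typeByte < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        }
        if (typeByte >= BlobType.values().length) {
            throw new IOException("Invalid data received for the BLOB type: " + typeByte);
        }
        byte[] random = new byte[RANDOM_SIZE];
        din.readFully(random);
        return new Key(hash, BlobType.values()[typeByte], random);
    }

    public static void main(String[] args) throws IOException {
        Random rnd = new Random();
        byte[] hash = new byte[KEY_SIZE];
        byte[] random = new byte[RANDOM_SIZE];
        rnd.nextBytes(hash);
        rnd.nextBytes(random);
        Key key = new Key(hash, BlobType.PERMANENT_BLOB, random);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        write(key, bos);
        System.out.println(bos.size()); // 37
        Key back = read(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(back.equals(key)); // true
    }
}
```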

PermanentBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java

/**
 * BLOB key referencing permanent BLOB files.
 */
public final class PermanentBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public PermanentBlobKey() {
        super(BlobType.PERMANENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *        the actual key data
     */
    PermanentBlobKey(byte[] key) {
        super(BlobType.PERMANENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *        the actual key data
     * @param random
     *        the random component of the key
     */
    PermanentBlobKey(byte[] key, byte[] random) {
        super(BlobType.PERMANENT_BLOB, key, random);
    }
}
  • PermanentBlobKey extends BlobKey and always uses BlobType.PERMANENT_BLOB.

TransientBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java

/**
 * BLOB key referencing transient BLOB files.
 */
public final class TransientBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public TransientBlobKey() {
        super(BlobType.TRANSIENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *        the actual key data
     */
    TransientBlobKey(byte[] key) {
        super(BlobType.TRANSIENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *        the actual key data
     * @param random
     *        the random component of the key
     */
    TransientBlobKey(byte[] key, byte[] random) {
        super(BlobType.TRANSIENT_BLOB, key, random);
    }
}
  • TransientBlobKey extends BlobKey and always uses BlobType.TRANSIENT_BLOB.
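
Because PermanentBlobKey and TransientBlobKey differ only in their BlobType, two keys for identical content are still distinct: equals and compareTo look at the hash first (as unsigned bytes), then the type, then the random AbstractID, and toString prefixes the type as p- or t-. The sketch below mirrors that logic with a hypothetical stand-in class that holds the random component as two longs, as AbstractID does; its hashCode is simplified and does not reproduce BlobKey's exact value.

```java
import java.util.Arrays;

public class BlobKeyOrderingSketch {

    public enum BlobType { PERMANENT_BLOB, TRANSIENT_BLOB }

    /** Stand-in for BlobKey: content hash, type, and a 128-bit random part split into two longs. */
    public static final class Key implements Comparable<Key> {
        final byte[] hash;
        final BlobType type;
        final long lower;
        final long upper;

        public Key(byte[] hash, BlobType type, long lower, long upper) {
            this.hash = hash.clone();
            this.type = type;
            this.lower = lower;
            this.upper = upper;
        }

        @Override
        public int compareTo(Key o) {
            int len = Math.min(hash.length, o.hash.length);
            for (int i = 0; i < len; i++) {
                int a = hash[i] & 0xff; // compare bytes as unsigned values
                int b = o.hash[i] & 0xff;
                if (a != b) {
                    return a - b;
                }
            }
            if (hash.length != o.hash.length) {
                return hash.length - o.hash.length;
            }
            int byType = type.compareTo(o.type); // PERMANENT_BLOB sorts before TRANSIENT_BLOB
            if (byType != 0) {
                return byType;
            }
            int byUpper = Long.compare(upper, o.upper); // AbstractID compares upperPart first
            return byUpper != 0 ? byUpper : Long.compare(lower, o.lower);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Key)) {
                return false;
            }
            Key k = (Key) obj;
            return Arrays.equals(hash, k.hash) && type == k.type && lower == k.lower && upper == k.upper;
        }

        @Override
        public int hashCode() {
            int result = Arrays.hashCode(hash);
            result = 37 * result + type.hashCode();
            return 37 * result + Long.hashCode(lower ^ upper);
        }

        @Override
        public String toString() {
            String prefix = type == BlobType.PERMANENT_BLOB ? "p-" : "t-";
            // AbstractID prints lowerPart then upperPart, each as 8 big-endian bytes in hex
            return prefix + hex(hash) + "-" + String.format("%016x%016x", lower, upper);
        }
    }

    public static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] sameHash = new byte[20];
        Key p = new Key(sameHash, BlobType.PERMANENT_BLOB, 1L, 2L);
        Key t = new Key(sameHash, BlobType.TRANSIENT_BLOB, 1L, 2L);
        System.out.println(p.equals(t));        // false
        System.out.println(p.compareTo(t) < 0); // true
        System.out.println(p);
    }
}
```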

AbstractID

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java

/**
 * A statistically unique identification number.
 */
@PublicEvolving
public class AbstractID implements Comparable<AbstractID>, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private static final Random RND = new Random();

    /** The size of a long in bytes. */
    private static final int SIZE_OF_LONG = 8;

    /** The size of the ID in byte. */
    public static final int SIZE = 2 * SIZE_OF_LONG;

    // ------------------------------------------------------------------------

    /** The upper part of the actual ID. */
    protected final long upperPart;

    /** The lower part of the actual ID. */
    protected final long lowerPart;

    /** The memoized value returned by toString(). */
    private transient String toString;

    // --------------------------------------------------------------------------------------------

    /**
     * Constructs a new ID with a specific bytes value.
     */
    public AbstractID(byte[] bytes) {
        if (bytes == null || bytes.length != SIZE) {
            throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
        }

        this.lowerPart = byteArrayToLong(bytes, 0);
        this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
    }

    /**
     * Constructs a new abstract ID.
     *
     * @param lowerPart the lower bytes of the ID
     * @param upperPart the higher bytes of the ID
     */
    public AbstractID(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    /**
     * Copy constructor: Creates a new abstract ID from the given one.
     *
     * @param id the abstract ID to copy
     */
    public AbstractID(AbstractID id) {
        if (id == null) {
            throw new IllegalArgumentException("Id must not be null.");
        }
        this.lowerPart = id.lowerPart;
        this.upperPart = id.upperPart;
    }

    /**
     * Constructs a new random ID from a uniform distribution.
     */
    public AbstractID() {
        this.lowerPart = RND.nextLong();
        this.upperPart = RND.nextLong();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Gets the lower 64 bits of the ID.
     *
     * @return The lower 64 bits of the ID.
     */
    public long getLowerPart() {
        return lowerPart;
    }

    /**
     * Gets the upper 64 bits of the ID.
     *
     * @return The upper 64 bits of the ID.
     */
    public long getUpperPart() {
        return upperPart;
    }

    /**
     * Gets the bytes underlying this ID.
     *
     * @return The bytes underlying this ID.
     */
    public byte[] getBytes() {
        byte[] bytes = new byte[SIZE];
        longToByteArray(lowerPart, bytes, 0);
        longToByteArray(upperPart, bytes, SIZE_OF_LONG);
        return bytes;
    }

    // --------------------------------------------------------------------------------------------
    //  Standard Utilities
    // --------------------------------------------------------------------------------------------

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        } else if (obj != null && obj.getClass() == getClass()) {
            AbstractID that = (AbstractID) obj;
            return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return ((int)  this.lowerPart) ^
                ((int) (this.lowerPart >>> 32)) ^
                ((int)  this.upperPart) ^
                ((int) (this.upperPart >>> 32));
    }

    @Override
    public String toString() {
        if (this.toString == null) {
            final byte[] ba = new byte[SIZE];
            longToByteArray(this.lowerPart, ba, 0);
            longToByteArray(this.upperPart, ba, SIZE_OF_LONG);

            this.toString = StringUtils.byteToHexString(ba);
        }

        return this.toString;
    }

    @Override
    public int compareTo(AbstractID o) {
        int diff1 = Long.compare(this.upperPart, o.upperPart);
        int diff2 = Long.compare(this.lowerPart, o.lowerPart);
        return diff1 == 0 ? diff2 : diff1;
    }

    // --------------------------------------------------------------------------------------------
    //  Conversion Utilities
    // --------------------------------------------------------------------------------------------

    /**
     * Converts the given byte array to a long.
     *
     * @param ba the byte array to be converted
     * @param offset the offset indicating at which byte inside the array the conversion shall begin
     * @return the long variable
     */
    private static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;

        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }

        return l;
    }

    /**
     * Converts a long to a byte array.
     *
     * @param l the long variable to be converted
     * @param ba the byte array to store the result the of the conversion
     * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
     */
    private static void longToByteArray(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }
}
  • AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor generates both with Random.nextLong; the byte[] constructor parses lowerPart and upperPart from a 16-byte array; the (lowerPart, upperPart) constructor sets them directly; and a copy constructor copies them from another AbstractID.
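
The conversion helpers in AbstractID write each long most-significant byte first (big-endian), with lowerPart in bytes 0 to 7 and upperPart in bytes 8 to 15. The sketch below copies the two private helpers into a standalone class (AbstractIdBytesSketch is a hypothetical name) and checks them against java.nio.ByteBuffer, whose default byte order is big-endian.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class AbstractIdBytesSketch {

    static final int SIZE_OF_LONG = 8;

    // Same loop as AbstractID.longToByteArray: most significant byte first.
    public static void longToByteArray(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }

    // Same loop as AbstractID.byteArrayToLong: the inverse of longToByteArray.
    public static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        return l;
    }

    /** Layout used by AbstractID.getBytes(): lowerPart in bytes 0-7, upperPart in bytes 8-15. */
    public static byte[] toBytes(long lower, long upper) {
        byte[] bytes = new byte[2 * SIZE_OF_LONG];
        longToByteArray(lower, bytes, 0);
        longToByteArray(upper, bytes, SIZE_OF_LONG);
        return bytes;
    }

    public static void main(String[] args) {
        long lower = 0x0102030405060708L;
        long upper = -1L;
        byte[] ours = toBytes(lower, upper);
        byte[] viaByteBuffer = ByteBuffer.allocate(16).putLong(lower).putLong(upper).array();
        System.out.println(Arrays.equals(ours, viaByteBuffer)); // true
        System.out.println(byteArrayToLong(ours, 0) == lower && byteArrayToLong(ours, 8) == upper); // true
    }
}
```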

Summary

  • BlobService defines getPermanentBlobService, which returns a PermanentBlobService, and getTransientBlobService, which returns a TransientBlobService. PermanentBlobService provides getFile, which returns the local File for a given JobID and PermanentBlobKey. TransientBlobService is used to retrieve transient BLOBs, which are deleted on the BlobServer once they have been retrieved; it provides getFile, putTransient and deleteFromCache.
  • BlobKey is an abstract class with three fields: key, type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB) and random (an AbstractID). Its static createKey methods create a BlobKey of the right subclass according to the BlobType; readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes it to an OutputStream. It has two subclasses: PermanentBlobKey, whose BlobType is BlobType.PERMANENT_BLOB, and TransientBlobKey, whose BlobType is BlobType.TRANSIENT_BLOB.
  • AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor generates both with Random.nextLong; the byte[] constructor parses lowerPart and upperPart from a byte array; and the (lowerPart, upperPart) constructor sets them directly.
