A Look at Flink's BlobWriter

Preface

This article takes a look at Flink's BlobWriter.

BlobWriter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java

/**
 * BlobWriter is used to upload data to the BLOB store.
 */
public interface BlobWriter {

    Logger LOG = LoggerFactory.getLogger(BlobWriter.class);

    /**
     * Uploads the data of the given byte array for the given job to the BLOB server and makes it
     * a permanent BLOB.
     *
     * @param jobId
     *         the ID of the job the BLOB belongs to
     * @param value
     *         the buffer to upload
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
     *         store
     */
    PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;

    /**
     * Uploads the data from the given input stream for the given job to the BLOB server and makes it
     * a permanent BLOB.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param inputStream
     *         the input stream to read the data from
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream, writing it to a
     *         local file, or uploading it to the HA store
     */
    PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;

    /**
     * Returns the min size before data will be offloaded to the BLOB store.
     *
     * @return minimum offloading size
     */
    int getMinOffloadingSize();

    /**
     * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
     * offloading size of the BlobServer.
     *
     * @param value to serialize
     * @param jobId to which the value belongs.
     * @param blobWriter to use to offload the serialized value
     * @param <T> type of the value to serialize
     * @return Either the serialized value or the stored blob key
     * @throws IOException if the data cannot be serialized
     */
    static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
            T value,
            JobID jobId,
            BlobWriter blobWriter) throws IOException {
        Preconditions.checkNotNull(value);
        Preconditions.checkNotNull(jobId);
        Preconditions.checkNotNull(blobWriter);

        final SerializedValue<T> serializedValue = new SerializedValue<>(value);

        if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
            return Either.Left(new SerializedValue<>(value));
        } else {
            try {
                final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());

                return Either.Right(permanentBlobKey);
            } catch (IOException e) {
                LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);

                return Either.Left(serializedValue);
            }
        }
    }
}
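  • BlobWriter defines the putPermanent and getMinOffloadingSize methods. It also provides the static helper serializeAndTryOffload, which serializes the given value and, if the serialized size is at least the minimum offloading size, calls blobWriter.putPermanent to store it on the BlobServer and returns the resulting PermanentBlobKey. Otherwise, or if the upload fails with an IOException (which is logged as a warning), it returns the SerializedValue itself.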
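The offloading decision in serializeAndTryOffload can be sketched without any Flink dependencies. This is a simplified illustration, not Flink's code: SimpleBlobWriter, InMemoryBlobWriter and OffloadResult are hypothetical stand-ins for BlobWriter, a real BlobServer and Either<SerializedValue<T>, PermanentBlobKey>, and plain Java serialization is assumed as a stand-in for SerializedValue. It keeps the same rules as the original: values below getMinOffloadingSize() stay inline, larger ones are uploaded, and a failed upload falls back to the inline bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for Flink's BlobWriter: only the two methods the helper relies on.
interface SimpleBlobWriter {
    String putPermanent(String jobId, byte[] value) throws IOException;

    int getMinOffloadingSize();
}

// Hypothetical in-memory writer; keys are "jobId/randomUUID" strings, not real PermanentBlobKeys.
class InMemoryBlobWriter implements SimpleBlobWriter {
    final Map<String, byte[]> store = new HashMap<>();
    private final int minOffloadingSize;

    InMemoryBlobWriter(int minOffloadingSize) {
        this.minOffloadingSize = minOffloadingSize;
    }

    @Override
    public String putPermanent(String jobId, byte[] value) {
        String key = jobId + "/" + UUID.randomUUID();
        store.put(key, value.clone());
        return key;
    }

    @Override
    public int getMinOffloadingSize() {
        return minOffloadingSize;
    }
}

// Exactly one field is non-null, mimicking Either<SerializedValue<T>, PermanentBlobKey>.
final class OffloadResult {
    final byte[] inlineBytes; // set when the value is shipped inline
    final String blobKey;     // set when the value was offloaded

    private OffloadResult(byte[] inlineBytes, String blobKey) {
        this.inlineBytes = inlineBytes;
        this.blobKey = blobKey;
    }

    static OffloadResult inline(byte[] bytes) {
        return new OffloadResult(bytes, null);
    }

    static OffloadResult offloaded(String key) {
        return new OffloadResult(null, key);
    }

    boolean isOffloaded() {
        return blobKey != null;
    }
}

final class OffloadSketch {
    // Plain Java serialization as an assumed stand-in for SerializedValue.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    static OffloadResult serializeAndTryOffload(
            Serializable value, String jobId, SimpleBlobWriter writer) throws IOException {
        byte[] bytes = serialize(value);
        // Below the threshold: keep the bytes inline (same "<" comparison as the Flink helper).
        if (bytes.length < writer.getMinOffloadingSize()) {
            return OffloadResult.inline(bytes);
        }
        try {
            return OffloadResult.offloaded(writer.putPermanent(jobId, bytes));
        } catch (IOException e) {
            // Upload failed: fall back to the inline value instead of failing the caller.
            return OffloadResult.inline(bytes);
        }
    }
}
```

With a threshold of 1024 bytes, serializing a short String returns inline bytes, while a new byte[4096] is offloaded and comes back as a key. Falling back instead of rethrowing means an unavailable BLOB store only costs a larger inline payload rather than an exception, matching the LOG.warn-and-continue branch of the original.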

BlobServer

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

/**
 * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
 * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
 * the BLOBs or temporarily cache them.
 */
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
    //......

    @Override
    public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
        checkNotNull(jobId);
        return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
    }

    @Override
    public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
        checkNotNull(jobId);
        return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
    }

    /**
     * Returns the configuration used by the BLOB server.
     *
     * @return configuration
     */
    @Override
    public final int getMinOffloadingSize() {
        return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
    }

    /**
     * Uploads the data of the given byte array for the given job to the BLOB server.
     *
     * @param jobId
     *         the ID of the job the BLOB belongs to
     * @param value
     *         the buffer to upload
     * @param blobType
     *         whether to make the data permanent or transient
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
     *         store
     */
    private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
            throws IOException {

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PUT call for BLOB of job {}.", jobId);
        }

        File incomingFile = createTemporaryFilename();
        MessageDigest md = BlobUtils.createMessageDigest();
        BlobKey blobKey = null;
        try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
            md.update(value);
            fos.write(value);
        } catch (IOException ioe) {
            // delete incomingFile from a failed download
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn("Could not delete the staging file {} for job {}.",
                    incomingFile, jobId);
            }
            throw ioe;
        }

        try {
            // persist file
            blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

            return blobKey;
        } finally {
            // delete incomingFile from a failed download
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
                    incomingFile, blobKey, jobId);
            }
        }
    }

    /**
     * Uploads the data from the given input stream for the given job to the BLOB server.
     *
     * @param jobId
     *         the ID of the job the BLOB belongs to
     * @param inputStream
     *         the input stream to read the data from
     * @param blobType
     *         whether to make the data permanent or transient
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream, writing it to a
     *         local file, or uploading it to the HA store
     */
    private BlobKey putInputStream(
            @Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
            throws IOException {

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PUT call for BLOB of job {}.", jobId);
        }

        File incomingFile = createTemporaryFilename();
        MessageDigest md = BlobUtils.createMessageDigest();
        BlobKey blobKey = null;
        try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
            // read stream
            byte[] buf = new byte[BUFFER_SIZE];
            while (true) {
                final int bytesRead = inputStream.read(buf);
                if (bytesRead == -1) {
                    // done
                    break;
                }
                fos.write(buf, 0, bytesRead);
                md.update(buf, 0, bytesRead);
            }

            // persist file
            blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

            return blobKey;
        } finally {
            // delete incomingFile from a failed download
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
                    incomingFile, blobKey, jobId);
            }
        }
    }

    /**
     * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
     * use.
     *
     * @param incomingFile
     *         temporary file created during transfer
     * @param jobId
     *         ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
     * @param digest
     *         BLOB content digest, i.e. hash
     * @param blobType
     *         whether this file is a permanent or transient BLOB
     *
     * @return unique BLOB key that identifies the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while moving the file or uploading it to the HA store
     */
    BlobKey moveTempFileToStore(
            File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
            throws IOException {

        int retries = 10;

        int attempt = 0;
        while (true) {
            // add unique component independent of the BLOB content
            BlobKey blobKey = BlobKey.createKey(blobType, digest);
            File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);

            // try again until the key is unique (put the existence check into the lock!)
            readWriteLock.writeLock().lock();
            try {
                if (!storageFile.exists()) {
                    BlobUtils.moveTempFileToStore(
                        incomingFile, jobId, blobKey, storageFile, LOG,
                        blobKey instanceof PermanentBlobKey ? blobStore : null);
                    // add TTL for transient BLOBs:
                    if (blobKey instanceof TransientBlobKey) {
                        // must be inside read or write lock to add a TTL
                        blobExpiryTimes
                            .put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
                                System.currentTimeMillis() + cleanupInterval);
                    }
                    return blobKey;
                }
            } finally {
                readWriteLock.writeLock().unlock();
            }

            ++attempt;
            if (attempt >= retries) {
                String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
                LOG.error(message + " No retries left.");
                throw new IOException(message);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
                        jobId, attempt, storageFile.getAbsolutePath());
                }
            }
        }
    }

    /**
     * Returns a temporary file inside the BLOB server's incoming directory.
     *
     * @return a temporary file inside the BLOB server's incoming directory
     *
     * @throws IOException
     *         if creating the directory fails
     */
    File createTemporaryFilename() throws IOException {
        return new File(BlobUtils.getIncomingDirectory(storageDir),
                String.format("temp-%08d", tempFileCounter.getAndIncrement()));
    }

    //......
}
  • BlobServer implements the BlobWriter interface (as well as BlobService, PermanentBlobService and TransientBlobService). The two putPermanent overloads delegate to putBuffer and putInputStream respectively, while getMinOffloadingSize reads the BlobServerOptions.OFFLOAD_MINSIZE setting from blobServiceConfiguration, which defaults to 1 MiB.
  • putBuffer takes a byte[]: it first writes the bytes to a temporary file, updating a MessageDigest along the way, and then calls moveTempFileToStore to persist the file. putInputStream takes an InputStream: it likewise copies the stream into a temporary file in BUFFER_SIZE chunks while updating the digest, and then calls moveTempFileToStore.
  • BlobServer's moveTempFileToStore creates a BlobKey from the digest (plus a random component) and, while holding the write lock, calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location. If that location already exists it retries with a new key, up to 10 times. Only permanent BLOBs are handed the HA blobStore; transient BLOBs instead get an expiry time in blobExpiryTimes. Here storageDir is initialized by BlobUtils.initLocalStorageDirectory(config), and storageFile is obtained from BlobUtils.getStorageLocation(storageDir, jobId, blobKey).
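The write path of putBuffer/putInputStream and moveTempFileToStore can be approximated as follows. This is a minimal sketch under several assumptions: MiniBlobServer is a hypothetical class, keys are plain strings made of a hex digest plus a random UUID (not Flink's BlobKey format), SHA-1 is assumed as the digest algorithm, BUFFER_SIZE is an arbitrary 4096, and the HA blobStore upload and transient-BLOB TTLs are left out. What it keeps from the original is the shape of the algorithm: stream into a uniquely named staging file while hashing, then, under a write lock, move the file to a key-derived location, retrying with a fresh key up to 10 times if that location is taken.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, heavily simplified BlobServer: local disk only, no HA store, no transient TTLs.
class MiniBlobServer {
    private static final int BUFFER_SIZE = 4096; // assumed chunk size
    private static final int MAX_RETRIES = 10;   // same retry budget as BlobServer.moveTempFileToStore

    private final File storageDir;
    private final File incomingDir;
    private final AtomicLong tempFileCounter = new AtomicLong();
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    MiniBlobServer(File storageDir) throws IOException {
        this.storageDir = storageDir;
        this.incomingDir = new File(storageDir, "incoming");
        Files.createDirectories(incomingDir.toPath());
    }

    // byte[] variant: reuses the stream pipeline.
    String putBuffer(String jobId, byte[] value) throws IOException {
        return putInputStream(jobId, new ByteArrayInputStream(value));
    }

    String putInputStream(String jobId, InputStream in) throws IOException {
        File incomingFile = new File(incomingDir,
                String.format("temp-%08d", tempFileCounter.getAndIncrement()));
        MessageDigest md = createDigest();
        try {
            try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
                byte[] buf = new byte[BUFFER_SIZE];
                int bytesRead;
                while ((bytesRead = in.read(buf)) != -1) {
                    fos.write(buf, 0, bytesRead);
                    md.update(buf, 0, bytesRead); // hash while copying, no second pass
                }
            }
            return moveTempFileToStore(incomingFile, jobId, md.digest());
        } finally {
            // After a successful move the staging file is gone; otherwise remove it (best effort).
            incomingFile.delete();
        }
    }

    private String moveTempFileToStore(File incomingFile, String jobId, byte[] digest)
            throws IOException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            // Content hash plus a random suffix; this key format is hypothetical.
            String key = toHex(digest) + "-" + UUID.randomUUID();
            File storageFile = storageFile(jobId, key);
            Files.createDirectories(storageFile.getParentFile().toPath());
            readWriteLock.writeLock().lock();
            try {
                // Existence check and move happen under the same lock.
                if (!storageFile.exists()) {
                    Files.move(incomingFile.toPath(), storageFile.toPath());
                    return key;
                }
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
        throw new IOException("Failed to find a unique key for BLOB of job " + jobId);
    }

    File storageFile(String jobId, String key) {
        return new File(storageDir, "job_" + jobId + "/blob_" + key);
    }

    private static MessageDigest createDigest() {
        try {
            return MessageDigest.getInstance("SHA-1"); // assumed hash algorithm
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Uploading the same bytes twice yields two different keys that share the same digest prefix, because the random component is added independently of the content. That is also why the original loop can simply retry with a new key when the target file already exists.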

BlobUtils

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java

/**
 * Utility class to work with blob data.
 */
public class BlobUtils {
    //......

    /**
     * Creates a local storage directory for a blob service under the configuration parameter given
     * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
     * fall back to Flink's temp directories (given by
     * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
     * random.
     *
     * @param config
     *         Flink configuration
     *
     * @return a new local storage directory
     *
     * @throws IOException
     *         thrown if the local file storage cannot be created or is not usable
     */
    static File initLocalStorageDirectory(Configuration config) throws IOException {

        String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);

        File baseDir;
        if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
            final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
            baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
        }
        else {
            baseDir = new File(basePath);
        }

        File storageDir;

        // NOTE: although we will be using UUIDs, there may be collisions
        int maxAttempts = 10;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            storageDir = new File(baseDir, String.format(
                    "blobStore-%s", UUID.randomUUID().toString()));

            // Create the storage dir if it doesn't exist. Only return it when the operation was
            // successful.
            if (storageDir.mkdirs()) {
                return storageDir;
            }
        }

        // max attempts exceeded to find a storage directory
        throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
    }

    /**
     * Returns the (designated) physical storage location of the BLOB with the given key.
     *
     * @param storageDir
     *         storage directory used be the BLOB service
     * @param key
     *         the key identifying the BLOB
     * @param jobId
     *         ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
     *
     * @return the (designated) physical storage location of the BLOB
     *
     * @throws IOException
     *         if creating the directory fails
     */
    static File getStorageLocation(
            File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
        File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));

        Files.createDirectories(file.getParentFile().toPath());

        return file;
    }

    /**
     * Returns the path for the given blob key.
     *
     * <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
     * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
     * BlobKey)}.
     *
     * @param storageDir
     *         storage directory used be the BLOB service
     * @param key
     *         the key identifying the BLOB
     * @param jobId
     *         ID of the job for the incoming files
     *
     * @return the path to the given BLOB
     */
    static String getStorageLocationPath(
            String storageDir, @Nullable JobID jobId, BlobKey key) {
        if (jobId == null) {
            // format: $base/no_job/blob_$key
            return String.format("%s/%s/%s%s",
                storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
        } else {
            // format: $base/job_$jobId/blob_$key
            return String.format("%s/%s%s/%s%s",
                storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
        }
    }

    /**
     * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
     * use (not thread-safe!).
     *
     * @param incomingFile
     *         temporary file created during transfer
     * @param jobId
     *         ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
     * @param blobKey
     *         BLOB key identifying the file
     * @param storageFile
     *      (local) file where the blob is/should be stored
     * @param log
     *      logger for debug information
     * @param blobStore
     *      HA store (or <tt>null</tt> if unavailable)
     *
     * @throws IOException
     *         thrown if an I/O error occurs while moving the file or uploading it to the HA store
     */
    static void moveTempFileToStore(
            File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
            Logger log, @Nullable BlobStore blobStore) throws IOException {

        try {
            // first check whether the file already exists
            if (!storageFile.exists()) {
                try {
                    // only move the file if it does not yet exist
                    Files.move(incomingFile.toPath(), storageFile.toPath());

                    incomingFile = null;

                } catch (FileAlreadyExistsException ignored) {
                    log.warn("Detected concurrent file modifications. This should only happen if multiple" +
                        "BlobServer use the same storage directory.");
                    // we cannot be sure at this point whether the file has already been uploaded to the blob
                    // store or not. Even if the blobStore might shortly be in an inconsistent state, we have
                    // to persist the blob. Otherwise we might not be able to recover the job.
                }

                if (blobStore != null) {
                    // only the one moving the incoming file to its final destination is allowed to upload the
                    // file to the blob store
                    blobStore.put(storageFile, jobId, blobKey);
                }
            } else {
                log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
            }
            storageFile = null;
        } finally {
            // we failed to either create the local storage file or to upload it --> try to delete the local file
            // while still having the write lock
            if (storageFile != null && !storageFile.delete() && storageFile.exists()) {
                log.warn("Could not delete the storage file {}.", storageFile);
            }
            if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
                log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
            }
        }
    }

    //......
}
  • initLocalStorageDirectory reads the BlobServerOptions.STORAGE_DIRECTORY setting (blob.storage.directory). If it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one of them at random as baseDir. The storageDir itself is a new subdirectory of baseDir named blobStore-<random UUID>; creating it is attempted up to 10 times.
  • getStorageLocation builds the concrete storage path under storageDir from the JobID and BlobKey, and creates its parent directories. The format is $base/no_job/blob_$key for job-unrelated BLOBs, or $base/job_$jobId/blob_$key otherwise.
  • moveTempFileToStore uses Files.move to move incomingFile to storageFile when the target file does not exist yet. If blobStore is not null (BlobServer only passes it for permanent BLOBs), storageFile is also uploaded to the BlobStore via blobStore.put. If the target already exists, the new upload is ignored with a warning; on failure, the local storage and staging files are deleted.
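The directory and path logic of BlobUtils is easy to reproduce with java.nio.file. In this sketch the method names mirror BlobUtils, but the signatures are simplified and hypothetical (strings instead of Configuration, JobID and BlobKey), the prefix constants are inferred from the path comments in getStorageLocationPath, and moveIfAbsent is a hypothetical helper that covers only the local move of moveTempFileToStore, without the HA blobStore.put call.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.util.Random;
import java.util.UUID;

// Simplified, hypothetical mirror of the BlobUtils helpers discussed above (no Flink types).
final class BlobPathsSketch {
    // Prefix values inferred from the "$base/no_job/blob_$key" and "$base/job_$jobId/blob_$key" comments.
    static final String NO_JOB_DIR_PREFIX = "no_job";
    static final String JOB_DIR_PREFIX = "job_";
    static final String BLOB_FILE_PREFIX = "blob_";
    private static final Random RANDOM = new Random();

    // Configured directory if set, otherwise a random temp dir; then a fresh blobStore-<UUID> subdirectory.
    static File initLocalStorageDirectory(String configuredDir, String[] tmpDirs) throws IOException {
        File baseDir = (configuredDir == null || configuredDir.trim().isEmpty())
                ? new File(tmpDirs[RANDOM.nextInt(tmpDirs.length)])
                : new File(configuredDir);
        for (int attempt = 0; attempt < 10; attempt++) {
            File storageDir = new File(baseDir, "blobStore-" + UUID.randomUUID());
            if (storageDir.mkdirs()) { // only succeeds if the directory did not exist yet
                return storageDir;
            }
        }
        throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
    }

    static String getStorageLocationPath(String storageDir, String jobId, String key) {
        return jobId == null
                ? String.format("%s/%s/%s%s", storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key)
                : String.format("%s/%s%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId, BLOB_FILE_PREFIX, key);
    }

    // Moves incoming to target only if target is absent; returns true if this call installed it.
    // The HA upload (blobStore.put) that Flink performs at this point is omitted.
    static boolean moveIfAbsent(File incoming, File target) throws IOException {
        Files.createDirectories(target.getParentFile().toPath());
        boolean moved = false;
        try {
            if (!target.exists()) {
                Files.move(incoming.toPath(), target.toPath()); // no REPLACE_EXISTING
                moved = true;
            }
        } catch (FileAlreadyExistsException e) {
            // another writer created the target between the check and the move
        } finally {
            if (!moved) {
                Files.deleteIfExists(incoming.toPath()); // keep the existing file, drop ours
            }
        }
        return moved;
    }
}
```

Files.move without REPLACE_EXISTING throws FileAlreadyExistsException when the target appears between the exists() check and the move, so a second writer cannot silently overwrite a BLOB that is already in place; the losing staging file is simply deleted.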

Summary

  • BlobWriter defines the putPermanent and getMinOffloadingSize methods and provides the static serializeAndTryOffload helper, which serializes a value and, when its serialized size reaches the minimum offloading size, stores it on the BlobServer via blobWriter.putPermanent (falling back to the inline SerializedValue if the upload fails).
  • BlobServer implements the BlobWriter interface. Its putPermanent overloads delegate to putBuffer and putInputStream respectively, and getMinOffloadingSize reads BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration (default 1 MiB). putBuffer (byte[]) and putInputStream (InputStream) both write the data to a temporary file first and then call moveTempFileToStore to persist it; that method in turn calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location. storageDir is initialized by BlobUtils.initLocalStorageDirectory(config), and storageFile is obtained from BlobUtils.getStorageLocation(storageDir, jobId, blobKey).
  • BlobUtils.initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory); if it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one at random as baseDir. storageDir is a subdirectory of baseDir whose name starts with blobStore-. getStorageLocation builds the storage path under storageDir from the JobID and BlobKey, using the format $base/no_job/blob_$key or $base/job_$jobId/blob_$key. moveTempFileToStore uses Files.move to move incomingFile to storageFile when the target does not exist yet, and if blobStore is not null it also puts storageFile into the BlobStore.
