A Look at Flink's BlobServer


Preface

This article walks through Flink's BlobServer and the BlobServerConnection threads it spawns.

BlobServer

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {

    /** The log object used for debugging. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    /** Counter to generate unique names for temporary files. */
    private final AtomicLong tempFileCounter = new AtomicLong(0);

    /** The server socket listening for incoming connections. */
    private final ServerSocket serverSocket;

    /** Blob Server configuration. */
    private final Configuration blobServiceConfiguration;

    /** Indicates whether a shutdown of server component has been requested. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Root directory for local file storage. */
    private final File storageDir;

    /** Blob store for distributed file storage, e.g. in HA. */
    private final BlobStore blobStore;

    /** Set of currently running threads. */
    private final Set<BlobServerConnection> activeConnections = new HashSet<>();

    /** The maximum number of concurrent connections. */
    private final int maxConnections;

    /** Lock guarding concurrent file accesses. */
    private final ReadWriteLock readWriteLock;

    /**
     * Shutdown hook thread to ensure deletion of the local storage directory.
     */
    private final Thread shutdownHook;

    // --------------------------------------------------------------------------------------------

    /**
     * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
     * #getFile} methods.
     **/
    private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
        new ConcurrentHashMap<>();

    /** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    private final long cleanupInterval;

    /**
     * Timer task to execute the cleanup at regular intervals.
     */
    private final Timer cleanupTimer;

    /**
     * Instantiates a new BLOB server and binds it to a free network port.
     *
     * @param config Configuration to be used to instantiate the BlobServer
     * @param blobStore BlobStore to store blobs persistently
     *
     * @throws IOException
     *         thrown if the BLOB server cannot bind to a free network port or if the
     *         (local or distributed) file storage cannot be created or is not usable
     */
    public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
        this.blobServiceConfiguration = checkNotNull(config);
        this.blobStore = checkNotNull(blobStore);
        this.readWriteLock = new ReentrantReadWriteLock();

        // configure and create the storage directory
        this.storageDir = BlobUtils.initLocalStorageDirectory(config);
        LOG.info("Created BLOB server storage directory {}", storageDir);

        // configure the maximum number of concurrent connections
        final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
        if (maxConnections >= 1) {
            this.maxConnections = maxConnections;
        }
        else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
                    maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
            this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
        }

        // configure the backlog of connections
        int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
        if (backlog < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
                    backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
            backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
        }

        // Initializing the clean up task
        this.cleanupTimer = new Timer(true);

        this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        this.cleanupTimer
            .schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
                storageDir, LOG), cleanupInterval, cleanupInterval);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        //  ----------------------- start the server -------------------

        final String serverPortRange = config.getString(BlobServerOptions.PORT);
        final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);

        final ServerSocketFactory socketFactory;
        if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            try {
                socketFactory = SSLUtils.createSSLServerSocketFactory(config);
            }
            catch (Exception e) {
                throw new IOException("Failed to initialize SSL for the blob server", e);
            }
        }
        else {
            socketFactory = ServerSocketFactory.getDefault();
        }

        final int finalBacklog = backlog;
        this.serverSocket = NetUtils.createSocketFromPorts(ports,
                (port) -> socketFactory.createServerSocket(port, finalBacklog));

        if (serverSocket == null) {
            throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange);
        }

        // start the server thread
        setName("BLOB Server listener at " + getPort());
        setDaemon(true);

        if (LOG.isInfoEnabled()) {
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
                    serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog);
        }
    }

    //......

    @Override
    public void run() {
        try {
            while (!this.shutdownRequested.get()) {
                BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
                try {
                    synchronized (activeConnections) {
                        while (activeConnections.size() >= maxConnections) {
                            activeConnections.wait(2000);
                        }
                        activeConnections.add(conn);
                    }

                    conn.start();
                    conn = null;
                }
                finally {
                    if (conn != null) {
                        conn.close();
                        synchronized (activeConnections) {
                            activeConnections.remove(conn);
                        }
                    }
                }
            }
        }
        catch (Throwable t) {
            if (!this.shutdownRequested.get()) {
                LOG.error("BLOB server stopped working. Shutting down", t);

                try {
                    close();
                } catch (Throwable closeThrowable) {
                    LOG.error("Could not properly close the BlobServer.", closeThrowable);
                }
            }
        }
    }

    /**
     * Shuts down the BLOB server.
     */
    @Override
    public void close() throws IOException {
        cleanupTimer.cancel();

        if (shutdownRequested.compareAndSet(false, true)) {
            Exception exception = null;

            try {
                this.serverSocket.close();
            }
            catch (IOException ioe) {
                exception = ioe;
            }

            // wake the thread up, in case it is waiting on some operation
            interrupt();

            try {
                join();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();

                LOG.debug("Error while waiting for this thread to die.", ie);
            }

            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
                    for (BlobServerConnection conn : activeConnections) {
                        LOG.debug("Shutting down connection {}.", conn.getName());
                        conn.close();
                    }
                    activeConnections.clear();
                }
            }

            // Clean up the storage directory
            try {
                FileUtils.deleteDirectory(storageDir);
            }
            catch (IOException e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            // Remove shutdown hook to prevent resource leaks
            ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

            if (LOG.isInfoEnabled()) {
                LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
            }

            ExceptionUtils.tryRethrowIOException(exception);
        }
    }

    //......
}
  • BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService and TransientBlobService interfaces.
  • Its constructor picks a ServerSocketFactory (the one returned by SSLUtils.createSSLServerSocketFactory when internal SSL and BlobServerOptions.SSL_ENABLED are both enabled, otherwise ServerSocketFactory.getDefault()) and uses it to create the ServerSocket on a port from the configured range. It also calls ShutdownHookUtil.addShutdownHook to register a shutdownHook, so that the close method is called on shutdown.
  • It overrides Thread's run method. Until a shutdown is requested, the loop blocks in serverSocket.accept() and wraps each accepted socket in a BlobServerConnection. While activeConnections already holds maxConnections entries, it waits on activeConnections in intervals of up to 2000 milliseconds and re-checks; once there is room, it adds the connection to activeConnections and calls conn.start().
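
The connection limit in run() is a classic guarded wait on a shared set. The following is a minimal sketch of that pattern in isolation, not Flink code: ConnectionLimiter and its methods are hypothetical names, and the unregister side is an assumption, because the excerpt above elides BlobServer.unregisterConnection. The sketch assumes that unregistering removes the entry and notifies the waiting thread.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper (not a Flink class) mirroring the wait loop in BlobServer.run(). */
final class ConnectionLimiter<T> {

    private final Set<T> active = new HashSet<>();
    private final int maxConnections;

    ConnectionLimiter(int maxConnections) {
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections must be >= 1");
        }
        this.maxConnections = maxConnections;
    }

    /** Blocks while the limit is reached, then records the connection. */
    void register(T conn) throws InterruptedException {
        synchronized (active) {
            while (active.size() >= maxConnections) {
                // Timed wait as in BlobServer.run(): the condition is re-checked
                // after a notification or after at most 2000 ms.
                active.wait(2000);
            }
            active.add(conn);
        }
    }

    /** Assumed counterpart: removes the connection and wakes up waiting threads. */
    void unregister(T conn) {
        synchronized (active) {
            active.remove(conn);
            active.notifyAll();
        }
    }

    int size() {
        synchronized (active) {
            return active.size();
        }
    }
}
```

Because the condition is checked in a while loop, a spurious wakeup or an expired timeout simply leads to another check rather than to exceeding the limit.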

BlobServerConnection

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java

class BlobServerConnection extends Thread {

    /** The log object used for debugging. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);

    /** The socket to communicate with the client. */
    private final Socket clientSocket;

    /** The BLOB server. */
    private final BlobServer blobServer;

    /** Read lock to synchronize file accesses. */
    private final Lock readLock;

    BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
        super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
        setDaemon(true);

        this.clientSocket = clientSocket;
        this.blobServer = checkNotNull(blobServer);

        ReadWriteLock readWriteLock = blobServer.getReadWriteLock();

        this.readLock = readWriteLock.readLock();
    }

    // --------------------------------------------------------------------------------------------
    //  Connection / Thread methods
    // --------------------------------------------------------------------------------------------

    /**
     * Main connection work method. Accepts requests until the other side closes the connection.
     */
    @Override
    public void run() {
        try {
            final InputStream inputStream = this.clientSocket.getInputStream();
            final OutputStream outputStream = this.clientSocket.getOutputStream();

            while (true) {
                // Read the requested operation
                final int operation = inputStream.read();
                if (operation < 0) {
                    // done, no one is asking anything from us
                    return;
                }

                switch (operation) {
                case PUT_OPERATION:
                    put(inputStream, outputStream, new byte[BUFFER_SIZE]);
                    break;
                case GET_OPERATION:
                    get(inputStream, outputStream, new byte[BUFFER_SIZE]);
                    break;
                default:
                    throw new IOException("Unknown operation " + operation);
                }
            }
        }
        catch (SocketException e) {
            // this happens when the remote site closes the connection
            LOG.debug("Socket connection closed", e);
        }
        catch (Throwable t) {
            LOG.error("Error while executing BLOB connection.", t);
        }
        finally {
            closeSilently(clientSocket, LOG);
            blobServer.unregisterConnection(this);
        }
    }

    /**
     * Closes the connection socket and lets the thread exit.
     */
    public void close() {
        closeSilently(clientSocket, LOG);
        interrupt();
    }

    // --------------------------------------------------------------------------------------------
    //  Actions
    // --------------------------------------------------------------------------------------------

    private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        /*
         * Retrieve the file from the (distributed?) BLOB store and store it
         * locally, then send it to the service which requested it.
         *
         * Instead, we could send it from the distributed store directly but
         * chances are high that if there is one request, there will be more
         * so a local cache makes more sense.
         */

        final File blobFile;
        final JobID jobId;
        final BlobKey blobKey;

        try {
            // read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
            final int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of GET request");
            }

            // Receive the jobId and key
            if (mode == JOB_UNRELATED_CONTENT) {
                jobId = null;
            } else if (mode == JOB_RELATED_CONTENT) {
                byte[] jidBytes = new byte[JobID.SIZE];
                readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
                jobId = JobID.fromByteArray(jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
            }
            blobKey = BlobKey.readFromInputStream(inputStream);

            checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
                "Invalid BLOB addressing for permanent BLOBs");

            if (LOG.isDebugEnabled()) {
                LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
                    blobKey, clientSocket.getInetAddress());
            }

            // the file's (destined) location at the BlobServer
            blobFile = blobServer.getStorageLocation(jobId, blobKey);

            // up to here, an error can give a good message
        }
        catch (Throwable t) {
            LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
            try {
                writeErrorToStream(outputStream, t);
            }
            catch (IOException e) {
                // since we are in an exception case, it means that we could not send the error
                // ignore this
            }
            clientSocket.close();
            return;
        }

        try {

            readLock.lock();
            try {
                // copy the file to local store if it does not exist yet
                try {
                    blobServer.getFileInternal(jobId, blobKey, blobFile);

                    // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
                    if (blobFile.length() > Integer.MAX_VALUE) {
                        throw new IOException("BLOB size exceeds the maximum size (2 GB).");
                    }

                    outputStream.write(RETURN_OKAY);
                } catch (Throwable t) {
                    LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId,
                        blobKey, clientSocket.getInetAddress(), t);
                    try {
                        writeErrorToStream(outputStream, t);
                    } catch (IOException e) {
                        // since we are in an exception case, it means that we could not send the error
                        // ignore this
                    }
                    clientSocket.close();
                    return;
                }

                // from here on, we started sending data, so all we can do is close the connection when something happens
                int blobLen = (int) blobFile.length();
                writeLength(blobLen, outputStream);

                try (FileInputStream fis = new FileInputStream(blobFile)) {
                    int bytesRemaining = blobLen;
                    while (bytesRemaining > 0) {
                        int read = fis.read(buf);
                        if (read < 0) {
                            throw new IOException("Premature end of BLOB file stream for " +
                                blobFile.getAbsolutePath());
                        }
                        outputStream.write(buf, 0, read);
                        bytesRemaining -= read;
                    }
                }
            } finally {
                readLock.unlock();
            }

            // on successful transfer, delete transient files
            int result = inputStream.read();
            if (result < 0) {
                throw new EOFException("Premature end of GET request");
            } else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
                // ignore the result from the operation
                if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
                    LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
                        blobKey, clientSocket.getInetAddress());
                }
            }

        } catch (SocketException e) {
            // happens when the other side disconnects
            LOG.debug("Socket connection closed", e);
        } catch (Throwable t) {
            LOG.error("GET operation failed", t);
            clientSocket.close();
        }

    }

    private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        File incomingFile = null;

        try {
            // read HEADER contents: job ID, HA mode/permanent or transient BLOB
            final int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of PUT request");
            }

            final JobID jobId;
            if (mode == JOB_UNRELATED_CONTENT) {
                jobId = null;
            } else if (mode == JOB_RELATED_CONTENT) {
                byte[] jidBytes = new byte[JobID.SIZE];
                readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
                jobId = JobID.fromByteArray(jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing.");
            }

            final BlobKey.BlobType blobType;
            {
                final int read = inputStream.read();
                if (read < 0) {
                    throw new EOFException("Read an incomplete BLOB type");
                } else if (read == TRANSIENT_BLOB.ordinal()) {
                    blobType = TRANSIENT_BLOB;
                } else if (read == PERMANENT_BLOB.ordinal()) {
                    blobType = PERMANENT_BLOB;
                    checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
                } else {
                    throw new IOException("Invalid data received for the BLOB type: " + read);
                }
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
                    clientSocket.getInetAddress());
            }

            incomingFile = blobServer.createTemporaryFilename();
            byte[] digest = readFileFully(inputStream, incomingFile, buf);

            BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);

            // Return computed key to client for validation
            outputStream.write(RETURN_OKAY);
            blobKey.writeToOutputStream(outputStream);
        }
        catch (SocketException e) {
            // happens when the other side disconnects
            LOG.debug("Socket connection closed", e);
        }
        catch (Throwable t) {
            LOG.error("PUT operation failed", t);
            try {
                writeErrorToStream(outputStream, t);
            }
            catch (IOException e) {
                // since we are in an exception case, it means not much that we could not send the error
                // ignore this
            }
            clientSocket.close();
        }
        finally {
            if (incomingFile != null) {
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
                }
            }
        }
    }

    private static byte[] readFileFully(
            final InputStream inputStream, final File incomingFile, final byte[] buf)
            throws IOException {
        MessageDigest md = BlobUtils.createMessageDigest();

        try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
            while (true) {
                final int bytesExpected = readLength(inputStream);
                if (bytesExpected == -1) {
                    // done
                    break;
                }
                if (bytesExpected > BUFFER_SIZE) {
                    throw new IOException(
                        "Unexpected number of incoming bytes: " + bytesExpected);
                }

                readFully(inputStream, buf, 0, bytesExpected, "buffer");
                fos.write(buf, 0, bytesExpected);

                md.update(buf, 0, bytesExpected);
            }
            return md.digest();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Utilities
    // --------------------------------------------------------------------------------------------

    private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
        byte[] bytes = InstantiationUtil.serializeObject(t);
        out.write(RETURN_ERROR);
        writeLength(bytes.length, out);
        out.write(bytes);
    }
}
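  • BlobServerConnection extends Thread; its constructor takes the clientSocket and the BlobServer. It overrides Thread's run method, which repeatedly reads an operation byte from the clientSocket's input stream and calls put for PUT_OPERATION or get for GET_OPERATION. Any other value raises an IOException, and a negative read (end of stream) ends the loop.
  • The put method reads the jobId and blobType from the inputStream, creates a temporary incomingFile via blobServer.createTemporaryFilename, writes the incoming data to it with readFileFully (which also computes the message digest), and then calls blobServer.moveTempFileToStore to move the file into the BlobServer's storage. On success it replies with RETURN_OKAY followed by the computed BlobKey.
  • The get method reads the jobId and blobKey from the inputStream and calls blobServer.getStorageLocation to determine the blobFile. Holding the read lock, it then calls blobServer.getFileInternal to copy the file to the local store if it is not there yet and streams it to the outputStream. Files larger than Integer.MAX_VALUE bytes (2 GB) are rejected, and a transient BLOB is deleted once the client confirms the transfer with RETURN_OKAY.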
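
The upload path in readFileFully boils down to reading length-prefixed chunks until an end marker of -1 arrives, while feeding every chunk into a message digest. The sketch below illustrates that idea on its own; it is not Flink's wire format. The class name, the 4096-byte BUFFER_SIZE, the big-endian length field written by DataOutputStream and the SHA-1 digest are all assumptions of the sketch, since the excerpt does not show readLength, BUFFER_SIZE or BlobUtils.createMessageDigest.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical illustration of a chunked, digest-checked transfer (not Flink's protocol). */
final class ChunkedTransferSketch {

    static final int BUFFER_SIZE = 4096;  // assumed chunk limit
    static final int END_OF_STREAM = -1;  // end marker, as in readFileFully

    /** Sender side: writes data as [length][bytes] chunks followed by the end marker. */
    static void writeChunked(byte[] data, int chunkSize, OutputStream out) throws IOException {
        if (chunkSize < 1 || chunkSize > BUFFER_SIZE) {
            throw new IllegalArgumentException("chunkSize must be in [1, " + BUFFER_SIZE + "]");
        }
        DataOutputStream dos = new DataOutputStream(out);
        for (int off = 0; off < data.length; off += chunkSize) {
            int len = Math.min(chunkSize, data.length - off);
            dos.writeInt(len);
            dos.write(data, off, len);
        }
        dos.writeInt(END_OF_STREAM);
        dos.flush();
    }

    /** Receiver side: copies all chunks to the sink and returns the digest of the payload. */
    static byte[] readChunked(InputStream in, OutputStream sink) throws IOException {
        MessageDigest md = newDigest();
        DataInputStream dis = new DataInputStream(in);
        byte[] buf = new byte[BUFFER_SIZE];
        while (true) {
            int expected = dis.readInt();
            if (expected == END_OF_STREAM) {
                break; // sender signalled the end of the payload
            }
            if (expected < 0 || expected > BUFFER_SIZE) {
                throw new IOException("Unexpected number of incoming bytes: " + expected);
            }
            dis.readFully(buf, 0, expected);
            sink.write(buf, 0, expected);
            md.update(buf, 0, expected);
        }
        return md.digest();
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1"); // assumed algorithm
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Computing the digest while the data streams in means the receiver never has to re-read the temporary file; in the source above, the resulting digest is what put hands to blobServer.moveTempFileToStore.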

Summary

  • BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService and TransientBlobService interfaces. Its constructor creates the ServerSocket through a ServerSocketFactory (an SSL factory if internal SSL is enabled for the blob server, otherwise ServerSocketFactory.getDefault()) and uses ShutdownHookUtil.addShutdownHook to register a shutdownHook, so that the close method is called on shutdown.
  • BlobServer overrides Thread's run method. Until a shutdown is requested, it blocks in serverSocket.accept() and creates a BlobServerConnection for each accepted socket. While activeConnections has reached maxConnections, it waits in intervals of up to 2000 milliseconds; it then adds the connection to activeConnections and calls conn.start().
  • BlobServerConnection extends Thread; its constructor takes the clientSocket and the BlobServer. Its run method reads the requested operation from the clientSocket and calls put for PUT_OPERATION or get for GET_OPERATION. The put method reads the jobId and blobType from the inputStream, writes the incoming data to a temporary incomingFile, and then calls blobServer.moveTempFileToStore to move it into the BlobServer's storage. The get method reads the jobId and blobKey from the inputStream, calls blobServer.getStorageLocation to determine the blobFile, copies the file to the local store if necessary via blobServer.getFileInternal, and writes it to the outputStream.
