A Look at Flink’s ConnectionManager

Preface

This article takes a look at Flink’s ConnectionManager.

ConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java

public interface ConnectionManager {

    void start(ResultPartitionProvider partitionProvider,
                TaskEventDispatcher taskEventDispatcher) throws IOException;

    /**
     * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;

    /**
     * Closes opened ChannelConnections in case of a resource release.
     */
    void closeOpenChannelConnections(ConnectionID connectionId);

    int getNumberOfActiveConnections();

    int getDataPort();

    void shutdown() throws IOException;

}
  • ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections. It has two implementations: LocalConnectionManager and NettyConnectionManager.

LocalConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java

public class LocalConnectionManager implements ConnectionManager {

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
        return null;
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {}

    @Override
    public int getNumberOfActiveConnections() {
        return 0;
    }

    @Override
    public int getDataPort() {
        return -1;
    }

    @Override
    public void shutdown() {}
}
  • LocalConnectionManager implements the ConnectionManager interface, but its implementation is essentially empty: every method is either a no-op or returns a default value (null, 0, or -1).
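
To make the no-op behavior concrete, here is a minimal, self-contained sketch. MiniConnectionManager, LocalMiniConnectionManager, and LocalManagerDemo are hypothetical, simplified stand-ins (connection IDs are plain strings and clients are plain Objects); they are not Flink classes. The address "tm-1:6121" is likewise made up.

```java
// Hypothetical, simplified stand-in for Flink's ConnectionManager interface.
interface MiniConnectionManager {
    void start();
    Object createPartitionRequestClient(String connectionId);
    int getNumberOfActiveConnections();
    int getDataPort();
    void shutdown();
}

// Mirrors the shape of LocalConnectionManager: every method is a no-op
// or returns a neutral default value.
class LocalMiniConnectionManager implements MiniConnectionManager {
    @Override public void start() {}
    @Override public Object createPartitionRequestClient(String connectionId) { return null; }
    @Override public int getNumberOfActiveConnections() { return 0; }
    @Override public int getDataPort() { return -1; }
    @Override public void shutdown() {}
}

class LocalManagerDemo {
    // Drives a manager through its lifecycle using only the interface.
    static String describe(MiniConnectionManager manager) {
        manager.start();
        try {
            Object client = manager.createPartitionRequestClient("tm-1:6121");
            return "client=" + client
                + ", active=" + manager.getNumberOfActiveConnections()
                + ", port=" + manager.getDataPort();
        } finally {
            manager.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new LocalMiniConnectionManager()));
    }
}
```

Because the local variant returns null and -1 rather than throwing, code written against the interface has to be prepared for those defaults.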

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • NettyConnectionManager implements the ConnectionManager interface. Its constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and then uses the NettyClient to create a PartitionRequestClientFactory.
  • The start method creates a NettyProtocol and uses it, together with the buffer pool, to initialize both the NettyClient and the NettyServer. The shutdown method shuts down the NettyClient and the NettyServer. closeOpenChannelConnections delegates to PartitionRequestClientFactory.closeOpenChannelConnections to close the connection for the specified connectionId. getDataPort returns the port of the server’s local address, or -1 if no address is available.
  • The createPartitionRequestClient method creates a PartitionRequestClient through PartitionRequestClientFactory.createPartitionRequestClient.
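
The delegation described in these bullets can be sketched without Netty. In the sketch below, MiniClient, MiniServer, and MiniNettyConnectionManager are hypothetical stand-ins that only record calls. The protocol name and the port 6121 are made-up values, and treating the server port as unknown until init has run is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for NettyClient: only records the calls it receives.
class MiniClient {
    private final List<String> log;

    MiniClient(List<String> log) { this.log = log; }

    void init(String protocol) { log.add("client.init(" + protocol + ")"); }

    void shutdown() { log.add("client.shutdown"); }
}

// Hypothetical stand-in for NettyServer: its local port is unknown until init.
class MiniServer {
    private final List<String> log;
    private final int configuredPort;
    private Integer localPort; // null until init() has "bound" the server

    MiniServer(List<String> log, int configuredPort) {
        this.log = log;
        this.configuredPort = configuredPort;
    }

    void init(String protocol) {
        localPort = configuredPort;
        log.add("server.init(" + protocol + ")");
    }

    Integer getLocalPort() { return localPort; }

    void shutdown() { log.add("server.shutdown"); }
}

// Composes a client and a server and forwards lifecycle calls to both.
class MiniNettyConnectionManager {
    final List<String> log = new ArrayList<>();
    private final MiniClient client = new MiniClient(log);
    private final MiniServer server = new MiniServer(log, 6121);

    void start() {
        // One protocol object is shared by both endpoints.
        String protocol = "partition-request";
        client.init(protocol);
        server.init(protocol);
    }

    int getDataPort() {
        Integer port = server.getLocalPort();
        return port != null ? port : -1;
    }

    void shutdown() {
        client.shutdown();
        server.shutdown();
    }
}
```

The manager itself holds almost no logic; it wires the pieces together and forwards each call, which matches the shape of the listing above.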

PartitionRequestClientFactory

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

class PartitionRequestClientFactory {

    private final NettyClient nettyClient;

    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * Atomically establishes a TCP connection to the given remote address and
     * creates a {@link PartitionRequestClient} instance for this connection.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        Object entry;
        PartitionRequestClient client = null;

        while (client == null) {
            entry = clients.get(connectionId);

            if (entry != null) {
                // Existing channel or connecting channel
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient) entry;
                }
                else {
                    ConnectingChannel future = (ConnectingChannel) entry;
                    client = future.waitForChannel();

                    clients.replace(connectionId, future, client);
                }
            }
            else {
                // No channel yet. Create one, but watch out for a race.
                // We create a "connecting future" and atomically add it to the map.
                // Only the thread that really added it establishes the channel.
                // The others need to wait on that original establisher's future.
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = clients.putIfAbsent(connectionId, connectingChannel);

                if (old == null) {
                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                    client = connectingChannel.waitForChannel();

                    clients.replace(connectionId, connectingChannel, client);
                }
                else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel) old).waitForChannel();

                    clients.replace(connectionId, old, client);
                }
                else {
                    client = (PartitionRequestClient) old;
                }
            }

            // Make sure to increment the reference count before handing a client
            // out to ensure correct bookkeeping for channel closing.
            if (!client.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionId, client);
                client = null;
            }
        }

        return client;
    }

    public void closeOpenChannelConnections(ConnectionID connectionId) {
        Object entry = clients.get(connectionId);

        if (entry instanceof ConnectingChannel) {
            ConnectingChannel channel = (ConnectingChannel) entry;

            if (channel.dispose()) {
                clients.remove(connectionId, channel);
            }
        }
    }

    int getNumberOfActiveClients() {
        return clients.size();
    }

    /**
     * Removes the client for the given {@link ConnectionID}.
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        clients.remove(connectionId, client);
    }

    //......
}
  • The constructor of PartitionRequestClientFactory takes a NettyClient. The factory uses a ConcurrentHashMap to keep an in-memory mapping from ConnectionID to either a PartitionRequestClient or a ConnectingChannel.
  • The createPartitionRequestClient method first looks up the ConnectionID in the ConcurrentHashMap. If the entry is a PartitionRequestClient, it is used directly. If the entry is a ConnectingChannel, the method calls ConnectingChannel.waitForChannel to wait for the PartitionRequestClient and then replaces the map entry with that client. If there is no entry, the method creates a ConnectingChannel and adds it with putIfAbsent, which returns the previous value (old). If old is null, this thread won the race: it calls nettyClient.connect with the ConnectingChannel as listener, waits for the PartitionRequestClient, and replaces the map entry. If old is a ConnectingChannel, another thread is already connecting, so the method waits on old via waitForChannel and replaces the map entry; if old is already a PartitionRequestClient, it is used as is. Before returning, the method calls client.incrementReferenceCounter(). If the increment fails, it calls destroyPartitionRequestClient, sets client to null, and the while loop retries; if it succeeds, the PartitionRequestClient is returned.
  • The closeOpenChannelConnections method only acts when the entry is a ConnectingChannel: it calls ConnectingChannel.dispose and, if that returns true, removes the entry from the ConcurrentHashMap.
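
The core idea of createPartitionRequestClient is that only the thread whose putIfAbsent call returned null establishes the connection, while every other thread waits on the placeholder. Below is a minimal sketch of that pattern. It substitutes a CompletableFuture for ConnectingChannel and a plain String for the client, leaves out reference counting, and assumes connecting never fails. MiniClientFactory and its members are hypothetical names, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "connecting future" pattern; not Flink code.
class MiniClientFactory {
    // Each value is either a finished client (String) or a
    // CompletableFuture<String> for a connection that is still being set up.
    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();

    // Counts how often a connection was really established.
    final AtomicInteger connectAttempts = new AtomicInteger();

    @SuppressWarnings("unchecked")
    String createClient(String connectionId) {
        Object entry = clients.get(connectionId);
        if (entry == null) {
            CompletableFuture<String> pending = new CompletableFuture<>();
            entry = clients.putIfAbsent(connectionId, pending);
            if (entry == null) {
                // This thread added the placeholder, so it alone connects.
                connectAttempts.incrementAndGet();
                pending.complete("client-for-" + connectionId);
                entry = pending;
            }
        }
        if (entry instanceof String) {
            return (String) entry; // already connected
        }
        // Wait for whichever thread is connecting, then publish the result.
        CompletableFuture<String> future = (CompletableFuture<String>) entry;
        String client = future.join();
        clients.replace(connectionId, future, client);
        return client;
    }

    // Starts several threads that request the same connection at once and
    // reports how many connections were established.
    static int connectAttemptsUnderContention(int threadCount) {
        MiniClientFactory factory = new MiniClientFactory();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> factory.createClient("tm-1:6121"));
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return factory.connectAttempts.get();
    }
}
```

The three-argument replace only swaps the entry if it still holds the expected future, so a concurrent removal or replacement is never overwritten by a stale value.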

ConnectingChannel

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

    private static final class ConnectingChannel implements ChannelFutureListener {

        private final Object connectLock = new Object();

        private final ConnectionID connectionId;

        private final PartitionRequestClientFactory clientFactory;

        private boolean disposeRequestClient = false;

        public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
            this.connectionId = connectionId;
            this.clientFactory = clientFactory;
        }

        private boolean dispose() {
            boolean result;
            synchronized (connectLock) {
                if (partitionRequestClient != null) {
                    result = partitionRequestClient.disposeIfNotUsed();
                }
                else {
                    disposeRequestClient = true;
                    result = true;
                }

                connectLock.notifyAll();
            }

            return result;
        }

        private void handInChannel(Channel channel) {
            synchronized (connectLock) {
                try {
                    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                    partitionRequestClient = new PartitionRequestClient(
                        channel, clientHandler, connectionId, clientFactory);

                    if (disposeRequestClient) {
                        partitionRequestClient.disposeIfNotUsed();
                    }

                    connectLock.notifyAll();
                }
                catch (Throwable t) {
                    notifyOfError(t);
                }
            }
        }

        private volatile PartitionRequestClient partitionRequestClient;

        private volatile Throwable error;

        private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            synchronized (connectLock) {
                while (error == null && partitionRequestClient == null) {
                    connectLock.wait(2000);
                }
            }

            if (error != null) {
                throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
            }

            return partitionRequestClient;
        }

        private void notifyOfError(Throwable error) {
            synchronized (connectLock) {
                this.error = error;
                connectLock.notifyAll();
            }
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                handInChannel(future.channel());
            }
            else if (future.cause() != null) {
                notifyOfError(new RemoteTransportException(
                        "Connecting to remote task manager + '" + connectionId.getAddress() +
                                "' has failed. This might indicate that the remote task " +
                                "manager has been lost.",
                        connectionId.getAddress(), future.cause()));
            }
            else {
                notifyOfError(new LocalTransportException(
                    String.format(
                        "Connecting to remote task manager '%s' has been cancelled.",
                        connectionId.getAddress()),
                    null));
            }
        }
    }
  • ConnectingChannel implements Netty’s ChannelFutureListener interface. When the ChannelFuture succeeds, its operationComplete method calls handInChannel, which creates the PartitionRequestClient; otherwise it calls notifyOfError. The waitForChannel method blocks until the PartitionRequestClient has been created and then returns it, or throws an IOException if an error was recorded.
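
The handoff between the I/O thread and the waiting caller rests on a lock with wait and notifyAll. The sketch below reduces it to that mechanism. MiniConnectingChannel and HandoffDemo are hypothetical names, the client is a plain String, and dispose handling is left out.

```java
import java.io.IOException;

// Hypothetical sketch of the wait/notify handoff; not Flink code.
class MiniConnectingChannel {
    private final Object lock = new Object();
    private String client;   // set when connecting succeeded
    private Throwable error; // set when connecting failed

    // Called by the connecting thread once the connection is established.
    void handInChannel(String newClient) {
        synchronized (lock) {
            client = newClient;
            lock.notifyAll();
        }
    }

    // Called by the connecting thread when the connection attempt failed.
    void notifyOfError(Throwable cause) {
        synchronized (lock) {
            error = cause;
            lock.notifyAll();
        }
    }

    // Blocks until either a client or an error has been handed in.
    String waitForChannel() throws IOException, InterruptedException {
        synchronized (lock) {
            // Loop to guard against spurious wakeups; the timeout just
            // bounds each individual wait.
            while (error == null && client == null) {
                lock.wait(2000);
            }
            if (error != null) {
                throw new IOException(
                    "Connecting the channel failed: " + error.getMessage(), error);
            }
            return client;
        }
    }
}

class HandoffDemo {
    // Simulates a connection attempt on a second thread and waits for it.
    static String awaitResult(boolean succeed) {
        MiniConnectingChannel channel = new MiniConnectingChannel();
        Thread ioThread = new Thread(() -> {
            if (succeed) {
                channel.handInChannel("client-1");
            } else {
                channel.notifyOfError(new RuntimeException("connection refused"));
            }
        });
        ioThread.start();
        try {
            return channel.waitForChannel();
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Checking the condition in a while loop rather than an if is what makes the wait safe: the caller only proceeds once a client or an error is actually present.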

Summary

  • ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections. It has two implementations: LocalConnectionManager and NettyConnectionManager.
  • LocalConnectionManager implements the ConnectionManager interface, but its implementation is essentially empty. NettyConnectionManager also implements the interface. Its constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory. The start method creates a NettyProtocol and initializes the NettyClient and the NettyServer; the shutdown method shuts both down. closeOpenChannelConnections delegates to PartitionRequestClientFactory.closeOpenChannelConnections to close the connection for the specified connectionId, and createPartitionRequestClient creates a PartitionRequestClient through PartitionRequestClientFactory.createPartitionRequestClient.
  • The constructor of PartitionRequestClientFactory takes a NettyClient. The factory uses a ConcurrentHashMap to keep an in-memory mapping from ConnectionID to either a PartitionRequestClient or a ConnectingChannel. ConnectingChannel implements Netty’s ChannelFutureListener interface; when the ChannelFuture succeeds, its operationComplete method calls handInChannel, which creates the PartitionRequestClient. The waitForChannel method blocks until the PartitionRequestClient has been created and then returns it.
