Talk about Flink TaskManager’s data.port and rpc.port


Preface

This article looks at how the Flink TaskManager uses the taskmanager.data.port and taskmanager.rpc.port settings.

TaskManagerServices

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            ResourceID resourceID,
            Executor taskIOExecutor,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {

        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
        network.start();

        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            resourceID,
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            network.getConnectionManager().getDataPort());

        // this call has to happen strictly after the network stack has been initialized
        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

        for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
            resourceProfiles.add(ResourceProfile.ANY);
        }

        final TimerService<AllocationID> timerService = new TimerService<>(
            new ScheduledThreadPoolExecutor(1),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

        final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

        final JobManagerTable jobManagerTable = new JobManagerTable();

        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }

        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);

        return new TaskManagerServices(
            taskManagerLocation,
            memoryManager,
            ioManager,
            network,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager);
    }

    private static NetworkEnvironment createNetworkEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long maxJvmHeapMemory) {

        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

        final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
        int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

        // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
        final long numNetBuffersLong = networkBuf / segmentSize;
        if (numNetBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
                + ") corresponds to more than MAX_INT pages.");
        }

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
            (int) numNetBuffersLong,
            segmentSize);

        ConnectionManager connectionManager;
        boolean enableCreditBased = false;
        NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
        if (nettyConfig != null) {
            connectionManager = new NettyConnectionManager(nettyConfig);
            enableCreditBased = nettyConfig.isCreditBasedEnabled();
        } else {
            connectionManager = new LocalConnectionManager();
        }

        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

        KvStateRegistry kvStateRegistry = new KvStateRegistry();

        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

        int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

        int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

        final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getProxyPortRange(),
                numProxyServerNetworkThreads,
                numProxyServerQueryThreads,
                new DisabledKvStateRequestStats());

        int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

        int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

        final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getStateServerPortRange(),
                numStateServerNetworkThreads,
                numStateServerQueryThreads,
                kvStateRegistry,
                new DisabledKvStateRequestStats());

        // we start the network first, to make sure it can allocate its buffers first
        return new NetworkEnvironment(
            networkBufferPool,
            connectionManager,
            resultPartitionManager,
            taskEventDispatcher,
            kvStateRegistry,
            kvStateServer,
            kvClientProxy,
            networkEnvironmentConfiguration.ioMode(),
            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
            networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
            networkEnvironmentConfiguration.networkBuffersPerChannel(),
            networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
            enableCreditBased);
    }

    //......
}
  • The fromConfiguration method of TaskManagerServices reads its settings from TaskManagerServicesConfiguration, creates and starts the NetworkEnvironment, and then creates the TaskManagerLocation using network.getConnectionManager().getDataPort().
  • Both TaskExecutorToResourceManagerConnection and ConnectionID obtain the dataPort from TaskManagerLocation.
  • The createNetworkEnvironment method takes the NetworkEnvironmentConfiguration (parsed from the configuration file, including taskmanager.data.port). If its nettyConfig is not null, a NettyConnectionManager is created from it; otherwise a LocalConnectionManager is used.
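The data-port flow in the bullets above can be condensed into a small, self-contained sketch. It is not Flink code: `DataPortFlowSketch`, `Location`, `parseDataPort`, `connectionManagerFor` and `peerAddress` are hypothetical stand-ins for TaskManagerServicesConfiguration, TaskManagerLocation and ConnectionID. It assumes the default of `0` for taskmanager.data.port, meaning "let the OS choose a free port", and it skips DNS resolution so it runs offline.

```java
import java.net.InetSocketAddress;
import java.util.Properties;

public class DataPortFlowSketch {

    // Hypothetical stand-in for TaskManagerLocation: host plus the data port peers should use.
    static final class Location {
        final String host;
        final int dataPort;

        Location(String host, int dataPort) {
            this.host = host;
            this.dataPort = dataPort;
        }
    }

    // Reads taskmanager.data.port; 0 (assumed default) lets the OS pick a free port.
    static int parseDataPort(Properties conf) {
        int port = Integer.parseInt(conf.getProperty("taskmanager.data.port", "0").trim());
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("taskmanager.data.port out of range: " + port);
        }
        return port;
    }

    // Mirrors the branch in createNetworkEnvironment: Netty if a netty config exists, else local.
    static String connectionManagerFor(boolean hasNettyConfig) {
        return hasNettyConfig ? "NettyConnectionManager" : "LocalConnectionManager";
    }

    // Like ConnectionID, a remote peer is addressed via the location's host and data port.
    // createUnresolved avoids a DNS lookup in this sketch.
    static InetSocketAddress peerAddress(Location location) {
        return InetSocketAddress.createUnresolved(location.host, location.dataPort);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("taskmanager.data.port", "6121");

        int configured = parseDataPort(conf);
        System.out.println(connectionManagerFor(true) + " will bind port " + configured);

        // In Flink the location uses the port the server actually bound; here it is the configured one.
        Location location = new Location("tm-host-1", configured);
        System.out.println("peer address: " + peerAddress(location));
    }
}
```

The key point the sketch tries to capture is that other components never re-read the configuration: they take the port from the location object.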

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • The NettyConnectionManager constructor creates a NettyServer from the NettyConfig, and getDataPort returns server.getLocalAddress().getPort(), or -1 if the server does not have a local address yet.
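Because getDataPort reads the port from the bound server rather than from the configuration, the real port is only known after the server has been initialized. This is consistent with fromConfiguration calling network.start() before it builds the TaskManagerLocation. The sketch below illustrates that contract with a plain java.net.ServerSocket in place of Netty; `DataPortBindSketch` and its methods are hypothetical, and binding to the loopback address is an assumption made to keep it local.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class DataPortBindSketch {

    // Hypothetical stand-in for NettyServer: a plain blocking ServerSocket.
    private final ServerSocket server;

    DataPortBindSketch() throws IOException {
        this.server = new ServerSocket(); // created but not bound yet
    }

    // Bind to the configured port; 0 asks the OS for any free port.
    void init(int configuredPort) throws IOException {
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), configuredPort));
    }

    // Same contract as NettyConnectionManager.getDataPort(): the bound port, or -1 if not bound.
    int getDataPort() {
        return server.isBound() ? server.getLocalPort() : -1;
    }

    void shutdown() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        DataPortBindSketch sketch = new DataPortBindSketch();
        System.out.println("before init: " + sketch.getDataPort()); // -1, not bound yet
        sketch.init(0);
        System.out.println("after init:  " + sketch.getDataPort()); // an OS-chosen port
        sketch.shutdown();
    }
}
```

With taskmanager.data.port left at 0, reading the configuration would only yield 0; asking the bound server is the only way to learn the port that peers must connect to.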

TaskManagerRunner

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
    //......

    public static RpcService createRpcService(
        final Configuration configuration,
        final HighAvailabilityServices haServices) throws Exception {

        checkNotNull(configuration);
        checkNotNull(haServices);

        String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);

        if (taskManagerHostname != null) {
            LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
        } else {
            Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());

            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
                haServices.getResourceManagerLeaderRetriever(),
                lookupTimeout);

            taskManagerHostname = taskManagerAddress.getHostName();

            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
                taskManagerHostname, taskManagerAddress.getHostAddress());
        }

        final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
        return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
    }

    //......
}
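  • TaskManagerRunner provides the createRpcService method. It uses taskmanager.host as the hostname if set (otherwise a local address from which the ResourceManager leader can be reached), reads taskmanager.rpc.port from the configuration file, and then calls AkkaRpcServiceUtils.createRpcService to create the RpcService.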
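Unlike the data port, taskmanager.rpc.port is read as a string port range definition (portRangeDefinition above) and handed to AkkaRpcServiceUtils.createRpcService. The sketch below shows, under assumptions, how such a definition (a single port, a range like `50100-50200`, or a comma-separated mix) might be expanded and the first free port picked. `RpcPortRangeSketch`, `parsePortRange` and `bindFirstFree` are hypothetical; this is not Flink's own parsing code, and the real RPC service is an Akka actor system rather than a ServerSocket.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

public class RpcPortRangeSketch {

    // Expands "50100", "50100-50102" or "50100,50105-50106" into concrete ports.
    static List<Integer> parsePortRange(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.split(",")) {
            String p = part.trim();
            if (p.isEmpty()) {
                continue;
            }
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(checkPort(Integer.parseInt(p)));
            } else {
                int start = checkPort(Integer.parseInt(p.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(p.substring(dash + 1).trim()));
                if (end < start) {
                    throw new IllegalArgumentException("Invalid range: " + p);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    // Tries each candidate in order and keeps the first one that can be bound.
    static ServerSocket bindFirstFree(List<Integer> candidates) throws IOException {
        for (int port : candidates) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
                return socket;
            } catch (IOException e) {
                socket.close(); // port busy, try the next one
            }
        }
        throw new IOException("No free port in " + candidates);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parsePortRange("50100,50105-50107"));
        // prints [50100, 50105, 50106, 50107]
        try (ServerSocket rpc = bindFirstFree(parsePortRange("0"))) {
            System.out.println("bound RPC sketch to port " + rpc.getLocalPort());
        }
    }
}
```

Accepting a range is convenient when several TaskManagers share a host behind a firewall: each instance can take the next free port from an opened block instead of needing its own configuration.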

Summary

  • The fromConfiguration method of TaskManagerServices reads its settings from TaskManagerServicesConfiguration, creates and starts the NetworkEnvironment, and then creates the TaskManagerLocation using network.getConnectionManager().getDataPort(). Both TaskExecutorToResourceManagerConnection and ConnectionID obtain the dataPort from TaskManagerLocation.
  • The createNetworkEnvironment method of TaskManagerServices takes the NetworkEnvironmentConfiguration (parsed from the configuration file, including taskmanager.data.port); if its nettyConfig is not null, a NettyConnectionManager is created from it. The NettyConnectionManager constructor creates a NettyServer from the NettyConfig, and getDataPort returns server.getLocalAddress().getPort().
  • TaskManagerRunner provides the createRpcService method, which reads taskmanager.rpc.port from the configuration file and then calls AkkaRpcServiceUtils.createRpcService to create the RpcService.
