A look at Flink’s NetworkEnvironmentConfiguration


Preface

This article looks at Flink’s NetworkEnvironmentConfiguration and the options it is built from.

NetworkEnvironmentConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

public class NetworkEnvironmentConfiguration {

    private final float networkBufFraction;

    private final long networkBufMin;

    private final long networkBufMax;

    private final int networkBufferSize;

    private final IOMode ioMode;

    private final int partitionRequestInitialBackoff;

    private final int partitionRequestMaxBackoff;

    private final int networkBuffersPerChannel;

    private final int floatingNetworkBuffersPerGate;

    private final NettyConfig nettyConfig;

    /**
     * Constructor for a setup with purely local communication (no netty).
     */
    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate) {

        this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
                ioMode,
                partitionRequestInitialBackoff, partitionRequestMaxBackoff,
                networkBuffersPerChannel, floatingNetworkBuffersPerGate,
                null);
        
    }

    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate,
            @Nullable NettyConfig nettyConfig) {

        this.networkBufFraction = networkBufFraction;
        this.networkBufMin = networkBufMin;
        this.networkBufMax = networkBufMax;
        this.networkBufferSize = networkBufferSize;
        this.ioMode = ioMode;
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
        this.nettyConfig = nettyConfig;
    }

    // ------------------------------------------------------------------------

    public float networkBufFraction() {
        return networkBufFraction;
    }

    public long networkBufMin() {
        return networkBufMin;
    }

    public long networkBufMax() {
        return networkBufMax;
    }

    public int networkBufferSize() {
        return networkBufferSize;
    }

    public IOMode ioMode() {
        return ioMode;
    }

    public int partitionRequestInitialBackoff() {
        return partitionRequestInitialBackoff;
    }

    public int partitionRequestMaxBackoff() {
        return partitionRequestMaxBackoff;
    }

    public int networkBuffersPerChannel() {
        return networkBuffersPerChannel;
    }

    public int floatingNetworkBuffersPerGate() {
        return floatingNetworkBuffersPerGate;
    }

    public NettyConfig nettyConfig() {
        return nettyConfig;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + networkBufferSize;
        result = 31 * result + ioMode.hashCode();
        result = 31 * result + partitionRequestInitialBackoff;
        result = 31 * result + partitionRequestMaxBackoff;
        result = 31 * result + networkBuffersPerChannel;
        result = 31 * result + floatingNetworkBuffersPerGate;
        result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        else if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        else {
            final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;

            return this.networkBufFraction == that.networkBufFraction &&
                    this.networkBufMin == that.networkBufMin &&
                    this.networkBufMax == that.networkBufMax &&
                    this.networkBufferSize == that.networkBufferSize &&
                    this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
                    this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
                    this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
                    this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
                    this.ioMode == that.ioMode && 
                    (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
        }
    }

    @Override
    public String toString() {
        return "NetworkEnvironmentConfiguration{" +
                "networkBufFraction=" + networkBufFraction +
                ", networkBufMin=" + networkBufMin +
                ", networkBufMax=" + networkBufMax +
                ", networkBufferSize=" + networkBufferSize +
                ", ioMode=" + ioMode +
                ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
                ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
                ", networkBuffersPerChannel=" + networkBuffersPerChannel +
                ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
                ", nettyConfig=" + nettyConfig +
                '}';
    }
}
  • NetworkEnvironmentConfiguration holds the settings for Flink’s network stack. Its properties are networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate and nettyConfig. The constructor without a NettyConfig parameter is meant for purely local communication and passes null for nettyConfig.

TaskManagerServicesConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {

    //......

    /**
     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
     *
     * @param configuration to create the network environment configuration from
     * @param localTaskManagerCommunication true if task manager communication is local
     * @param taskManagerAddress address of the task manager
     * @param slots to start the task manager with
     * @return Network environment configuration
     */
    @SuppressWarnings("deprecation")
    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
        Configuration configuration,
        boolean localTaskManagerCommunication,
        InetAddress taskManagerAddress,
        int slots) throws Exception {

        // ----> hosts / ports for communication and data exchange

        int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);

        checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
            "Leave config parameter empty or use 0 to let the system choose a port automatically.");

        checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
            "Number of task slots must be at least one.");

        final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        // check page size of for minimum size
        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);

        // check page size for power of two
        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Memory segment size must be a power of 2.");

        // network buffer memory fraction

        float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
        long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
        long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
        checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

        // fallback: number of network buffers
        final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
        checkNetworkConfigOld(numNetworkBuffers);

        if (!hasNewNetworkBufConf(configuration)) {
            // map old config to new one:
            networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
        } else {
            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
            }
        }

        final NettyConfig nettyConfig;
        if (!localTaskManagerCommunication) {
            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
        } else {
            nettyConfig = null;
        }

        // Default spill I/O mode for intermediate results
        final String syncOrAsync = configuration.getString(
            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

        final IOManager.IOMode ioMode;
        if (syncOrAsync.equals("async")) {
            ioMode = IOManager.IOMode.ASYNC;
        } else {
            ioMode = IOManager.IOMode.SYNC;
        }

        int initialRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
        int maxRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);

        int buffersPerChannel = configuration.getInteger(
            TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
        int extraBuffersPerGate = configuration.getInteger(
            TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

        return new NetworkEnvironmentConfiguration(
            networkBufFraction,
            networkBufMin,
            networkBufMax,
            pageSize,
            ioMode,
            initialRequestBackoff,
            maxRequestBackoff,
            buffersPerChannel,
            extraBuffersPerGate,
            nettyConfig);
    }

    //......
}
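  • TaskManagerServicesConfiguration has a private static method, parseNetworkEnvironmentConfiguration, that creates the NetworkEnvironmentConfiguration. It reads TaskManagerOptions.DATA_PORT, TaskManagerOptions.MEMORY_SEGMENT_SIZE, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, TaskManagerOptions.NETWORK_NUM_BUFFERS, ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL and TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE. The segment size must be at least MemoryManager.MIN_PAGE_SIZE and a power of two. When hasNewNetworkBufConf(configuration) returns false, the deprecated buffer count is mapped onto the new options by setting networkBufMin and networkBufMax to numNetworkBuffers * pageSize. A NettyConfig is created only when task manager communication is not local; otherwise nettyConfig is null.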
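
The page-size checks and the legacy mapping described above can be sketched without any Flink dependency. This is only an illustration: the class LegacyBufferMappingSketch, its method names and the hasNewConf flag are hypothetical, and the value used for MIN_PAGE_SIZE (4 KiB) is an assumption standing in for MemoryManager.MIN_PAGE_SIZE, which the excerpt does not show.

```java
/** Hypothetical stand-in for part of parseNetworkEnvironmentConfiguration; not Flink code. */
final class LegacyBufferMappingSketch {

    /** Assumed value, standing in for MemoryManager.MIN_PAGE_SIZE. */
    static final int MIN_PAGE_SIZE = 4 * 1024;

    /** Returns true if the value is a positive power of two. */
    static boolean isPowerOf2(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Mirrors the two segment-size checks: minimum size and power of two. */
    static void checkPageSize(int pageSize) {
        if (pageSize < MIN_PAGE_SIZE) {
            throw new IllegalArgumentException("Minimum memory segment size is " + MIN_PAGE_SIZE);
        }
        if (!isPowerOf2(pageSize)) {
            throw new IllegalArgumentException("Memory segment size must be a power of 2.");
        }
    }

    /**
     * Returns {min, max} in bytes. Without the new options, the old buffer
     * count is mapped to min == max == numNetworkBuffers * pageSize.
     */
    static long[] resolveMinMax(
            boolean hasNewConf, int numNetworkBuffers, int pageSize, long newMin, long newMax) {
        checkPageSize(pageSize);
        if (!hasNewConf) {
            long bytes = ((long) numNetworkBuffers) * pageSize;
            return new long[] {bytes, bytes};
        }
        return new long[] {newMin, newMax};
    }

    public static void main(String[] args) {
        // Defaults from TaskManagerOptions: 2048 buffers, 32 KiB segments.
        long[] legacy = resolveMinMax(false, 2048, 32 * 1024, 64L << 20, 1L << 30);
        System.out.println("legacy min=" + legacy[0] + " max=" + legacy[1]);
    }
}
```

With the default 2048 buffers and 32 KiB segments, the legacy path yields 67108864 bytes (64 MiB) for both bounds, which happens to match the default of taskmanager.network.memory.min.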

TaskManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Size of memory buffers used by the network stack and the memory manager.
     */
    public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
            key("taskmanager.memory.segment-size")
            .defaultValue("32kb")
            .withDescription("Size of memory buffers used by the network stack and the memory manager.");

    /**
     * Fraction of JVM memory to use for network buffers.
     */
    public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
            key("taskmanager.network.memory.fraction")
            .defaultValue(0.1f)
            .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
                " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
                " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
                " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
                "` and \"taskmanager.network.memory.max\" may override this fraction.");

    /**
     * Minimum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
            key("taskmanager.network.memory.min")
            .defaultValue("64mb")
            .withDescription("Minimum memory size for network buffers.");

    /**
     * Maximum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
            key("taskmanager.network.memory.max")
            .defaultValue("1gb")
            .withDescription("Maximum memory size for network buffers.");

    /**
     * Number of buffers used in the network stack. This defines the number of possible tasks and
     * shuffles.
     *
     * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
     * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
     */
    @Deprecated
    public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
            key("taskmanager.network.numberOfBuffers")
            .defaultValue(2048);

    /**
     * Minimum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
            key("taskmanager.network.request-backoff.initial")
            .defaultValue(100)
            .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
            .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");

    /**
     * Maximum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
            key("taskmanager.network.request-backoff.max")
            .defaultValue(10000)
            .withDeprecatedKeys("taskmanager.net.request-backoff.max")
            .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

    /**
     * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
     *
     * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
     */
    public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
            key("taskmanager.network.memory.buffers-per-channel")
            .defaultValue(2)
            .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
                "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
                " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
                " for parallel serialization.");

    /**
     * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
     */
    public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
            key("taskmanager.network.memory.floating-buffers-per-gate")
            .defaultValue(8)
            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
                " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
                " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");                                                                                                        
    //......
}
  • taskmanager.memory.segment-size specifies the size of a memory segment and defaults to 32kb; taskmanager.network.memory.fraction specifies the fraction of JVM memory used for network buffers and defaults to 0.1; taskmanager.network.memory.min specifies the minimum memory for network buffers and defaults to 64mb; taskmanager.network.memory.max specifies the maximum memory for network buffers and defaults to 1gb. taskmanager.network.numberOfBuffers specifies the number of buffers used by the network stack and defaults to 2048; it is deprecated in favor of taskmanager.network.memory.fraction, taskmanager.network.memory.min and taskmanager.network.memory.max.
  • taskmanager.network.request-backoff.initial specifies the minimum backoff, in milliseconds, for partition requests of input channels and defaults to 100; taskmanager.network.request-backoff.max specifies the maximum backoff, in milliseconds, and defaults to 10000.
  • taskmanager.network.memory.buffers-per-channel specifies the maximum number of network buffers for each outgoing/incoming channel (subpartition/input channel) and defaults to 2; taskmanager.network.memory.floating-buffers-per-gate specifies the number of extra (floating) network buffers for each outgoing/incoming gate (result partition/input gate) and defaults to 8.
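
To make the interplay of these options concrete, here is a minimal sketch in plain Java. It rests on two assumptions that the excerpts above do not confirm: that the network memory is the fraction of some total memory figure clamped into [min, max] (this is how the option description reads), and that the partition request backoff starts at the initial value and doubles until it reaches the maximum. The class NetworkBufferSizingSketch and all of its method names are hypothetical.

```java
/** Hypothetical illustration of the TaskManagerOptions network settings; not Flink code. */
final class NetworkBufferSizingSketch {

    /** Assumption: fraction of total memory, clamped into [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalMemoryBytes, float fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (fraction * totalMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Number of segment-sized buffers that fit into the network memory. */
    static long numberOfBuffers(long networkBufferBytes, int segmentSizeBytes) {
        return networkBufferBytes / segmentSizeBytes;
    }

    /** Assumption: the backoff starts at initialMs and doubles, capped at maxMs. */
    static int nextBackoff(int currentMs, int initialMs, int maxMs) {
        if (currentMs == 0) {
            return initialMs;
        }
        return (int) Math.min(2L * currentMs, maxMs);
    }

    public static void main(String[] args) {
        long min = 64L << 20;   // 64mb default
        long max = 1L << 30;    // 1gb default
        int segment = 32 * 1024; // 32kb default

        // A small JVM: 10% of 100 MiB is below the minimum, so the minimum applies.
        long bytes = networkBufferBytes(100L << 20, 0.1f, min, max);
        System.out.println(bytes + " bytes -> " + numberOfBuffers(bytes, segment) + " buffers");

        // Backoff sequence with the defaults 100 ms and 10000 ms.
        int backoff = 0;
        for (int i = 0; i < 9; i++) {
            backoff = nextBackoff(backoff, 100, 10000);
            System.out.print(backoff + " ");
        }
        System.out.println();
    }
}
```

The sketch shows why the min and max options "may override" the fraction: on small or very large JVMs the clamp decides the result, not the 0.1 factor.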

NettyConfig

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java

public class NettyConfig {

    private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

    // - Config keys ----------------------------------------------------------

    public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
            .key("taskmanager.network.netty.num-arenas")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.num-arenas")
            .withDescription("The number of Netty arenas.");

    public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
            .key("taskmanager.network.netty.server.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.server.numThreads")
            .withDescription("The number of Netty server threads.");

    public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
            .key("taskmanager.network.netty.client.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.client.numThreads")
            .withDescription("The number of Netty client threads.");

    public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
            .key("taskmanager.network.netty.server.backlog")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.server.backlog")
            .withDescription("The netty server connection backlog.");

    public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
            .key("taskmanager.network.netty.client.connectTimeoutSec")
            .defaultValue(120) // default: 120s = 2min
            .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
            .withDescription("The Netty client connection timeout.");

    public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
            .key("taskmanager.network.netty.sendReceiveBufferSize")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
            .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
                " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");

    public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
            .key("taskmanager.network.netty.transport")
            .defaultValue("nio")
            .withDeprecatedKeys("taskmanager.net.transport")
            .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");

    // ------------------------------------------------------------------------

    enum TransportType {
        NIO, EPOLL, AUTO
    }

    static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";

    static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";

    private final InetAddress serverAddress;

    private final int serverPort;

    private final int memorySegmentSize;

    private final int numberOfSlots;

    private final Configuration config; // optional configuration

    public NettyConfig(
            InetAddress serverAddress,
            int serverPort,
            int memorySegmentSize,
            int numberOfSlots,
            Configuration config) {

        this.serverAddress = checkNotNull(serverAddress);

        checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
        this.serverPort = serverPort;

        checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
        this.memorySegmentSize = memorySegmentSize;

        checkArgument(numberOfSlots > 0, "Number of slots");
        this.numberOfSlots = numberOfSlots;

        this.config = checkNotNull(config);

        LOG.info(this.toString());
    }

    InetAddress getServerAddress() {
        return serverAddress;
    }

    int getServerPort() {
        return serverPort;
    }

    int getMemorySegmentSize() {
        return memorySegmentSize;
    }

    public int getNumberOfSlots() {
        return numberOfSlots;
    }

    // ------------------------------------------------------------------------
    // Getters
    // ------------------------------------------------------------------------

    public int getServerConnectBacklog() {
        return config.getInteger(CONNECT_BACKLOG);
    }

    public int getNumberOfArenas() {
        // default: number of slots
        final int configValue = config.getInteger(NUM_ARENAS);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getServerNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_SERVER);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_CLIENT);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientConnectTimeoutSeconds() {
        return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
    }

    public int getSendAndReceiveBufferSize() {
        return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
    }

    public TransportType getTransportType() {
        String transport = config.getString(TRANSPORT_TYPE);

        switch (transport) {
            case "nio":
                return TransportType.NIO;
            case "epoll":
                return TransportType.EPOLL;
            default:
                return TransportType.AUTO;
        }
    }

    @Nullable
    public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalClientSSLEngineFactory(config) :
                null;
    }

    @Nullable
    public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalServerSSLEngineFactory(config) :
                null;
    }

    public boolean getSSLEnabled() {
        return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
            && SSLUtils.isInternalSSLEnabled(config);
    }

    public boolean isCreditBasedEnabled() {
        return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
    }

    public Configuration getConfig() {
        return config;
    }

    @Override
    public String toString() {
        String format = "NettyConfig [" +
                "server address: %s, " +
                "server port: %d, " +
                "ssl enabled: %s, " +
                "memory segment size (bytes): %d, " +
                "transport type: %s, " +
                "number of server threads: %d (%s), " +
                "number of client threads: %d (%s), " +
                "server connect backlog: %d (%s), " +
                "client connect timeout (sec): %d, " +
                "send/receive buffer size (bytes): %d (%s)]";

        String def = "use Netty's default";
        String man = "manual";

        return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
                memorySegmentSize, getTransportType(), getServerNumThreads(),
                getServerNumThreads() == 0 ? def : man,
                getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
                getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
                getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
                getSendAndReceiveBufferSize() == 0 ? def : man);
    }
}
  • The constructor of NettyConfig takes serverAddress, serverPort, memorySegmentSize, numberOfSlots and config. The class also provides getServerConnectBacklog, getNumberOfArenas, getServerNumThreads, getClientNumThreads, getClientConnectTimeoutSeconds, getSendAndReceiveBufferSize, getTransportType and other methods that read their values from config.
  • taskmanager.network.netty.server.backlog specifies the Netty server’s connection backlog; the default is 0, which means Netty’s default is used. taskmanager.network.netty.client.connectTimeoutSec specifies the Netty client’s connection timeout in seconds and defaults to 120. taskmanager.network.netty.sendReceiveBufferSize specifies the Netty send/receive buffer size; the default is 0, which means Netty’s default is used, and that in turn is the system buffer size (see /proc/sys/net/ipv4/tcp_[rw]mem). taskmanager.network.netty.transport specifies the Netty transport type and defaults to nio.
  • taskmanager.network.netty.num-arenas specifies the number of Netty arenas and defaults to -1; taskmanager.network.netty.server.numThreads specifies the number of Netty server threads and defaults to -1; taskmanager.network.netty.client.numThreads specifies the number of Netty client threads and defaults to -1. For these three options, when the configured value is -1 the corresponding getter returns numberOfSlots.
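
The way these getters resolve their defaults can be reproduced in a few lines of plain Java. The sketch below copies the logic visible in the excerpt (the -1 fallback to numberOfSlots, the transport switch, and the "use Netty's default" / "manual" labels from toString); the class NettyDefaultsSketch and its method names are hypothetical and only stand in for NettyConfig.

```java
/** Hypothetical stand-in for the default handling in NettyConfig; not Flink code. */
final class NettyDefaultsSketch {

    enum TransportType { NIO, EPOLL, AUTO }

    /** -1 means "not configured", in which case the number of slots is used. */
    static int resolveOrSlots(int configValue, int numberOfSlots) {
        return configValue == -1 ? numberOfSlots : configValue;
    }

    /** Any value other than "nio" or "epoll" falls through to AUTO. */
    static TransportType parseTransport(String transport) {
        switch (transport) {
            case "nio":
                return TransportType.NIO;
            case "epoll":
                return TransportType.EPOLL;
            default:
                return TransportType.AUTO;
        }
    }

    /** 0 is reported as Netty's default, anything else as a manual setting. */
    static String describe(int value) {
        return value == 0 ? "use Netty's default" : "manual";
    }

    public static void main(String[] args) {
        int slots = 4;
        System.out.println("server threads: " + resolveOrSlots(-1, slots));
        System.out.println("client threads: " + resolveOrSlots(8, slots));
        System.out.println("transport: " + parseTransport("auto"));
        System.out.println("backlog 0: " + describe(0));
    }
}
```

Note that the switch is case-sensitive, so a value such as "NIO" would resolve to AUTO in this sketch, just as it would in the switch shown in the excerpt.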

Summary

  • NetworkEnvironmentConfiguration holds the settings for Flink’s network stack: networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate and nettyConfig.
  • TaskManagerServicesConfiguration creates it in the private static method parseNetworkEnvironmentConfiguration, which reads TaskManagerOptions.MEMORY_SEGMENT_SIZE, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, TaskManagerOptions.NETWORK_NUM_BUFFERS, ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE and related options.
  • The constructor of NettyConfig takes serverAddress, serverPort, memorySegmentSize, numberOfSlots and config. The class also provides getServerConnectBacklog, getNumberOfArenas, getServerNumThreads, getClientNumThreads, getClientConnectTimeoutSeconds, getSendAndReceiveBufferSize, getTransportType and other methods that read their values from config.
