Talk about memory size settings for flink TaskManager

  flink

Order

This article mainly studies the memory size setting of flink TaskManager.

flink-conf.yaml

flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml

# The heap size for the TaskManager JVM

taskmanager.heap.size: 1024m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, teh default max is 1GB.
# 
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb
  • Flink-conf.yaml provides taskmanager.heap.size to set taskmanager’s memory (Heap and offHeap) size
  • Memory related configuration (taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size) is used to set memory
  • Provides taskmanager.network.memory related configurations (taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、 taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min) is used to set the memory of taskmanager’s netstack.

config.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh

#!/usr/bin/env bash

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
KEY_TASKM_MEM_MB="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

# Define FLINK_TM_HEAP if it is not already set
if [ -z "${FLINK_TM_HEAP}" ]; then
    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_TM_HEAP}" == 0 ]; then
    FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")

    if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
    else
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
    fi
fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
    FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
fi


# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
    if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
    fi
fi

# Define FLINK_TM_NET_BUF_MIN if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
fi

# Define FLINK_TM_NET_BUF_MAX if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
    # default: 1GB = 1073741824 bytes
    FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
fi
  • Config.sh initializes variables such as flink _ TM _ header, FLINK_TM_MEM_MANAGED_SIZE, flink _ TM _ mem _ managed _ fragment, flink _ TM _ offheader, FLINK_TM_MEM_PRE_ALLOCATE, flink _ TM _ net _ buf _ fragment without setting related variables

taskmanager.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh

#!/usr/bin/env bash
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if memory allocation mode is lazy and no other JVM options are set,
    # set the 'Concurrent Mark Sweep GC'
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
    else
        flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
        FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
    fi

    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then

        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"

        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
  • Taskmanager.sh first calls config.sh to initialize relevant variables, then calculates and export JVM_ARGS and FLINK_ENV_JAVA_OPTS, and finally calls flink-console.sh to start relevant classes
  • If FLINK_TM_MEM_PRE_ALLOCATE is false and FLINK_ENV_JAVA_OPTS and FLINK_ENV_JAVA_OPTS_TM are not set, then -XX:+UseG1GC is appended to JVM_ARGS; ; Then read flink _ tm _ head to flink _ tm _ head _ mb; If FLINK_TM_HEAP_MB is greater than 0, calculate TM_HEAP_SIZE through calculateTaskManagerHeapSizeMB, then set xms and Xmx with TM_HEAP_SIZE, and set MaxDirectMemorySize with TM_MAX_OFFHEAP_SIZE, which is appended to JVM_ARGS; Flink _ env _ java _ opt _ tm will be appended to flink _ env _ java _ opt
  • CalculateTaskManagerHeapSizeMB is defined in config.sh, and its corresponding java code is in TaskManagerServices.calculateHeadpsizeMB.

TaskManagerServices

flink-runtime_2.11-1.7.2-sources.jar! /org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    /**
     * Calculates the amount of memory used for network buffers based on the total memory to use and
     * the according configuration parameters.
     *
     * <p>The following configuration parameters are involved:
     * <ul>
     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
     *     <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
     *     <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
     *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
     * </ul>.
     *
     * @param totalJavaMemorySize
     *         overall available memory to use (heap and off-heap, in bytes)
     * @param config
     *         configuration object
     *
     * @return memory to use for network buffers (in bytes); at least one memory segment
     */
    @SuppressWarnings("deprecation")
    public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySize > 0);

        int segmentSize =
            checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        final long networkBufBytes;
        if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
            // new configuration based on fractions of available memory with selectable min and max
            float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
            long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
            long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();


            TaskManagerServicesConfiguration
                .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);

            networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
                (long) (networkBufFraction * totalJavaMemorySize)));

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        } else {
            // use old (deprecated) network buffers parameter
            int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
            networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;

            TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        }

        return networkBufBytes;
    }

    //......
}
  • FLINK_TM_HEAP is set to taskmanager’s memory (Heap and offHeap) size, and network buffers always use offHeap, so here we must first deduct this part of offHeap from FLINK_TM_HEAP and then recalculate Xms and Xmx
  • CalculateHeapSizeMB first calls calculateNetworkBufferMemory to calculate networkBufMB, and then deducts networkBufMB from totalJavaMemorySizeMB to obtain remainingJavaMemorySizeMB
  • After reading the taskmanager.memory.off-heap setting, which defaults to false, it will be returned directly as remainingJavaMemorySizeMB; If true, the value of offheap needs to be calculated, then offHeapSize deducted from remainingJavaMemorySizeMB and returned.

Summary

  • Flink-conf.yaml provides taskmanager.heap.size to set taskmanager’s memory (Heap and offHeap) size; Memory related configuration (taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size) is used to set memory;; Provides taskmanager.network.memory related configurations (taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、 taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min) is used to set the memory of taskmanager’s netstack.
  • Taskmanager.sh first calls config.sh to initialize relevant variables, then calculates and export JVM_ARGS and FLINK_ENV_JAVA_OPTS, and finally calls flink-console.sh to start relevant classes; If FLINK_TM_MEM_PRE_ALLOCATE is false and FLINK_ENV_JAVA_OPTS and FLINK_ENV_JAVA_OPTS_TM are not set, then -XX:+UseG1GC is appended to JVM_ARGS; ; Then read flink _ tm _ head to flink _ tm _ head _ mb; If FLINK_TM_HEAP_MB is greater than 0, calculate TM_HEAP_SIZE through calculateTaskManagerHeapSizeMB, then set xms and Xmx with TM_HEAP_SIZE, set MaxDirectMemorySize with TM_MAX_OFFHEAP_SIZE, and append to JVM_ARGS; Flink _ env _ java _ opt _ tm will be appended to flink _ env _ java _ opt; CalculateTaskManagerHeapSizeMB is defined in config.sh, and its corresponding java code is in TaskManagerServices.calculateHeadpsizeMB.
  • FLINK_TM_HEAP is set to taskmanager’s memory (Heap and offHeap) size, and the network buffers always use off heat, so here we first have to deduct this part of off heat from flink _ TM _ heat and then recalculate Xms and Xmx;; CalculateHeapSizeMB first calls calculateNetworkBufferMemory to calculate networkBufMB, and then deducts networkBufMB from totalJavaMemorySizeMB to obtain remainingJavaMemorySizeMB;; After reading the taskmanager.memory.off-heap setting, which defaults to false, it will be returned directly as remainingJavaMemorySizeMB; If true, the value of offheap needs to be calculated, then offHeapSize deducted from remainingJavaMemorySizeMB and returned.

This shows that the final jvm parameters depend on JVM_ARGS and flink _ env _ Java _ opts; It is important not to set memory related parameters to JVM_ARGS, because when FLINK_TM_HEAP_MB is greater than 0, taskmanager.sh uses this value to calculate TM_HEAP_SIZE settings Xms and Xmx to append to JVM_ARGS variables, while FLINK_TM_HEAP_MB depends on FLINK_TM_HEAP or taskmanager.heap.size configuration; FLINK_ENV_JAVA_OPTS configuration depends on env.java.opts and env.java.opts.taskmanager;; Therefore, to configure the taskmanager’s memory (Heap and offHeap) size, you can specify the FLINK_TM_HEAP environment variable (such as FLINK_TM_HEAP=512m), or specify taskmanager.heap.size; in flink-conf.yaml; The final Xms and Xmx are the result of FLINK_TM_HEAP deducting offHeap. It is determined to use offHeap as network buffers. The rest depends on whether taskmanager.memory.off-heap is turned on, which defaults to false.

doc