A look at the heap size setting of the Flink JobManager

Preface

This article looks at how the heap size of the Flink JobManager is configured.

JobManagerOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * JVM heap size for the JobManager with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
        key("jobmanager.heap.size")
        .defaultValue("1024m")
        .withDescription("JVM heap size for the JobManager.");

    /**
     * JVM heap size (in megabytes) for the JobManager.
     * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     */
    @Deprecated
    public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
        key("jobmanager.heap.mb")
        .defaultValue(1024)
        .withDescription("JVM heap size (in megabytes) for the JobManager.");

    //......
}
  • The jobmanager.heap.size option specifies the JVM heap size of the JobManager and defaults to 1024m; the older jobmanager.heap.mb option is deprecated.

ConfigurationUtils

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

public class ConfigurationUtils {

    private static final String[] EMPTY = new String[0];

    /**
     * Get job manager's heap memory. This method will check the new key
     * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
     *
     * @param configuration the configuration object
     * @return the memory size of job manager's heap memory.
     */
    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
        } else {
            //use default value
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        }
    }

    //......
}
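  • The getJobManagerHeapMemory method of ConfigurationUtils reads the heap setting from the Configuration and parses it into a MemorySize. It checks jobmanager.heap.size first, falls back to the deprecated jobmanager.heap.mb (appending the unit m), and otherwise uses the default value of jobmanager.heap.size.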
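
The lookup order described above can be sketched without Flink on the classpath. This is only a minimal illustration: a plain Map<String, String> stands in for Flink's Configuration, and the class name HeapKeyFallback and the method resolveHeapSetting are hypothetical names that mirror the branch structure of getJobManagerHeapMemory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the key fallback in ConfigurationUtils.getJobManagerHeapMemory.
// A plain Map replaces Flink's Configuration; this is not Flink code.
class HeapKeyFallback {
    static final String NEW_KEY = "jobmanager.heap.size";
    static final String OLD_KEY = "jobmanager.heap.mb";
    static final String DEFAULT_SIZE = "1024m";

    /** Returns the heap setting as a size string such as "2g" or "768m". */
    static String resolveHeapSetting(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            // the new key wins whenever it is present
            return conf.get(NEW_KEY);
        } else if (conf.containsKey(OLD_KEY)) {
            // the deprecated key is a bare number of megabytes, so the unit is appended
            return Integer.parseInt(conf.get(OLD_KEY).trim()) + "m";
        } else {
            // neither key is set: fall back to the default of the new key
            return DEFAULT_SIZE;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(OLD_KEY, "768");
        System.out.println(resolveHeapSetting(conf));
        conf.put(NEW_KEY, "2g");
        System.out.println(resolveHeapSetting(conf));
    }
}
```

When both keys are present, the deprecated one is ignored without any warning in this sketch, which matches the order of the branches in the listing above.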

MemorySize

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

@PublicEvolving
public class MemorySize implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    /** The memory size, in bytes. */
    private final long bytes;

    /**
     * Constructs a new MemorySize.
     *
     * @param bytes The size, in bytes. Must be zero or larger.
     */
    public MemorySize(long bytes) {
        checkArgument(bytes >= 0, "bytes must be >= 0");
        this.bytes = bytes;
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the memory size in bytes.
     */
    public long getBytes() {
        return bytes;
    }

    /**
     * Gets the memory size in Kibibytes (= 1024 bytes).
     */
    public long getKibiBytes() {
        return bytes >> 10;
    }

    /**
     * Gets the memory size in Mebibytes (= 1024 Kibibytes).
     */
    public int getMebiBytes() {
        return (int) (bytes >> 20);
    }

    /**
     * Gets the memory size in Gibibytes (= 1024 Mebibytes).
     */
    public long getGibiBytes() {
        return bytes >> 30;
    }

    /**
     * Gets the memory size in Tebibytes (= 1024 Gibibytes).
     */
    public long getTebiBytes() {
        return bytes >> 40;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return (int) (bytes ^ (bytes >>> 32));
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this ||
                (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
    }

    @Override
    public String toString() {
        return bytes + " bytes";
    }

    // ------------------------------------------------------------------------
    //  Parsing
    // ------------------------------------------------------------------------

    /**
     * Parses the given string as as MemorySize.
     *
     * @param text The string to parse
     * @return The parsed MemorySize
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text) throws IllegalArgumentException {
        return new MemorySize(parseBytes(text));
    }

    /**
     * Parses the given string with a default unit.
     *
     * @param text The string to parse.
     * @param defaultUnit specify the default unit.
     * @return The parsed MemorySize.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
        if (!hasUnit(text)) {
            return parse(text + defaultUnit.getUnits()[0]);
        }

        return parse(text);
    }

    /**
     * Parses the given string as bytes.
     * The supported expressions are listed under {@link MemorySize}.
     *
     * @param text The string to parse
     * @return The parsed size, in bytes.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static long parseBytes(String text) throws IllegalArgumentException {
        checkNotNull(text, "text");

        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

        final int len = trimmed.length();
        int pos = 0;

        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }

        final String number = trimmed.substring(0, pos);
        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        if (number.isEmpty()) {
            throw new NumberFormatException("text does not start with a number");
        }

        final long value;
        try {
            value = Long.parseLong(number); // this throws a NumberFormatException on overflow
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("The value '" + number +
                    "' cannot be re represented as 64bit number (numeric overflow).");
        }

        final long multiplier;
        if (unit.isEmpty()) {
            multiplier = 1L;
        }
        else {
            if (matchesAny(unit, BYTES)) {
                multiplier = 1L;
            }
            else if (matchesAny(unit, KILO_BYTES)) {
                multiplier = 1024L;
            }
            else if (matchesAny(unit, MEGA_BYTES)) {
                multiplier = 1024L * 1024L;
            }
            else if (matchesAny(unit, GIGA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L;
            }
            else if (matchesAny(unit, TERA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L * 1024L;
            }
            else {
                throw new IllegalArgumentException("Memory size unit '" + unit +
                        "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
            }
        }

        final long result = value * multiplier;

        // check for overflow
        if (result / multiplier != value) {
            throw new IllegalArgumentException("The value '" + text +
                    "' cannot be re represented as 64bit number of bytes (numeric overflow).");
        }

        return result;
    }

    private static boolean matchesAny(String str, MemoryUnit unit) {
        for (String s : unit.getUnits()) {
            if (s.equals(str)) {
                return true;
            }
        }
        return false;
    }

    //......
}
  • MemorySize holds a single bytes field, measured in bytes, and offers getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for quick conversion; these getters use right shifts, so any fractional part is truncated. The static parse method creates a MemorySize from text; its overload takes a MemoryUnit that is applied as the default unit when the text carries no unit. Both variants end up calling parseBytes.
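
The shift-based getters can be tried in isolation. The following is a small sketch, assuming a hypothetical helper class ShiftConversion with static methods that copy the shift expressions from the listing above; it is not the Flink class.

```java
// Hypothetical helper mirroring the shift-based getters of MemorySize; not Flink code.
class ShiftConversion {
    static long toKibiBytes(long bytes) { return bytes >> 10; }
    static int toMebiBytes(long bytes) { return (int) (bytes >> 20); }
    static long toGibiBytes(long bytes) { return bytes >> 30; }

    public static void main(String[] args) {
        long defaultHeap = 1024L * 1024L * 1024L; // "1024m" expressed in bytes
        System.out.println(toMebiBytes(defaultHeap)); // 1024
        System.out.println(toGibiBytes(defaultHeap)); // 1

        long oneAndAHalfMiB = 1536L * 1024L;
        System.out.println(toMebiBytes(oneAndAHalfMiB)); // 1, the half MiB is dropped
    }
}
```

Because the conversion truncates, a value that is not a whole number of mebibytes loses its remainder when it is later turned into a megabyte count.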

MemoryUnit

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

    /**
     *  Enum which defines memory unit, mostly used to parse value from configuration file.
     *
     * <p>To make larger values more compact, the common size suffixes are supported:
     *
     * <ul>
     *     <li>q or 1b or 1bytes (bytes)
     *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
     *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
     *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
     *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
     * </ul>
     *
     */
    public enum MemoryUnit {

        BYTES(new String[] { "b", "bytes" }),
        KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
        MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
        GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
        TERA_BYTES(new String[] { "t", "tb", "tebibytes" });

        private String[] units;

        MemoryUnit(String[] units) {
            this.units = units;
        }

        public String[] getUnits() {
            return units;
        }

        public static String getAllUnits() {
            return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
        }

        public static boolean hasUnit(String text) {
            checkNotNull(text, "text");

            final String trimmed = text.trim();
            checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

            final int len = trimmed.length();
            int pos = 0;

            char current;
            while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
                pos++;
            }

            final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

            return unit.length() > 0;
        }

        private static String concatenateUnits(final String[]... allUnits) {
            final StringBuilder builder = new StringBuilder(128);

            for (String[] units : allUnits) {
                builder.append('(');

                for (String unit : units) {
                    builder.append(unit);
                    builder.append(" | ");
                }

                builder.setLength(builder.length() - 3);
                builder.append(") / ");
            }

            builder.setLength(builder.length() - 3);
            return builder.toString();
        }

    }
  • The MemoryUnit enum defines BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES and TERA_BYTES. Each constant has a units property, a string array listing the text suffixes accepted for that unit. Before matching, the unit text is converted to lowercase.
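
How the unit suffixes and the overflow check work together can be sketched in a few lines. This is a simplified, hypothetical re-implementation (class SizeParserSketch), not the Flink class: a switch statement replaces the MemoryUnit enum and the error messages are shortened, but the suffix list, the lowercase conversion and the division-based overflow check follow the listings above.

```java
import java.util.Locale;

// Simplified sketch of the idea behind MemorySize.parseBytes; not the Flink class itself.
class SizeParserSketch {

    /** Maps a lower-cased unit suffix to its multiplier in bytes. */
    static long multiplierFor(String unit) {
        switch (unit) {
            case "": case "b": case "bytes":
                return 1L;
            case "k": case "kb": case "kibibytes":
                return 1L << 10;
            case "m": case "mb": case "mebibytes":
                return 1L << 20;
            case "g": case "gb": case "gibibytes":
                return 1L << 30;
            case "t": case "tb": case "tebibytes":
                return 1L << 40;
            default:
                throw new IllegalArgumentException("Unrecognized memory unit '" + unit + "'");
        }
    }

    static long parseBytes(String text) {
        String trimmed = text.trim();
        int pos = 0;
        // consume the leading run of ASCII digits
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        if (pos == 0) {
            throw new NumberFormatException("text does not start with a number");
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
        long multiplier = multiplierFor(unit);
        long result = value * multiplier;
        // overflow check as in the original: undo the multiplication and compare
        if (result / multiplier != value) {
            throw new IllegalArgumentException("numeric overflow for '" + text + "'");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1024m"));
        System.out.println(parseBytes(" 2 GB "));
    }
}
```

Whitespace between the number and the unit is tolerated, and a bare number is read as bytes, which is why the deprecated megabyte key has to append m before parsing.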

FlinkYarnSessionCli

flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java

public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
    //......

    private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
        if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
            LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
        }

        // TODO: The number of task manager should be deprecated soon
        final int numberTaskManagers;

        if (cmd.hasOption(container.getOpt())) {
            numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
        } else {
            numberTaskManagers = 1;
        }

        // JobManager Memory
        final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();

        // Task Managers memory
        final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setNumberTaskManagers(numberTaskManagers)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
    }

    //......
}
  • The createClusterSpecification method of FlinkYarnSessionCli calls ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes() to obtain jobManagerMemoryMB.

config.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh

# ......

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

# ......

# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; then
    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

# ......

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

# ......

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# ......
  • config.sh first checks whether the environment variable FLINK_JM_HEAP is set; if not, it reads the jobmanager.heap.size configuration from flink-conf.yaml into FLINK_JM_HEAP, using 0 when the key is absent. If FLINK_JM_HEAP is 0, the jobmanager.heap.mb configuration is read into FLINK_JM_HEAP_MB.
  • If FLINK_ENV_JAVA_OPTS is not set, the env.java.opts configuration is read from flink-conf.yaml; if there is no such configuration, DEFAULT_ENV_JAVA_OPTS is used, which is empty by default. Likewise, if FLINK_ENV_JAVA_OPTS_JM is not set, the env.java.opts.jobmanager configuration is read from flink-conf.yaml; if it is absent, DEFAULT_ENV_JAVA_OPTS_JM is used, which is also empty by default.
  • The JVM_ARGS variable is used by the JobManager and TaskManager JVMs. If it is not set, it is initialized to an empty string. Note that memory-related parameters should not be put into JVM_ARGS; use jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.
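
The precedence that config.sh applies to the heap setting can be restated in Java for readers who find the shell harder to follow. This is only a sketch under simplifying assumptions: the class JmHeapLookup and its resolve method are hypothetical, a Map stands in for flink-conf.yaml, and a FLINK_JM_HEAP_MB value that is already present in the environment is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restatement of the config.sh lookup order; the real logic is shell.
class JmHeapLookup {

    /**
     * Returns {FLINK_JM_HEAP, FLINK_JM_HEAP_MB}.
     * The second entry is null when the deprecated key was never consulted.
     */
    static String[] resolve(String envJmHeap, Map<String, String> yaml) {
        String heap = envJmHeap;
        if (heap == null || heap.isEmpty()) {
            // environment variable not set: read the new key, with 0 as the "absent" marker
            heap = yaml.getOrDefault("jobmanager.heap.size", "0");
        }
        String heapMb = null;
        if ("0".equals(heap)) {
            // new key absent: try the deprecated key
            heapMb = yaml.getOrDefault("jobmanager.heap.mb", "0");
        }
        return new String[] { heap, heapMb };
    }

    public static void main(String[] args) {
        Map<String, String> yaml = new HashMap<>();
        yaml.put("jobmanager.heap.size", "1024m");
        // the environment variable takes precedence over flink-conf.yaml
        System.out.println(resolve("512m", yaml)[0]);
        // without the environment variable, the value from flink-conf.yaml is used
        System.out.println(resolve(null, yaml)[0]);
    }
}
```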

jobmanager.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
    else
        flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
        FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi

    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi

    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
  • jobmanager.sh first sources config.sh to initialize the relevant variables (FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM, JVM_ARGS).
  • Unless only the deprecated jobmanager.heap.mb key was found (FLINK_JM_HEAP is 0 and FLINK_JM_HEAP_MB is non-empty), FLINK_JM_HEAP is parsed into bytes and converted into FLINK_JM_HEAP_MB. If FLINK_JM_HEAP_MB is greater than 0, -Xms and -Xmx are set to this value and appended to JVM_ARGS. Then FLINK_ENV_JAVA_OPTS_JM (from the env.java.opts.jobmanager configuration) is appended to FLINK_ENV_JAVA_OPTS (from the env.java.opts configuration).
  • Finally, jobmanager.sh calls flink-console.sh (for start-foreground) or flink-daemon.sh (otherwise) to start the related class.
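
The way jobmanager.sh extends JVM_ARGS can be illustrated with a short Java sketch. The class HeapArgs and its method withHeap are hypothetical; they only reproduce the string concatenation done by the export line in the script.

```java
// Hypothetical restatement of how jobmanager.sh extends JVM_ARGS; the real logic is shell.
class HeapArgs {

    /** Appends -Xms/-Xmx for a positive heap size in megabytes, otherwise returns jvmArgs unchanged. */
    static String withHeap(String jvmArgs, int heapMb) {
        if (heapMb > 0) {
            // initial and maximum heap are set to the same value
            return jvmArgs + " -Xms" + heapMb + "m -Xmx" + heapMb + "m";
        }
        return jvmArgs;
    }

    public static void main(String[] args) {
        System.out.println(withHeap("", 1024));
        System.out.println(withHeap("-XX:+UseG1GC", 512));
    }
}
```

Since the heap flags are appended after whatever JVM_ARGS already contains, heap flags placed in JVM_ARGS by hand would end up next to the ones generated from the Flink configuration, which is the situation the warning in config.sh is meant to prevent.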

flink-console.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh

#!/usr/bin/env bash

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

case $SERVICE in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
    ;;
esac

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

echo "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS when the Java version is lower than 8; it then starts CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM parameters.
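
The version check relies on a sed expression that glues the first two version components together, so "1.8.0_191" becomes 18. The sketch below restates that expression with Java's regular expressions. The class JavaVersionDigits is hypothetical, and the numeric test "[0-9]+" is an assumption about what IS_NUMBER matches, since its definition is not shown in the listings above.

```java
// Hypothetical restatement of the sed expression in flink-console.sh using Java regex.
class JavaVersionDigits {

    /** Applies the equivalent of s/.*version "\(.*\)\.\(.*\)\..*"/\1\2/ to one line. */
    static String extract(String firstLineOfVersionOutput) {
        return firstLineOfVersionOutput.replaceFirst(".*version \"(.*)\\.(.*)\\..*\"", "$1$2");
    }

    /** True when the extracted value is numeric and lower than 18 (that is, older than Java 8). */
    static boolean needsMaxPermSize(String firstLineOfVersionOutput) {
        String digits = extract(firstLineOfVersionOutput);
        // assumption: IS_NUMBER accepts only a run of decimal digits
        return digits.matches("[0-9]+") && Integer.parseInt(digits) < 18;
    }

    public static void main(String[] args) {
        System.out.println(extract("java version \"1.8.0_191\""));         // 18
        System.out.println(needsMaxPermSize("java version \"1.7.0_80\"")); // true
        System.out.println(needsMaxPermSize("java version \"1.8.0_191\"")); // false
    }
}
```

If the extracted text is not purely numeric, the check is skipped and no PermGen flag is added, which matches the guard around the comparison in the script.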

Summary

  • The jobmanager.heap.size option specifies the JVM heap size of the JobManager and defaults to 1024m; the older jobmanager.heap.mb option is deprecated. The getJobManagerHeapMemory method of ConfigurationUtils reads the setting from the Configuration and parses it into a MemorySize. MemorySize holds a single bytes field, measured in bytes, and offers getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for quick conversion. The static parse method creates a MemorySize from text; its overload takes a MemoryUnit that is applied as the default unit when the text carries no unit. Both variants end up calling parseBytes.
  • The createClusterSpecification method of FlinkYarnSessionCli calls ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes() to obtain jobManagerMemoryMB.
  • config.sh first checks whether the environment variable FLINK_JM_HEAP is set; if not, it reads the jobmanager.heap.size configuration from flink-conf.yaml into FLINK_JM_HEAP. If FLINK_JM_HEAP is 0, the jobmanager.heap.mb configuration is read into FLINK_JM_HEAP_MB. If FLINK_ENV_JAVA_OPTS is not set, the env.java.opts configuration is read from flink-conf.yaml; if there is no such configuration, DEFAULT_ENV_JAVA_OPTS is used, which is empty by default. If FLINK_ENV_JAVA_OPTS_JM is not set, the env.java.opts.jobmanager configuration is read from flink-conf.yaml; if it is absent, DEFAULT_ENV_JAVA_OPTS_JM is used, which is also empty by default. The JVM_ARGS variable is used by the JobManager and TaskManager JVMs and is initialized to an empty string if it is not set. Memory-related parameters should not be put into JVM_ARGS; use jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.
  • jobmanager.sh first sources config.sh to initialize the relevant variables (FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM, JVM_ARGS). Unless only the deprecated jobmanager.heap.mb key was found, FLINK_JM_HEAP is parsed into bytes and converted into FLINK_JM_HEAP_MB. If FLINK_JM_HEAP_MB is greater than 0, -Xms and -Xmx are set to this value and appended to JVM_ARGS. FLINK_ENV_JAVA_OPTS_JM (from the env.java.opts.jobmanager configuration) is then appended to FLINK_ENV_JAVA_OPTS (from the env.java.opts configuration). Finally, jobmanager.sh calls flink-console.sh (for start-foreground) or flink-daemon.sh (otherwise) to start the related class.
  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS when the Java version is lower than 8; it then starts CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM parameters.

This shows that the final JVM parameters depend on JVM_ARGS and FLINK_ENV_JAVA_OPTS. Memory-related parameters should not be set in JVM_ARGS directly, because jobmanager.sh appends -Xms and -Xmx to JVM_ARGS whenever FLINK_JM_HEAP_MB is greater than 0, and FLINK_JM_HEAP_MB is derived from the FLINK_JM_HEAP environment variable or the jobmanager.heap.size configuration. FLINK_ENV_JAVA_OPTS in turn depends on the env.java.opts and env.java.opts.jobmanager configurations. Therefore, to configure the JobManager's heap size, either set the FLINK_JM_HEAP environment variable (for example, FLINK_JM_HEAP=512m) or specify jobmanager.heap.size in flink-conf.yaml.
