A look at Flink TaskManager’s managed memory

Preface

This article examines the managed memory of the Flink TaskManager.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

    //......
}
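  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager, which is used mainly for sorting, hashing, and caching intermediate results. Its default is 0, meaning "not set". taskmanager.heap.size sets the TaskManager's memory, covering both heap and off-heap (see calculateHeapSizeMB below). When taskmanager.memory.size is not set, taskmanager.memory.fraction (default 0.7) determines the managed share. taskmanager.memory.off-heap (default false) chooses between heap and off-heap allocation. taskmanager.memory.preallocate (default false) controls whether managed memory is allocated when the TaskManager starts.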
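The following minimal sketch shows how these keys fit together. It assumes a plain Map<String, String> stands in for flink-conf.yaml. The class MemoryOptionDefaults is hypothetical and is not part of Flink. It records the 1.7.2 defaults shown above. It also treats taskmanager.memory.size as "not set" while it still equals its default string "0", which mirrors the string comparison that TaskManagerServices and TaskManagerServicesConfiguration perform.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Flink API: the memory-related TaskManager keys
// and the defaults they carry in Flink 1.7.2's TaskManagerOptions.
final class MemoryOptionDefaults {
    static final Map<String, String> DEFAULTS = new HashMap<>();

    static {
        DEFAULTS.put("taskmanager.heap.size", "1024m");
        DEFAULTS.put("taskmanager.memory.size", "0");
        DEFAULTS.put("taskmanager.memory.fraction", "0.7");
        DEFAULTS.put("taskmanager.memory.off-heap", "false");
        DEFAULTS.put("taskmanager.memory.preallocate", "false");
    }

    // Falls back to the default when the user configuration lacks the key.
    static String get(Map<String, String> userConf, String key) {
        return userConf.getOrDefault(key, DEFAULTS.get(key));
    }

    // Managed memory is sized by fraction while taskmanager.memory.size still
    // equals its default string "0" (Flink compares the raw strings the same way).
    static boolean sizedByFraction(Map<String, String> userConf) {
        return get(userConf, "taskmanager.memory.size")
                .equals(DEFAULTS.get("taskmanager.memory.size"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(sizedByFraction(conf));                    // true: nothing set
        conf.put("taskmanager.memory.size", "512m");
        System.out.println(sizedByFraction(conf));                    // false: explicit size
        System.out.println(get(conf, "taskmanager.memory.fraction")); // 0.7
    }
}
```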

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    //......
}
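  • If taskmanager.memory.size is not set, or resolves to a value less than or equal to 0, the managed memory is sized by taskmanager.memory.fraction. Its default is 0.7.
  • If taskmanager.memory.off-heap is enabled, offHeapSize is the off-heap memory managed by the TaskManager's memory manager. It equals taskmanager.memory.size when that option is set, and otherwise taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB). It must be smaller than taskmanager.heap.size - networkBufMB, or the configuration is rejected.
  • If taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize. If it is disabled, Xmx is taskmanager.heap.size - networkBufMB, and managed memory is later taken from that heap.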
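The arithmetic in calculateHeapSizeMB can be traced with a simplified sketch. HeapSizeSketch is a hypothetical class, not Flink API, and it differs from Flink in two ways. First, it takes the network buffer size as an input (networkBufMB), because calculateNetworkBufferMemory is not covered here. Second, it throws IllegalArgumentException where Flink throws IllegalConfigurationException. The 100 MB network buffer figure used in main is only an assumed value.

```java
// Hypothetical re-implementation of the arithmetic in
// TaskManagerServices.calculateHeapSizeMB (Flink 1.7.2); not Flink API.
final class HeapSizeSketch {

    /**
     * @param totalJavaMemoryMB taskmanager.heap.size in MB (heap + off-heap)
     * @param networkBufMB      memory reserved for network buffers in MB (assumed input)
     * @param offHeap           taskmanager.memory.off-heap
     * @param managedSizeMB     taskmanager.memory.size in MB; 0 means "not set"
     * @param fraction          taskmanager.memory.fraction
     * @return the heap size used for -Xmx/-Xms, in MB
     */
    static long heapSizeMB(long totalJavaMemoryMB, long networkBufMB, boolean offHeap,
                           long managedSizeMB, float fraction) {
        if (totalJavaMemoryMB <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        // Network buffers are always off-heap, so subtract them first.
        long remainingMB = totalJavaMemoryMB - networkBufMB;
        if (!offHeap) {
            // Managed memory will later be carved out of the heap itself.
            return remainingMB;
        }
        long offHeapMB = managedSizeMB;
        if (offHeapMB <= 0) {
            double f = fraction; // widened from float, as in Flink
            offHeapMB = (long) (f * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            // Flink throws IllegalConfigurationException at this point.
            throw new IllegalArgumentException("Managed memory size too large");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, assuming 100 MB of network buffers:
        System.out.println(heapSizeMB(1024, 100, false, 0, 0.7f));  // 924
        System.out.println(heapSizeMB(1024, 100, true, 0, 0.7f));   // 278 (646 MB off-heap)
        System.out.println(heapSizeMB(1024, 100, true, 200, 0.7f)); // 724
    }
}
```

With off-heap enabled and no explicit size, 0.7 of the 924 MB left after network buffers (truncated to 646 MB) goes to managed memory, and only 278 MB remains for the JVM heap.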

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

        final long memorySize;

        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                        memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }

    //......
}
  • TaskManagerServices provides a private static method, createMemoryManager, that creates the MemoryManager from the configuration. If taskmanager.memory.size is set (configuredMemory > 0), that value is converted from MB to bytes and used directly. Otherwise memorySize is recalculated according to the MemoryType. In both cases memorySize is then passed to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction), that is, a fraction of the currently free heap.
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction, where jvmTotalNoNet is the JVM memory minus the network buffers. calculateHeapSizeMB has already set maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction). The code therefore recovers the off-heap size as directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
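The memorySize selection in createMemoryManager can be sketched as follows, assuming the same inputs the method receives. ManagedMemorySizeSketch and its nested MemoryType enum are hypothetical stand-ins for the Flink classes. The expressions deliberately keep Flink's mixed long/float/double arithmetic.

```java
// Hypothetical sketch (not Flink API) of how createMemoryManager in Flink 1.7.2
// chooses memorySize, in bytes, before constructing the MemoryManager.
final class ManagedMemorySizeSketch {

    // Mirrors Flink's HEAP / OFF_HEAP distinction.
    enum MemoryType { HEAP, OFF_HEAP }

    static long memorySizeBytes(long configuredMB, MemoryType memType, float memoryFraction,
                                long freeHeapMemoryWithDefrag, long maxJvmHeapMemory) {
        if (configuredMB > 0) {
            // Explicit taskmanager.memory.size: megabytes to bytes.
            return configuredMB << 20;
        }
        if (memType == MemoryType.HEAP) {
            // A fraction of the heap that is currently free (long * float -> float).
            return (long) (freeHeapMemoryWithDefrag * memoryFraction);
        }
        // OFF_HEAP: maxJvmHeap = jvmTotalNoNet * (1 - fraction), so divide that
        // back out and multiply by the fraction to get jvmTotalNoNet * fraction.
        return (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        System.out.println(memorySizeBytes(512, MemoryType.HEAP, 0.7f, 0, 0) / mb);           // 512
        System.out.println(memorySizeBytes(0, MemoryType.HEAP, 0.7f, 1000 * mb, 0) / mb);     // 700
        System.out.println(memorySizeBytes(0, MemoryType.OFF_HEAP, 0.7f, 0, 278 * mb) / mb);  // 648
    }
}
```

With an assumed 278 MB maximum heap and a fraction of 0.7, the off-heap path yields about 648 MB. Suppose that heap came from a calculateHeapSizeMB run with 924 MB left after network buffers. The off-heap share computed there would have been 646 MB. The back-calculation is therefore only approximate, because truncating to whole megabytes in calculateHeapSizeMB shifts the recovered value.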

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {
    //......

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }

    //......
}
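  • TaskManagerServicesConfiguration provides a static method, fromConfiguration, that creates a TaskManagerServicesConfiguration from a Configuration. memType is derived from taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, and MemoryType.HEAP otherwise. The method also rejects an explicitly set taskmanager.memory.size that is not positive. It also requires taskmanager.memory.fraction to lie strictly between 0.0 and 1.0.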
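The memory-related checks in fromConfiguration can be sketched as follows. MemorySettingsSketch is hypothetical and is not Flink API. It reads sizes as plain megabyte numbers with Long.parseLong. Flink instead uses MemorySize.parse with MEGA_BYTES as the default unit, so it also accepts unit suffixes.

```java
// Hypothetical sketch (not Flink API) of the memory checks in
// TaskManagerServicesConfiguration.fromConfiguration (Flink 1.7.2).
final class MemorySettingsSketch {

    enum MemoryType { HEAP, OFF_HEAP }

    // Default string of taskmanager.memory.size, meaning "not set".
    static final String MANAGED_SIZE_DEFAULT = "0";

    // Returns the configured managed memory in MB, or 0 when left at the default.
    // Simplification: plain MB numbers only, no unit suffixes.
    static long configuredMemoryMB(String rawSize) {
        if (rawSize.equals(MANAGED_SIZE_DEFAULT)) {
            return 0L;
        }
        long mb = Long.parseLong(rawSize.trim());
        if (mb <= 0) {
            throw new IllegalArgumentException("MemoryManager needs at least one MB of memory.");
        }
        return mb;
    }

    // taskmanager.memory.off-heap=true selects OFF_HEAP, otherwise HEAP.
    static MemoryType memoryType(boolean offHeap) {
        return offHeap ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    // taskmanager.memory.fraction must lie strictly between 0.0 and 1.0.
    static float checkFraction(float fraction) {
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0");
        }
        return fraction;
    }

    public static void main(String[] args) {
        System.out.println(configuredMemoryMB("0"));   // 0 -> fraction-based sizing
        System.out.println(configuredMemoryMB("512")); // 512
        System.out.println(memoryType(true));          // OFF_HEAP
        System.out.println(checkFraction(0.7f));       // 0.7
    }
}
```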

Summary

  • TaskManager managed memory comes in two types: heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager, which is used mainly for sorting, hashing, and caching; its default is 0. taskmanager.heap.size sets the TaskManager's memory, covering both heap and off-heap. If taskmanager.memory.size is less than or equal to 0, managed memory is sized by taskmanager.memory.fraction, with a default of 0.7. Suppose taskmanager.memory.off-heap is enabled and no explicit size is set. Then taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the memory manager, and the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices provides a private static method, createMemoryManager, that creates the MemoryManager from the configuration. Unless taskmanager.memory.size is set, it recalculates memorySize according to the MemoryType and passes it to the MemoryManager constructor. For MemoryType.HEAP, memorySize = (long) (freeHeapMemoryWithDefrag * memoryFraction). For MemoryType.OFF_HEAP, memorySize = jvmTotalNoNet * memoryFraction. Since maxJvmHeap = jvmTotalNoNet * (1 - memoryFraction), this is computed as (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration provides a static method, fromConfiguration, that creates a TaskManagerServicesConfiguration from a Configuration. memType is MemoryType.OFF_HEAP if taskmanager.memory.off-heap is true, and MemoryType.HEAP otherwise.

doc