A look at the jvm-exit-on-oom configuration of the Flink TaskManager


Preface

This article examines the taskmanager.jvm-exit-on-oom configuration of the Flink TaskManager.

taskmanager.jvm-exit-on-oom

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
     */
    public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
            key("taskmanager.jvm-exit-on-oom")
            .defaultValue(false)
            .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

    //......
}
  • The taskmanager.jvm-exit-on-oom option defaults to false. It specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
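To make the default concrete, here is a self-contained sketch of how a keyed boolean option with a default value resolves. It deliberately does not use Flink's classes: BooleanOptionSketch and resolve are hypothetical names, and a plain Map<String, String> stands in for Flink's Configuration.

```java
import java.util.Map;

/**
 * Hypothetical stand-in for a keyed boolean option with a default value.
 * This is NOT Flink's ConfigOption API; it only mirrors the idea of
 * key("taskmanager.jvm-exit-on-oom").defaultValue(false).
 */
final class BooleanOptionSketch {
    static final BooleanOptionSketch KILL_ON_OUT_OF_MEMORY =
            new BooleanOptionSketch("taskmanager.jvm-exit-on-oom", false);

    private final String key;
    private final boolean defaultValue;

    BooleanOptionSketch(String key, boolean defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    /** Returns the configured value, or the default when the key is absent. */
    boolean resolve(Map<String, String> conf) {
        String raw = conf.get(key);
        // Boolean.parseBoolean treats anything other than "true" (case-insensitive) as false.
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}
```

Under these assumptions, an empty map resolves to false, while a map containing taskmanager.jvm-exit-on-oom=true resolves to true. In a real cluster the option would normally be set in flink-conf.yaml as the line taskmanager.jvm-exit-on-oom: true.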

TaskManagerConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

    private final int numberSlots;

    private final String[] tmpDirectories;

    private final Time timeout;

    // null indicates an infinite duration
    @Nullable
    private final Time maxRegistrationDuration;

    private final Time initialRegistrationPause;
    private final Time maxRegistrationPause;
    private final Time refusedRegistrationPause;

    private final UnmodifiableConfiguration configuration;

    private final boolean exitJvmOnOutOfMemory;

    private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

    private final String[] alwaysParentFirstLoaderPatterns;

    @Nullable
    private final String taskManagerLogPath;

    @Nullable
    private final String taskManagerStdoutPath;

    public TaskManagerConfiguration(
        int numberSlots,
        String[] tmpDirectories,
        Time timeout,
        @Nullable Time maxRegistrationDuration,
        Time initialRegistrationPause,
        Time maxRegistrationPause,
        Time refusedRegistrationPause,
        Configuration configuration,
        boolean exitJvmOnOutOfMemory,
        FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
        String[] alwaysParentFirstLoaderPatterns,
        @Nullable String taskManagerLogPath,
        @Nullable String taskManagerStdoutPath) {

        this.numberSlots = numberSlots;
        this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maxRegistrationDuration = maxRegistrationDuration;
        this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
        this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
        this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
        this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        this.classLoaderResolveOrder = classLoaderResolveOrder;
        this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
        this.taskManagerLogPath = taskManagerLogPath;
        this.taskManagerStdoutPath = taskManagerStdoutPath;
    }

    public int getNumberSlots() {
        return numberSlots;
    }

    public Time getTimeout() {
        return timeout;
    }

    @Nullable
    public Time getMaxRegistrationDuration() {
        return maxRegistrationDuration;
    }

    public Time getInitialRegistrationPause() {
        return initialRegistrationPause;
    }

    @Nullable
    public Time getMaxRegistrationPause() {
        return maxRegistrationPause;
    }

    public Time getRefusedRegistrationPause() {
        return refusedRegistrationPause;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public String[] getTmpDirectories() {
        return tmpDirectories;
    }

    @Override
    public boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }

    public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
        return classLoaderResolveOrder;
    }

    public String[] getAlwaysParentFirstLoaderPatterns() {
        return alwaysParentFirstLoaderPatterns;
    }

    @Nullable
    public String getTaskManagerLogPath() {
        return taskManagerLogPath;
    }

    @Nullable
    public String getTaskManagerStdoutPath() {
        return taskManagerStdoutPath;
    }

    // --------------------------------------------------------------------------------------------
    //  Static factory methods
    // --------------------------------------------------------------------------------------------

    public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
        int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

        if (numberSlots == -1) {
            numberSlots = 1;
        }

        final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

        final Time timeout;

        try {
            timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
        } catch (Exception e) {
            throw new IllegalArgumentException(
                "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
                    "'. Use formats like '50 s' or '1 min' to specify the timeout.");
        }

        LOG.info("Messages have a max timeout of " + timeout);

        final Time finiteRegistrationDuration;

        try {
            Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
            if (maxRegistrationDuration.isFinite()) {
                finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
            } else {
                finiteRegistrationDuration = null;
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
        }

        final Time initialRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                initialRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final Time maxRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(
                TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
            if (pause.isFinite()) {
                maxRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.REGISTRATION_MAX_BACKOFF.key(), e);
        }

        final Time refusedRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                refusedRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF.key(), e);
        }

        final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

        final String classLoaderResolveOrder =
            configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

        final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

        final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
        final String taskManagerStdoutPath;

        if (taskManagerLogPath != null) {
            final int extension = taskManagerLogPath.lastIndexOf('.');

            if (extension > 0) {
                taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
            } else {
                taskManagerStdoutPath = null;
            }
        } else {
            taskManagerStdoutPath = null;
        }

        return new TaskManagerConfiguration(
            numberSlots,
            tmpDirPaths,
            timeout,
            finiteRegistrationDuration,
            initialRegistrationPause,
            maxRegistrationPause,
            refusedRegistrationPause,
            configuration,
            exitOnOom,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
            alwaysParentFirstLoaderPatterns,
            taskManagerLogPath,
            taskManagerStdoutPath);
    }
}
  • The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field. The class also provides shouldExitJvmOnOutOfMemoryError() to read that field.
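The read-once-then-freeze flow described above can be sketched without Flink on the classpath. TmConfigSketch is a hypothetical, heavily trimmed analogue of TaskManagerConfiguration that keeps only the OOM flag; a Map<String, String> stands in for Flink's Configuration.

```java
import java.util.Map;

/**
 * Hypothetical, trimmed-down analogue of TaskManagerConfiguration that keeps
 * only the OOM flag. Not Flink code: the Map stands in for Configuration.
 */
final class TmConfigSketch {
    static final String KILL_ON_OOM_KEY = "taskmanager.jvm-exit-on-oom";

    // Immutable once constructed, like exitJvmOnOutOfMemory in the original.
    private final boolean exitJvmOnOutOfMemory;

    TmConfigSketch(boolean exitJvmOnOutOfMemory) {
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
    }

    /** Static factory: read the raw setting once, then hand it to the constructor. */
    static TmConfigSketch fromConfiguration(Map<String, String> conf) {
        boolean exitOnOom = Boolean.parseBoolean(conf.getOrDefault(KILL_ON_OOM_KEY, "false"));
        return new TmConfigSketch(exitOnOom);
    }

    /** Read-only accessor, consulted later by the task's error handling. */
    boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }
}
```

Parsing the flag once at startup means the later error-handling path only reads a final boolean. A plausible benefit, though not one the Flink source states, is that no configuration parsing happens on a code path that may already be short of memory.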

Task

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {

            // unwrap wrapped exceptions to make stack traces more compact
            if (t instanceof WrappingRuntimeException) {
                t = ((WrappingRuntimeException) t).unwrap();
            }

            // ----------------------------------------------------------------
            // the execution failed. either the invokable code properly failed, or
            // an exception was thrown as a side effect of cancelling
            // ----------------------------------------------------------------

            try {
                // check if the exception is unrecoverable
                if (ExceptionUtils.isJvmFatalError(t) ||
                        (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                    // terminate the JVM immediately
                    // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                    try {
                        LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                    } finally {
                        Runtime.getRuntime().halt(-1);
                    }
                }

                // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                // loop for multiple retries during concurrent state changes via calls to cancel() or
                // to failExternally()
                while (true) {
                    ExecutionState current = this.executionState;

                    if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                        if (t instanceof CancelTaskException) {
                            if (transitionState(current, ExecutionState.CANCELED)) {
                                cancelInvokable(invokable);
                                break;
                            }
                        }
                        else {
                            if (transitionState(current, ExecutionState.FAILED, t)) {
                                // proper failure of the task. record the exception as the root cause
                                failureCause = t;
                                cancelInvokable(invokable);

                                break;
                            }
                        }
                    }
                    else if (current == ExecutionState.CANCELING) {
                        if (transitionState(current, ExecutionState.CANCELED)) {
                            break;
                        }
                    }
                    else if (current == ExecutionState.FAILED) {
                        // in state failed already, no transition necessary any more
                        break;
                    }
                    // unexpected state, go to failed
                    else if (transitionState(current, ExecutionState.FAILED, t)) {
                        LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                        break;
                    }
                    // else fall through the loop and
                }
            }
            catch (Throwable tt) {
                String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
                LOG.error(message, tt);
                notifyFatalError(message, tt);
            }
        }
        finally {
            //......
        }
    }

    //......
}
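  • Task implements the Runnable interface. Its run method wraps invokable.invoke() in a try/catch block, and the catch block inspects the Throwable: if ExceptionUtils.isJvmFatalError(t) returns true, or t is an OutOfMemoryError and taskManagerConfig.shouldExitJvmOnOutOfMemoryError() returns true, it logs the error and calls Runtime.getRuntime().halt(-1) to terminate the JVM immediately, without attempting a clean shutdown. Otherwise the task goes through its normal state transitions (to FAILED or CANCELED).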
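The halt condition in the catch block reduces to a small predicate. The sketch below restates it as a standalone method; HaltDecisionSketch and shouldHalt are hypothetical names, and isJvmFatalError copies the Flink 1.7.2 check quoted in the next section.

```java
/**
 * Sketch of the halt decision made in Task.run()'s catch block.
 * Class and method names are illustrative, not Flink API.
 */
final class HaltDecisionSketch {

    // Same test as ExceptionUtils.isJvmFatalError in Flink 1.7.2.
    @SuppressWarnings("removal") // ThreadDeath is deprecated for removal in newer JDKs
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    /** Returns true if the TaskManager JVM would be halted for this throwable. */
    static boolean shouldHalt(Throwable t, boolean exitJvmOnOutOfMemory) {
        return isJvmFatalError(t) || (t instanceof OutOfMemoryError && exitJvmOnOutOfMemory);
    }
}
```

With exitJvmOnOutOfMemory set to false, an OutOfMemoryError is not matched, so the task simply goes through the FAILED transition. Note also that the check uses instanceof on t itself (after a WrappingRuntimeException is unwrapped), so an OutOfMemoryError that is only the cause of some other exception does not trigger the halt.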

ExceptionUtils.isJvmFatalError

flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

@Internal
public final class ExceptionUtils {
    //......

    /**
     * Checks whether the given exception indicates a situation that may leave the
     * JVM in a corrupted state, meaning a state where continued normal operation can only be
     * guaranteed via clean process restart.
     *
     * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
     * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
     * and {@link java.util.zip.ZipError} (a special case of InternalError).
     * The {@link ThreadDeath} exception is also treated as a fatal error, because when
     * a thread is forcefully stopped, there is a high chance that parts of the system
     * are in an inconsistent state.
     *
     * @param t The exception to check.
     * @return True, if the exception is considered fatal to the JVM, false otherwise.
     */
    public static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    //......
}
  • The isJvmFatalError method returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath (subclasses included, since it uses instanceof), and false otherwise.
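A quick way to see which errors this check covers is to run it against a few JDK throwables. FatalErrorProbe is a hypothetical standalone copy of the method. Notably, OutOfMemoryError and StackOverflowError are VirtualMachineErrors but are not matched, which is why in Task.run() an OutOfMemoryError only halts the JVM when taskmanager.jvm-exit-on-oom is enabled.

```java
import java.util.zip.ZipError;

/** Standalone copy of the isJvmFatalError check, used to probe which JDK errors it matches. */
final class FatalErrorProbe {

    @SuppressWarnings("removal") // ThreadDeath is deprecated for removal in newer JDKs
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        Throwable[] samples = {
            new InternalError("internal"),
            new ZipError("bad zip"),          // ZipError extends InternalError
            new UnknownError("unknown"),
            new OutOfMemoryError("heap"),     // a VirtualMachineError, but not matched here
            new StackOverflowError(),         // also a VirtualMachineError, also not matched
            new IllegalStateException("app bug")
        };
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName() + " -> " + isJvmFatalError(t));
        }
    }
}
```

Running main prints true for InternalError, ZipError and UnknownError, and false for OutOfMemoryError, StackOverflowError and IllegalStateException.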

Runtime.getRuntime().halt

java.base/java/lang/Runtime.java

public class Runtime {
    //......

    private static final Runtime currentRuntime = new Runtime();

    /**
     * Returns the runtime object associated with the current Java application.
     * Most of the methods of class {@code Runtime} are instance
     * methods and must be invoked with respect to the current runtime object.
     *
     * @return  the {@code Runtime} object associated with the current
     *          Java application.
     */
    public static Runtime getRuntime() {
        return currentRuntime;
    }

    /**
     * Forcibly terminates the currently running Java virtual machine.  This
     * method never returns normally.
     *
     * <p> This method should be used with extreme caution.  Unlike the
     * {@link #exit exit} method, this method does not cause shutdown
     * hooks to be started.  If the shutdown sequence has already been
     * initiated then this method does not wait for any running
     * shutdown hooks to finish their work.
     *
     * @param  status
     *         Termination status. By convention, a nonzero status code
     *         indicates abnormal termination. If the {@link Runtime#exit exit}
     *         (equivalently, {@link System#exit(int) System.exit}) method
     *         has already been invoked then this status code
     *         will override the status code passed to that method.
     *
     * @throws SecurityException
     *         If a security manager is present and its
     *         {@link SecurityManager#checkExit checkExit} method
     *         does not permit an exit with the specified status
     *
     * @see #exit
     * @see #addShutdownHook
     * @see #removeShutdownHook
     * @since 1.3
     */
    public void halt(int status) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkExit(status);
        }
        Shutdown.beforeHalt();
        Shutdown.halt(status);
    }

    //......
}
  • If a SecurityManager is installed, halt first calls its checkExit(status); it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM. Unlike exit, halt does not start shutdown hooks.
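Because halt() never returns and skips shutdown hooks, code that calls it directly is hard to exercise in tests. The sketch below, which is not Flink code, reproduces the log-then-halt-in-finally pattern from Task.run() but takes the termination action as an IntConsumer, so a test can pass a recording stub while production passes Runtime.getRuntime()::halt. FatalExitSketch, onFatalError and forProduction are hypothetical names, and the List<String> is a stand-in for a real logger.

```java
import java.util.List;
import java.util.function.IntConsumer;

/**
 * Hypothetical sketch (not Flink code): the "log, then halt in finally" pattern
 * from Task.run(), with the termination action injected.
 */
final class FatalExitSketch {
    private final IntConsumer terminator;   // production: Runtime.getRuntime()::halt
    private final List<String> log;         // stand-in for a real logger

    FatalExitSketch(IntConsumer terminator, List<String> log) {
        this.terminator = terminator;
        this.log = log;
    }

    void onFatalError(Throwable t) {
        try {
            // Logging may itself fail (e.g. under memory pressure); the finally block still runs.
            log.add("Encountered fatal error " + t.getClass().getName() + " - terminating the JVM");
        } finally {
            // Same status as Task.run(); on typical Unix systems -1 shows up as exit code 255.
            terminator.accept(-1);
        }
    }

    static FatalExitSketch forProduction(List<String> log) {
        // Runtime.halt skips shutdown hooks, unlike Runtime.exit / System.exit.
        return new FatalExitSketch(Runtime.getRuntime()::halt, log);
    }
}
```

The finally block mirrors the original's intent: even if logging throws, the process is still terminated.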

Summary

  • The taskmanager.jvm-exit-on-oom option defaults to false. It specifies whether the TaskManager should be killed when a task thread throws an OutOfMemoryError.
  • The static factory method TaskManagerConfiguration.fromConfiguration reads the value via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; shouldExitJvmOnOutOfMemoryError() exposes it.
  • Task implements Runnable, and its run method wraps invokable.invoke() in a try/catch. In the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or t is an OutOfMemoryError and taskManagerConfig.shouldExitJvmOnOutOfMemoryError() is true, it calls Runtime.getRuntime().halt(-1) to stop the JVM.
  • isJvmFatalError returns true if the Throwable is an InternalError, UnknownError, or ThreadDeath.
  • Runtime.halt calls checkExit when a SecurityManager is present, then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM without running shutdown hooks.
