A look at the initializeState method of Flink’s StreamOperator


Preface

This article examines the initializeState method of Flink’s StreamOperator and traces how it is reached from Task.run.

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------

            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {
            //......
        }
        finally {
            //......
        }
    }
    
    //......
}
  • The run method of Task calls invokable.invoke(); for a streaming job the invokable is a StreamTask, so the call lands in StreamTask.invoke().
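The state handling at the start and end of Task.run can be pictured as a small compare-and-set state machine. The following is a minimal sketch under that assumption, not Flink’s implementation: TaskLifecycleSketch, its trimmed ExecutionState enum and the cancel() helper are hypothetical, and transitionState is modeled with AtomicReference.compareAndSet. A transition succeeds only if nobody changed the state concurrently, which is why a failed DEPLOYING to RUNNING or RUNNING to FINISHED transition is treated as cancellation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified model of the state transitions in Task.run().
// Not Flink code: transitionState is modeled with AtomicReference.compareAndSet.
final class TaskLifecycleSketch {

    enum ExecutionState { CREATED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    private final AtomicReference<ExecutionState> state =
            new AtomicReference<>(ExecutionState.CREATED);

    // Succeeds only if the current state is still 'from'.
    boolean transitionState(ExecutionState from, ExecutionState to) {
        return state.compareAndSet(from, to);
    }

    ExecutionState current() {
        return state.get();
    }

    // Hypothetical simplification of cancellation: only handles a RUNNING task.
    void cancel() {
        transitionState(ExecutionState.RUNNING, ExecutionState.CANCELING);
    }

    // Happy path: CREATED -> DEPLOYING -> RUNNING -> invoke -> FINISHED.
    // Returns false where Task.run would throw CancelTaskException.
    boolean run(Runnable invokable) {
        if (!transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            return false;
        }
        // ... load the invokable and build the Environment here ...
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return false;
        }
        invokable.run(); // plays the role of invokable.invoke(), i.e. StreamTask.invoke()
        return transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
    }

    public static void main(String[] args) {
        TaskLifecycleSketch task = new TaskLifecycleSketch();
        boolean ok = task.run(() -> System.out.println("invoke()"));
        System.out.println(ok + " " + task.current());
    }
}
```

In the sketch, calling cancel() while the invokable runs moves the task to CANCELING, so the final RUNNING to FINISHED transition fails, which corresponds to the CancelTaskException path in Task.run.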

StreamTask.invoke

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            //......
        }
    }

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

    //......
}
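  • StreamTask.invoke() calls its private initializeState() method while holding the checkpoint lock, right before openAllOperators(). That method iterates over all operators of the OperatorChain and calls initializeState() on each non-null StreamOperator; in this example the operator is a StreamSource.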
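The ordering guarantee in StreamTask.invoke() can be illustrated with a small sketch. ChainInitSketch and its Operator interface are hypothetical stand-ins for OperatorChain and StreamOperator: every non-null operator is initialized first, and only afterwards are the operators opened, with both loops under one lock. For simplicity the sketch opens operators in the same order it initializes them; it does not model the order Flink’s openAllOperators() uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the initializeState/open ordering in StreamTask.invoke().
// Operator and ChainInitSketch are illustrative stand-ins, not Flink types.
final class ChainInitSketch {

    interface Operator {
        void initializeState();
        void open();
    }

    // Creates an operator that records its calls into a shared log.
    static Operator recording(String name, List<String> log) {
        return new Operator() {
            @Override
            public void initializeState() {
                log.add(name + ".initializeState");
            }

            @Override
            public void open() {
                log.add(name + ".open");
            }
        };
    }

    private final Object lock = new Object();
    private final Operator[] allOperators;

    ChainInitSketch(Operator... allOperators) {
        this.allOperators = allOperators;
    }

    void initializeAndOpen() {
        // One lock around both phases, so nothing guarded by the same lock
        // (e.g. a timer callback) can run between initializeState() and open().
        synchronized (lock) {
            for (Operator op : allOperators) {
                if (op != null) { // same null check as StreamTask.initializeState()
                    op.initializeState();
                }
            }
            for (Operator op : allOperators) {
                if (op != null) {
                    op.open();
                }
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        new ChainInitSketch(recording("source", log), null, recording("map", log)).initializeAndOpen();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```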

StreamOperator.initializeState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
    /**
     * Provides a context to initialize all state in the operator.
     */
    void initializeState() throws Exception;

    //......
}
  • The StreamOperator interface defines the initializeState method used to initialize the operator’s state.

StreamSource.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

        //......
}
  • StreamSource extends AbstractUdfStreamOperator and does not override initializeState(). AbstractUdfStreamOperator does not override the no-argument initializeState() either (it overrides only initializeState(StateInitializationContext)), so the call runs the implementation in its parent class, AbstractStreamOperator.
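Which implementation a StreamSource actually runs can be checked with plain Java reflection. The classes below are hypothetical mirrors of the Flink hierarchy (suffixed with Sketch, not the real classes); Class.getMethod returns the public no-argument initializeState() visible on StreamSourceSketch, and its declaring class turns out to be the base class, where the method is final.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical mirror of the StreamOperator hierarchy; these are not Flink classes.
final class HierarchySketch {

    interface StreamOperatorSketch {
        void initializeState() throws Exception;
    }

    abstract static class AbstractStreamOperatorSketch implements StreamOperatorSketch {
        @Override
        public final void initializeState() throws Exception {
            // In Flink this builds the state context and calls the
            // initializeState(StateInitializationContext) hook.
        }
    }

    abstract static class AbstractUdfStreamOperatorSketch extends AbstractStreamOperatorSketch {
        // In Flink this class overrides only the context-taking hook (omitted here).
    }

    static final class StreamSourceSketch extends AbstractUdfStreamOperatorSketch {
        // no initializeState override
    }

    // Returns the public no-arg initializeState() resolved for the given type.
    static Method resolveInitializeState(Class<?> type) {
        try {
            return type.getMethod("initializeState");
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no public initializeState()", e);
        }
    }

    public static void main(String[] args) {
        Method m = resolveInitializeState(StreamSourceSketch.class);
        System.out.println(m.getDeclaringClass().getSimpleName()
                + " final=" + Modifier.isFinal(m.getModifiers()));
    }
}
```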

AbstractStreamOperator.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {

    @Override
    public final void initializeState() throws Exception {

        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);

        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();

        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }

        timeServiceManager = context.internalTimerServiceManager();

        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state stream

            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }

    /**
     * Stream operators with state which can be restored need to override this hook method.
     *
     * @param context context that allows to register different states.
     */
    public void initializeState(StateInitializationContext context) throws Exception {

    }

    //......
}
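  • AbstractStreamOperator implements the initializeState() method defined by the StreamOperator interface and declares it final. It obtains a StreamOperatorStateContext from the task’s StreamTaskStateInitializer, stores the operator state backend, keyed state backend and timer service manager, wraps them together with the raw keyed and operator state inputs in a StateInitializationContext, and then calls initializeState(StateInitializationContext). That hook is empty by default and is overridden by the subclass AbstractUdfStreamOperator; the raw state inputs are closed in the finally block.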
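This is the template method pattern: a final entry point does the shared setup and cleanup, and subclasses customize behavior only through a hook. The sketch below uses hypothetical stand-ins (Context for StateInitializationContext, RawInputs for the raw state streams) and, unlike Flink, passes the restored flag as a parameter instead of reading it from a StreamOperatorStateContext. It shows that the raw inputs are closed even when the hook throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method shape of AbstractStreamOperator.initializeState().
final class TemplateMethodSketch {

    // Stand-in for StateInitializationContext.
    static final class Context {
        private final boolean restored;
        Context(boolean restored) { this.restored = restored; }
        boolean isRestored() { return restored; }
    }

    // Stand-in for the CloseableIterable raw keyed/operator state inputs.
    static final class RawInputs implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    abstract static class BaseOperator {
        final List<String> log = new ArrayList<>();
        RawInputs lastInputs;

        // Final entry point: subclasses cannot change setup or cleanup.
        public final void initializeState(boolean restored) throws Exception {
            RawInputs inputs = new RawInputs();
            lastInputs = inputs;
            try {
                initializeState(new Context(restored)); // call the overridable hook
            } finally {
                inputs.close(); // mirrors closeFromRegistry(...) in the finally block
            }
        }

        // Default hook does nothing, like the empty hook in AbstractStreamOperator.
        public void initializeState(Context context) throws Exception {
        }
    }

    static final class StatefulOperator extends BaseOperator {
        @Override
        public void initializeState(Context context) {
            log.add(context.isRestored() ? "restore" : "fresh start");
        }
    }

    static final class FailingOperator extends BaseOperator {
        @Override
        public void initializeState(Context context) throws Exception {
            throw new Exception("hook failed");
        }
    }

    public static void main(String[] args) throws Exception {
        StatefulOperator op = new StatefulOperator();
        op.initializeState(true);
        System.out.println(op.log + " closed=" + op.lastInputs.closed);
    }
}
```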

AbstractUdfStreamOperator.initializeState(initializationContext)

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }
    
    //......
}
  • AbstractUdfStreamOperator’s initializeState(StateInitializationContext) first calls super.initializeState(context) and then StreamingFunctionUtils.restoreFunctionState(context, userFunction).
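Because the function restore happens inside the overridden hook, a custom operator that extends AbstractUdfStreamOperator and overrides initializeState(StateInitializationContext) has to call super to keep it. The hypothetical sketch below uses a boolean in place of the StateInitializationContext and a RecordingFunction in place of the user function to show the difference between calling and omitting super; none of these classes are Flink’s.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of hook chaining in AbstractUdfStreamOperator; not Flink classes.
final class UdfHookSketch {

    // Stand-in for a user function whose state should be restored.
    static final class RecordingFunction {
        final List<String> calls = new ArrayList<>();
        void initializeState(boolean restored) {
            calls.add("function.initializeState(restored=" + restored + ")");
        }
    }

    // Stand-in for AbstractStreamOperator: the hook is empty by default.
    static class BaseOperator {
        public void initializeState(boolean restored) {
        }
    }

    // Stand-in for AbstractUdfStreamOperator: call super, then restore the function.
    static class UdfOperator extends BaseOperator {
        final RecordingFunction userFunction;
        UdfOperator(RecordingFunction userFunction) { this.userFunction = userFunction; }

        @Override
        public void initializeState(boolean restored) {
            super.initializeState(restored);
            userFunction.initializeState(restored); // role of restoreFunctionState(...)
        }
    }

    // Calls super, so the user function is still restored.
    static class GoodCustomOperator extends UdfOperator {
        GoodCustomOperator(RecordingFunction f) { super(f); }
        @Override
        public void initializeState(boolean restored) {
            super.initializeState(restored);
            // ... operator-specific state registration would go here ...
        }
    }

    // Omits super, so the user function is silently never restored.
    static class ForgetfulCustomOperator extends UdfOperator {
        ForgetfulCustomOperator(RecordingFunction f) { super(f); }
        @Override
        public void initializeState(boolean restored) {
            // ... operator-specific state registration, but no super call ...
        }
    }

    public static void main(String[] args) {
        RecordingFunction good = new RecordingFunction();
        new GoodCustomOperator(good).initializeState(true);
        RecordingFunction forgotten = new RecordingFunction();
        new ForgetfulCustomOperator(forgotten).initializeState(true);
        System.out.println(good.calls + " vs " + forgotten.calls);
    }
}
```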

StreamingFunctionUtils.restoreFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

    public static void restoreFunctionState(
            StateInitializationContext context,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);

        while (true) {

            if (tryRestoreFunction(context, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean tryRestoreFunction(
            StateInitializationContext context,
            Function userFunction) throws Exception {

        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).initializeState(context);

            return true;
        }

        if (context.isRestored() && userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;

            ListState<Serializable> listState = context.getOperatorStateStore().
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

            List<Serializable> list = new ArrayList<>();

            for (Serializable serializable : listState.get()) {
                list.add(serializable);
            }

            try {
                listCheckpointedFun.restoreState(list);
            } catch (Exception e) {

                throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }

            return true;
        }

        return false;
    }
  • restoreFunctionState calls tryRestoreFunction, unwrapping the user function and retrying for as long as it is a WrappingFunction. tryRestoreFunction works as follows: if userFunction implements the CheckpointedFunction interface, its initializeState(context) method is called, whether or not the job is being restored. If userFunction implements the ListCheckpointed interface and context.isRestored() is true, the ListState registered under DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME is read from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called with that list.
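The dispatch in restoreFunctionState can be re-created without Flink on the classpath. In this sketch, Function, CheckpointedFunction, ListCheckpointed, WrappingFunction and Context are simplified hypothetical stand-ins for the Flink types, the default operator list state is modeled as a plain List, and the exception wrapping around restoreState is left out. It shows the unwrapping loop, that CheckpointedFunction.initializeState runs on every start, and that ListCheckpointed.restoreState runs only when restoring.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-creation of StreamingFunctionUtils.restoreFunctionState; stand-in types only.
final class RestoreDispatchSketch {

    interface Function {}

    interface CheckpointedFunction extends Function {
        void initializeState(Context context) throws Exception;
    }

    interface ListCheckpointed<T extends Serializable> extends Function {
        void restoreState(List<T> state) throws Exception;
    }

    static final class WrappingFunction implements Function {
        private final Function wrapped;
        WrappingFunction(Function wrapped) { this.wrapped = wrapped; }
        Function getWrappedFunction() { return wrapped; }
    }

    static final class Context {
        private final boolean restored;
        private final List<Serializable> defaultListState; // stand-in for the default operator list state
        Context(boolean restored, List<Serializable> defaultListState) {
            this.restored = restored;
            this.defaultListState = defaultListState;
        }
        boolean isRestored() { return restored; }
    }

    static void restoreFunctionState(Context context, Function userFunction) throws Exception {
        while (true) {
            if (tryRestoreFunction(context, userFunction)) {
                break;
            }
            // unwrap and retry if the function is a wrapper
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    @SuppressWarnings("unchecked")
    private static boolean tryRestoreFunction(Context context, Function userFunction) throws Exception {
        if (userFunction instanceof CheckpointedFunction) {
            // called on every start, restored or not
            ((CheckpointedFunction) userFunction).initializeState(context);
            return true;
        }
        if (context.isRestored() && userFunction instanceof ListCheckpointed) {
            // only when restoring: copy the list state into a fresh List
            List<Serializable> list = new ArrayList<>(context.defaultListState);
            ((ListCheckpointed<Serializable>) userFunction).restoreState(list);
            return true;
        }
        return false;
    }

    // Example ListCheckpointed function: a counter rebuilt from its checkpointed values.
    static final class Counter implements ListCheckpointed<Long> {
        long count;
        boolean restoreCalled;

        @Override
        public void restoreState(List<Long> state) {
            restoreCalled = true;
            for (Long c : state) {
                count += c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Counter counter = new Counter();
        List<Serializable> checkpointed = new ArrayList<>();
        checkpointed.add(3L);
        checkpointed.add(4L);
        restoreFunctionState(new Context(true, checkpointed), new WrappingFunction(counter));
        System.out.println(counter.count);
    }
}
```

Wrapping the Counter in a WrappingFunction still restores it: the first tryRestoreFunction call returns false for the wrapper, and the loop retries with the wrapped function.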

Summary

  • The run method of Task calls invokable.invoke(); here the invokable is a StreamTask. StreamTask.invoke() calls its initializeState method, which iterates over all operators (StreamOperator) in the chain and calls initializeState() on each; in this example the operator is a StreamSource, which extends AbstractUdfStreamOperator.
  • The StreamOperator interface defines the initializeState method that initializes the operator’s state. The abstract class AbstractStreamOperator implements initializeState() as a final method, which internally calls the initializeState(StateInitializationContext) hook; its subclass AbstractUdfStreamOperator overrides that hook.
  • AbstractUdfStreamOperator’s initializeState(StateInitializationContext) calls StreamingFunctionUtils.restoreFunctionState. If the userFunction implements the CheckpointedFunction interface, its initializeState method is called. If the userFunction implements the ListCheckpointed interface and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are converted to a List, and ListCheckpointed.restoreState is called.
