A Look at Flink’s CheckpointScheduler

Preface

This article studies Flink’s checkpoint scheduler, based on the Flink 1.7.0 sources.

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
  • CheckpointCoordinatorDeActivator implements the JobStatusListener interface. In jobStatusChanges it calls coordinator.startCheckpointScheduler() when the new status is RUNNING, and coordinator.stopCheckpointScheduler() for any other status.
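
The dispatch rule of the listener is small enough to sketch in isolation. The following is a minimal illustration rather than Flink code: Status and RecordingCoordinator are hypothetical stand-ins for JobStatus and CheckpointCoordinator, and the coordinator only records which method was called.

```java
import java.util.ArrayList;
import java.util.List;

class DeActivatorSketch {

    // Hypothetical stand-in for Flink's JobStatus; only a few states are modelled.
    enum Status { CREATED, RUNNING, FAILING, RESTARTING, FINISHED }

    // Hypothetical stand-in for CheckpointCoordinator that records the calls it receives.
    static class RecordingCoordinator {
        final List<String> calls = new ArrayList<>();

        void startCheckpointScheduler() { calls.add("start"); }

        void stopCheckpointScheduler() { calls.add("stop"); }
    }

    // Same rule as jobStatusChanges: RUNNING starts the scheduler, anything else stops it.
    static void onStatusChange(RecordingCoordinator coordinator, Status newStatus) {
        if (newStatus == Status.RUNNING) {
            coordinator.startCheckpointScheduler();
        } else {
            coordinator.stopCheckpointScheduler();
        }
    }

    // Replays a sequence of status transitions and returns the recorded calls in order.
    static List<String> replay(Status... transitions) {
        RecordingCoordinator coordinator = new RecordingCoordinator();
        for (Status status : transitions) {
            onStatusChange(coordinator, status);
        }
        return coordinator.calls;
    }
}
```

Replaying CREATED, RUNNING, FAILING records stop, start, stop: the scheduler runs only while the job is in the RUNNING state.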

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The number of consecutive failed trigger attempts */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    //......

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stopCheckpointScheduler() {
        synchronized (lock) {
            triggerRequestQueued = false;
            periodicScheduling = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                p.abortError(new Exception("Checkpoint Coordinator is suspending."));
            }

            pendingCheckpoints.clear();
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }

    //......
}
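  • CheckpointCoordinator’s startCheckpointScheduler method throws an IllegalArgumentException if the coordinator is already shut down. Otherwise it first calls stopCheckpointScheduler to cancel any earlier timer and abort the pending checkpoints, then uses timer.scheduleAtFixedRate to schedule a new ScheduledTrigger. The initial delay is a random value between minPauseBetweenCheckpointsNanos / 1_000_000 and baseInterval milliseconds (both inclusive); the period is baseInterval.
  • stopCheckpointScheduler cancels currentPeriodicTrigger, calls PendingCheckpoint.abortError on every pending checkpoint, then clears pendingCheckpoints (Map<Long, PendingCheckpoint>) and resets numUnsuccessfulCheckpointsTriggers (AtomicInteger) to 0.
  • ScheduledTrigger implements the Runnable interface. Its run method calls triggerCheckpoint with the isPeriodic parameter set to true, and logs any exception instead of propagating it.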
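
The start/stop pattern described above can be sketched with a plain ScheduledExecutorService. This is a simplified illustration under stated assumptions, not the coordinator itself: the class name, the millisecond-based fields and the isScheduling and shutdown helpers are hypothetical, pending checkpoints are not modelled, and the sketch assumes 0 <= minPause <= baseInterval so that ThreadLocalRandom.nextLong(origin, bound) receives a valid range.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class PeriodicSchedulerSketch {

    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long baseIntervalMillis;
    private final long minPauseMillis;

    private ScheduledFuture<?> currentPeriodicTrigger;
    private boolean periodicScheduling;

    PeriodicSchedulerSketch(long baseIntervalMillis, long minPauseMillis) {
        // Assumption of this sketch: reject ranges that nextLong or scheduleAtFixedRate would refuse.
        if (baseIntervalMillis <= 0 || minPauseMillis < 0 || minPauseMillis > baseIntervalMillis) {
            throw new IllegalArgumentException("need 0 <= minPause <= baseInterval and baseInterval > 0");
        }
        this.baseIntervalMillis = baseIntervalMillis;
        this.minPauseMillis = minPauseMillis;
    }

    // Random delay in [minPauseMillis, baseIntervalMillis]; the bound of nextLong is exclusive, hence + 1.
    static long initialDelay(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    void start(Runnable trigger) {
        synchronized (lock) {
            stop(); // cancel any earlier timer first; the monitor is reentrant
            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    trigger,
                    initialDelay(minPauseMillis, baseIntervalMillis),
                    baseIntervalMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            periodicScheduling = false;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false); // do not interrupt a trigger that is already running
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduling() {
        synchronized (lock) {
            return periodicScheduling && currentPeriodicTrigger != null;
        }
    }

    // Releases the timer thread; not part of the pattern itself.
    void shutdown() {
        timer.shutdownNow();
    }
}
```

Randomizing the first delay means that coordinators started at the same moment do not all fire their first trigger together, while later triggers follow the fixed period.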

CheckpointCoordinator.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 */
public class CheckpointCoordinator {

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** The maximum number of checkpoints that may be in progress at the same time */
    private final int maxConcurrentCheckpointAttempts;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpointsNanos;

    /**
     * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
     * timestamp.
     *
     * @param timestamp The timestamp for the checkpoint.
     * @param isPeriodic Flag indicating whether this triggered checkpoint is
     * periodic. If this flag is true, but the periodic scheduler is disabled,
     * the checkpoint will be declined.
     * @return <code>true</code> if triggering the checkpoint succeeded.
     */
    public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
    }

    @VisibleForTesting
    public CheckpointTriggerResult triggerCheckpoint(
            long timestamp,
            CheckpointProperties props,
            @Nullable String externalSavepointLocation,
            boolean isPeriodic) {

        // make some eager pre-checks
        synchronized (lock) {
            // abort if the coordinator has been shutdown in the meantime
            if (shutdown) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
            }

            // Don't allow periodic checkpoint if scheduling has been disabled
            if (isPeriodic && !periodicScheduling) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
            }

            // validate whether the checkpoint can be triggered, with respect to the limit of
            // concurrent checkpoints, and the minimum time between checkpoints.
            // these checks are not relevant for savepoints
            if (!props.forceCheckpoint()) {
                // sanity check: there should never be more than one trigger request queued
                if (triggerRequestQueued) {
                    LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                }

                // if too many checkpoints are currently in progress, we need to mark that a request is queued
                if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                    triggerRequestQueued = true;
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                }

                // make sure the minimum interval between checkpoints has passed
                final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                if (durationTillNextMillis > 0) {
                    if (currentPeriodicTrigger != null) {
                        currentPeriodicTrigger.cancel(false);
                        currentPeriodicTrigger = null;
                    }
                    // Reassign the new trigger to the currentPeriodicTrigger
                    currentPeriodicTrigger = timer.scheduleAtFixedRate(
                            new ScheduledTrigger(),
                            durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                }
            }
        }

        // check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job);
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            } else if (ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job,
                        ExecutionState.RUNNING,
                        ee.getState());
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // next, check if all tasks that need to acknowledge the checkpoint are running.
        // if not, abort the checkpoint
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getTaskNameWithSubtaskIndex(),
                        job);
                return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // we will actually trigger this checkpoint!

        // we lock with a special lock to make sure that trigger requests do not overtake each other.
        // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
        // may issue blocking operations. Using a different lock than the coordinator-wide lock,
        // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
        synchronized (triggerLock) {

            final CheckpointStorageLocation checkpointStorageLocation;
            final long checkpointID;

            try {
                // this must happen outside the coordinator-wide lock, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();

                checkpointStorageLocation = props.isSavepoint() ?
                        checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                        checkpointStorage.initializeLocationForCheckpoint(checkpointID);
            }
            catch (Throwable t) {
                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                        job,
                        numUnsuccessful,
                        t);
                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

            final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                checkpointStorageLocation,
                executor);

            if (statsTracker != null) {
                PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

                checkpoint.setStatsCallback(callback);
            }

            // schedule the timer that will clean up the expired checkpoints
            final Runnable canceller = () -> {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                        checkpoint.abortExpired();
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            };

            try {
                // re-acquire the coordinator-wide lock
                synchronized (lock) {
                    // since we released the lock in the meantime, we need to re-check
                    // that the conditions still hold.
                    if (shutdown) {
                        return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                    }
                    else if (!props.forceCheckpoint()) {
                        if (triggerRequestQueued) {
                            LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                            return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                        }

                        if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                            triggerRequestQueued = true;
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }
                            return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                        }

                        // make sure the minimum interval between checkpoints has passed
                        final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
                        final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;

                        if (durationTillNextMillis > 0) {
                            if (currentPeriodicTrigger != null) {
                                currentPeriodicTrigger.cancel(false);
                                currentPeriodicTrigger = null;
                            }

                            // Reassign the new trigger to the currentPeriodicTrigger
                            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                                    new ScheduledTrigger(),
                                    durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);

                            return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                        }
                    }

                    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

                    pendingCheckpoints.put(checkpointID, checkpoint);

                    ScheduledFuture<?> cancellerHandle = timer.schedule(
                            canceller,
                            checkpointTimeout, TimeUnit.MILLISECONDS);

                    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                        // checkpoint is already disposed!
                        cancellerHandle.cancel(false);
                    }

                    // trigger the master hooks for the checkpoint
                    final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                            checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                    for (MasterState s : masterStates) {
                        checkpoint.addMasterState(s);
                    }
                }
                // end of lock scope

                final CheckpointOptions checkpointOptions = new CheckpointOptions(
                        props.getCheckpointType(),
                        checkpointStorageLocation.getLocationReference());

                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }

                numUnsuccessfulCheckpointsTriggers.set(0);
                return new CheckpointTriggerResult(checkpoint);
            }
            catch (Throwable t) {
                // guard the map against concurrent modifications
                synchronized (lock) {
                    pendingCheckpoints.remove(checkpointID);
                }

                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                        checkpointID, job, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                }

                try {
                    checkpointStorageLocation.disposeOnFailure();
                }
                catch (Throwable t2) {
                    LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                }

                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }

        } // end trigger lock
    }

    //......
}
  • triggerCheckpoint starts with pre-checks under the coordinator lock. It declines with COORDINATOR_SHUTDOWN if the coordinator has been shut down, and with PERIODIC_SCHEDULER_SHUTDOWN if isPeriodic is true but periodicScheduling is false. Unless props.forceCheckpoint() is true, it then declines with ALREADY_QUEUED if a trigger request is already queued; with TOO_MANY_CONCURRENT_CHECKPOINTS if pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts (it also sets triggerRequestQueued and cancels the periodic trigger); and with MINIMUM_TIME_BETWEEN_CHECKPOINTS if less than minPauseBetweenCheckpointsNanos has passed since lastCheckpointCompletionNanos, in which case the ScheduledTrigger is rescheduled to fire after the remaining time. This keeps checkpoints from being triggered too frequently.
  • It then checks every task in tasksToTrigger (the tasks that must be notified when a checkpoint starts). If any of them has no current execution attempt or is not in the RUNNING state, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
  • Next it checks every task in tasksToWaitFor (the tasks that must acknowledge the checkpoint before it succeeds). Here only a non-null current execution attempt is required; the execution state itself is not inspected. If an attempt is missing, it also returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).
  • The actual triggering starts only after these checks have passed. Under triggerLock it obtains a checkpointID from checkpointIdCounter and initializes checkpointStorageLocation, returning CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION) if either step throws. It then creates the PendingCheckpoint and a canceller Runnable, which aborts the checkpoint if it has not completed within checkpointTimeout. After re-acquiring the coordinator lock it repeats the shutdown check and, for checkpoints that are not forced, the ALREADY_QUEUED, TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks. Only then does it add the checkpoint to pendingCheckpoints, schedule the canceller and trigger the master hooks.
  • Finally it calls Execution.triggerCheckpoint on each execution in turn, resets numUnsuccessfulCheckpointsTriggers to 0 and returns CheckpointTriggerResult(checkpoint). If anything throws, it removes and aborts the pending checkpoint, disposes the storage location and returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION).
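
The throttling part of the pre-checks depends only on a handful of values, so it can be sketched as a pure function. This is an illustration under assumptions, not the Flink method: the Outcome enum, the PROCEED value and the explicit nowNanos parameter are hypothetical (the original reads System.nanoTime() directly and also cancels or reschedules the timer, which is omitted here).

```java
class TriggerPreCheckSketch {

    // Hypothetical result type; the three decline values mirror CheckpointDeclineReason names.
    enum Outcome {
        PROCEED,
        ALREADY_QUEUED,
        TOO_MANY_CONCURRENT_CHECKPOINTS,
        MINIMUM_TIME_BETWEEN_CHECKPOINTS
    }

    static Outcome preCheck(
            boolean forceCheckpoint,
            boolean triggerRequestQueued,
            int pendingCheckpoints,
            int maxConcurrentCheckpointAttempts,
            long lastCheckpointCompletionNanos,
            long minPauseBetweenCheckpointsNanos,
            long nowNanos) {

        // Forced checkpoints (for example savepoints) skip all throttling checks.
        if (forceCheckpoint) {
            return Outcome.PROCEED;
        }
        // There should never be more than one queued trigger request.
        if (triggerRequestQueued) {
            return Outcome.ALREADY_QUEUED;
        }
        // Note the >= comparison: reaching the limit is already too many.
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Outcome.TOO_MANY_CONCURRENT_CHECKPOINTS;
        }
        // Remaining pause, truncated to whole milliseconds as in the original code.
        long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
        long durationTillNextMillis = (earliestNext - nowNanos) / 1_000_000;
        if (durationTillNextMillis > 0) {
            return Outcome.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
        }
        return Outcome.PROCEED;
    }
}
```

Because the remaining pause is truncated to milliseconds, a checkpoint that arrives less than one millisecond early is allowed to proceed.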

Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    /**
     * Trigger a new checkpoint on the task of this execution.
     *
     * @param checkpointId of th checkpoint to trigger
     * @param timestamp of the checkpoint to trigger
     * @param checkpointOptions of the checkpoint to trigger
     */
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

    //......
}
  • Execution.triggerCheckpoint calls taskManagerGateway.triggerCheckpoint on the gateway of its assigned slot; here the taskManagerGateway is an RpcTaskManagerGateway. If no slot is assigned, it only logs a debug message.

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

/**
 * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
 */
public class RpcTaskManagerGateway implements TaskManagerGateway {

    private final TaskExecutorGateway taskExecutorGateway;

    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions);
    }

    //......
}
  • The triggerCheckpoint method of RpcTaskManagerGateway calls taskExecutorGateway.triggerCheckpoint. Here the taskExecutorGateway is a proxy backed by AkkaInvocationHandler, which notifies the TaskExecutor via RPC.

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }
    }

    //......
}
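  • The triggerCheckpoint method of TaskExecutor looks up the Task via taskSlotTable.getTask(executionAttemptID). If the task exists, it calls task.triggerCheckpointBarrier and returns a completed future carrying an Acknowledge; otherwise it returns a future completed exceptionally with a CheckpointException.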
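
The lookup-then-acknowledge shape of this method can be sketched with a map and CompletableFuture. The names below are hypothetical: CountingTask stands in for Task, a String key stands in for ExecutionAttemptID, the string "ACK" stands in for Acknowledge, and IllegalStateException replaces CheckpointException so that the sketch needs no Flink classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class TaskLookupSketch {

    // Hypothetical stand-in for Task: it only counts how many barriers it was asked to trigger.
    static class CountingTask {
        int triggered;

        void triggerCheckpointBarrier(long checkpointId) {
            triggered++;
        }
    }

    // Hypothetical stand-in for the task slot table, keyed by an attempt id string.
    private final Map<String, CountingTask> tasksByAttemptId = new ConcurrentHashMap<>();

    void register(String attemptId, CountingTask task) {
        tasksByAttemptId.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        CountingTask task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            // Known task: forward the request and acknowledge immediately.
            task.triggerCheckpointBarrier(checkpointId);
            return CompletableFuture.completedFuture("ACK");
        }
        // Unknown task: report the failure through the future instead of throwing.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "Received a checkpoint request for unknown task " + attemptId + '.'));
        return failed;
    }
}
```

The acknowledgement only confirms that the request was handed to the task; in the original code the checkpoint itself runs asynchronously afterwards.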

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

    /** The invokable of this task, if initialized. All accesses must copy the reference and
     * check for null, as this field is cleared as part of the disposal logic. */
    @Nullable
    private volatile AbstractInvokable invokable;

    /**
     * Calls the invokable to trigger a checkpoint.
     *
     * @param checkpointID The ID identifying the checkpoint.
     * @param checkpointTimestamp The timestamp associated with the checkpoint.
     * @param checkpointOptions Options for performing this checkpoint.
     */
    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {

            // build a local closure
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }

    //......
}
  • Task’s triggerCheckpointBarrier method first checks that executionState is RUNNING and that invokable is not null. If either condition fails, it calls checkpointResponder.declineCheckpoint.
  • If both conditions hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)) to run the trigger asynchronously.
  • Inside that runnable, invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions) is executed; if it returns false, the checkpoint is declined as well. Here the invokable is a SourceStreamTask.
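
The decline paths of this method can be sketched as follows. This is a simplified illustration with hypothetical names: a BooleanSupplier stands in for invokable.triggerCheckpoint, two lists stand in for checkpointResponder.declineCheckpoint and failExternally, and the executor is passed in so that the asynchronous call can be replaced by a direct one when experimenting. Unlike the original, the sketch records every exception as a failure without re-checking the execution state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

class BarrierTriggerSketch {

    // Stand-in for checkpointResponder.declineCheckpoint: ids of declined checkpoints.
    final List<Long> declined = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for failExternally: ids of checkpoints whose trigger threw.
    final List<Long> failures = Collections.synchronizedList(new ArrayList<>());

    void triggerCheckpointBarrier(
            long checkpointId,
            boolean running,
            BooleanSupplier invokable,
            Executor asyncExecutor) {

        if (running && invokable != null) {
            // Run the trigger off the calling thread, as executeAsyncCallRunnable does.
            asyncExecutor.execute(() -> {
                try {
                    boolean success = invokable.getAsBoolean();
                    if (!success) {
                        declined.add(checkpointId); // the invokable was not ready
                    }
                } catch (RuntimeException e) {
                    failures.add(checkpointId); // simplified: no execution-state re-check
                }
            });
        } else {
            // The task is not running or has no invokable: decline right away.
            declined.add(checkpointId);
        }
    }
}
```

Passing Runnable::run as the executor runs the trigger synchronously, which makes the three outcomes (success, decline, failure) easy to observe.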

SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {

    private volatile boolean externallyInducedCheckpoints;

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!externallyInducedCheckpoints) {
            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        }
        else {
            // we do not trigger checkpoints here, we simply state whether we can trigger them
            synchronized (getCheckpointLock()) {
                return isRunning();
            }
        }
    }

    //......
}
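  • SourceStreamTask’s triggerCheckpoint first checks externallyInducedCheckpoints. If it is false, the triggerCheckpoint of the parent class StreamTask is called. If it is true, no checkpoint is triggered here; the method only returns isRunning() while holding the checkpoint lock.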

StreamTask.triggerCheckpoint

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            // No alignment if we inject a checkpoint
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                    .setBytesBufferedInAlignment(0L)
                    .setAlignmentDurationNanos(0L);

            return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (Exception e) {
            // propagate exceptions only if the task is still in "running" state
            if (isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                    " for operator " + getName() + '.', e);
            } else {
                LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                    "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
                return false;
            }
        }
    }

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        synchronized (lock) {
            if (isRunning) {
                // we can do a checkpoint

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            else {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception exception = null;

                for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                    try {
                        streamRecordWriter.broadcastEvent(message);
                    } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(
                            new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                            exception);
                    }
                }

                if (exception != null) {
                    throw exception;
                }

                return false;
            }
        }
    }

    private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());

        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

        checkpointingOperation.executeCheckpointing();
    }

    //......
}
  • The main logic of StreamTask’s triggerCheckpoint method lives in performCheckpoint, which branches on the task’s isRunning flag while holding the lock.
  • When isRunning is true, it proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState. checkpointState creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().
  • When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on every StreamRecordWriter, where message is a CancelCheckpointMarker, so that downstream tasks do not wait for a barrier from this task. It then returns false.
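
The two branches described above can be sketched without any Flink dependency. This is a toy model only: the class PerformCheckpointSketch and its events list are made up for illustration, and the strings merely record which step would run, in which order.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: the class name and the events list are hypothetical.
final class PerformCheckpointSketch {

    private final Object lock = new Object();
    private final boolean isRunning;

    /** Records the steps in the order they were taken. */
    final List<String> events = new ArrayList<>();

    PerformCheckpointSketch(boolean isRunning) {
        this.isRunning = isRunning;
    }

    boolean performCheckpoint(long checkpointId) {
        // All steps happen under one lock, so they are atomic with
        // respect to anything else that synchronizes on the same lock.
        synchronized (lock) {
            if (isRunning) {
                // Step 1: let operators do minimal pre-barrier work.
                events.add("prepareSnapshotPreBarrier(" + checkpointId + ")");
                // Step 2: emit the barrier before taking the snapshot.
                events.add("broadcastCheckpointBarrier(" + checkpointId + ")");
                // Step 3: take the state snapshot.
                events.add("checkpointState(" + checkpointId + ")");
                return true;
            } else {
                // Not running: tell downstream not to wait for this task.
                events.add("broadcastCancelCheckpointMarker(" + checkpointId + ")");
                return false;
            }
        }
    }
}
```

Calling performCheckpoint(7L) on an instance built with true records the three steps in order and returns true; on an instance built with false, it records only the cancel marker and returns false.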

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // go forward through the operator chain and tell each operator
        // to prepare the checkpoint
        final StreamOperator<?>[] operators = this.allOperators;
        for (int i = operators.length - 1; i >= 0; --i) {
            final StreamOperator<?> op = operators[i];
            if (op != null) {
                op.prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }

    //......
}
  • OperatorChain’s prepareSnapshotPreBarrier traverses allOperators from the last array index down to the first, skipping null entries, and calls each StreamOperator’s prepareSnapshotPreBarrier method in turn.
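
The source comment says the loop goes "forward through the operator chain" even though the index counts down. One reading, which is an assumption here and not something the excerpt confirms, is that allOperators is stored with the tail of the chain first and the head last. Under that assumption, a minimal sketch of the traversal (plain strings stand in for operators, and the class name is hypothetical) looks like this:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code. Assumption: the array holds the chain
// tail-first, so counting the index down visits the head operator first.
final class ReverseChainTraversalSketch {

    static List<String> visitOrder(String[] allOperators) {
        List<String> visited = new ArrayList<>();
        for (int i = allOperators.length - 1; i >= 0; --i) {
            String op = allOperators[i];
            if (op != null) { // null slots are skipped, as in the original loop
                visited.add(op);
            }
        }
        return visited;
    }
}
```

With the input {"sink", null, "map", "source"}, the visit order is source, map, sink.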

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }

    //......
}
  • OperatorChain’s broadcastCheckpointBarrier method creates a CheckpointBarrier from the checkpoint id, timestamp and options, then iterates over streamOutputs and calls broadcastEvent(barrier) on each RecordWriterOutput.

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/runtime/tasks/StreamTask.java

    private static final class CheckpointingOperation {

        private final StreamTask<?, ?> owner;

        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;

        private final StreamOperator<?>[] allOperators;

        private long startSyncPartNano;
        private long startAsyncPartNano;

        // ------------------------

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        public CheckpointingOperation(
                StreamTask<?, ?> owner,
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointStreamFactory checkpointStorageLocation,
                CheckpointMetrics checkpointMetrics) {

            this.owner = Preconditions.checkNotNull(owner);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
            this.allOperators = owner.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
            }
        }

        @SuppressWarnings("deprecation")
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {

                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

        private enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }
    }
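  • CheckpointingOperation is defined inside the StreamTask class. Its executeCheckpointing method first calls checkpointStreamOperator for every StreamOperator; checkpointStreamOperator calls the operator’s snapshotState method and stores the returned OperatorSnapshotFutures in operatorSnapshotsInProgress. It then creates an AsyncCheckpointRunnable and submits it to asyncOperationsThreadPool so that the rest of the work runs asynchronously. If the synchronous part throws, the futures collected so far are cancelled and the exception is handed to synchronousCheckpointExceptionHandler.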
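
The split into a synchronous part (collect one future per operator on the calling thread) and an asynchronous part (finish those futures on a thread pool) can be sketched with java.util.concurrent alone. Everything below is a toy model under stated assumptions: the Operator interface, the operator factory and the demo helper are hypothetical, and a plain String stands in for the state handle that the real futures would produce.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Toy model, not Flink code: every type and method name here is hypothetical.
final class TwoPhaseSnapshotSketch {

    /** Stand-in for StreamOperator: snapshotState returns a future that has not run yet. */
    interface Operator {
        String id();
        RunnableFuture<String> snapshotState(long checkpointId);
    }

    static Operator operator(final String id) {
        return new Operator() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public RunnableFuture<String> snapshotState(final long checkpointId) {
                // Synchronous part: only describe the work; nothing is written yet.
                return new FutureTask<String>(() -> id + "@" + checkpointId);
            }
        };
    }

    /** Collects one future per operator on the calling thread, then finishes them on the pool. */
    static Future<Map<String, String>> executeCheckpointing(
            long checkpointId, Operator[] operators, ExecutorService pool) {

        final Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        try {
            for (Operator op : operators) {
                if (op != null) {
                    inProgress.put(op.id(), op.snapshotState(checkpointId));
                }
            }
        } catch (RuntimeException e) {
            // Cleanup: cancel whatever was collected before the failure.
            for (RunnableFuture<String> f : inProgress.values()) {
                f.cancel(true);
            }
            throw e;
        }

        // Asynchronous part: run each future to completion off the calling thread.
        return pool.submit(() -> {
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, RunnableFuture<String>> entry : inProgress.entrySet()) {
                entry.getValue().run();
                results.put(entry.getKey(), entry.getValue().get());
            }
            return results;
        });
    }

    /** Convenience wrapper for trying the sketch; wraps checked exceptions. */
    static Map<String, String> demo(long checkpointId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Operator[] ops = {operator("map"), null, operator("sink")};
            return executeCheckpointing(checkpointId, ops, pool).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the design is that executeCheckpointing returns as soon as the futures are collected and the runnable is submitted, so the calling thread is blocked only for the synchronous part.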

AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {

    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            throw new Exception(snapshotFailMessage, snapshotException);
        }

        return snapshotInProgress;
    }

    /**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }

    //......
}
  • The final snapshotState(checkpointId, timestamp, checkpointOptions, factory) method of AbstractStreamOperator calls the snapshotState(StateSnapshotContext) hook, then triggers snapshot on operatorStateBackend and keyedStateBackend when they are not null. The hook itself only does work if keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots() returns true; in that case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group. Subclasses such as AbstractUdfStreamOperator may override the hook.

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

        //......
}
  • AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator: it calls super.snapshotState first and then StreamingFunctionUtils.snapshotFunctionState.

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

@Internal
public final class StreamingFunctionUtils {

    public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }

        if (userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());

            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

            listState.clear();

            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }

    //......
}
  • snapshotFunctionState calls trySnapshotFunctionState in a loop; if that returns false and userFunction is a WrappingFunction, it unwraps the function and tries again. trySnapshotFunctionState checks the type of userFunction: if it implements CheckpointedFunction, CheckpointedFunction.snapshotState is called; if it implements ListCheckpointed, ListCheckpointed.snapshotState is called. In the ListCheckpointed case the ListState is cleared first, and then each element of the returned List is added through ListState.add.
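
The unwrap-and-retry loop and the clear-then-add handling can be sketched in isolation. This is a toy model under stated assumptions: UserFunction, Wrapping and ListSnapshotting are hypothetical stand-ins for Function, WrappingFunction and ListCheckpointed, a plain List stands in for ListState, and the method returns a boolean (the original returns void) only so that the outcome is observable.

```java
import java.util.List;

// Toy model, not Flink code: all interfaces here are hypothetical stand-ins.
final class UnwrapSnapshotSketch {

    interface UserFunction {}

    interface Wrapping extends UserFunction {
        UserFunction getWrappedFunction();
    }

    interface ListSnapshotting extends UserFunction {
        List<String> snapshotState(long checkpointId);
    }

    /** Returns true if some function in the wrapper chain was snapshotted. */
    static boolean snapshotFunctionState(
            long checkpointId, List<String> listState, UserFunction userFunction) {
        while (true) {
            if (trySnapshot(checkpointId, listState, userFunction)) {
                return true;
            }
            // Not snapshottable itself: unwrap one level and try the inner function.
            if (userFunction instanceof Wrapping) {
                userFunction = ((Wrapping) userFunction).getWrappedFunction();
            } else {
                return false;
            }
        }
    }

    private static boolean trySnapshot(
            long checkpointId, List<String> listState, UserFunction userFunction) {
        if (userFunction instanceof ListSnapshotting) {
            List<String> parts = ((ListSnapshotting) userFunction).snapshotState(checkpointId);
            // Clear first so that entries from the previous checkpoint do not leak in.
            listState.clear();
            if (parts != null) {
                listState.addAll(parts);
            }
            return true;
        }
        return false;
    }
}
```

Passing a Wrapping whose inner function is a ListSnapshotting replaces the old contents of the list with whatever the inner function returns; passing a function that implements neither interface leaves the list untouched and returns false.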

Summary

  • Flink’s CheckpointCoordinatorDeActivator calls CheckpointCoordinator’s startCheckpointScheduler when the job’s status becomes RUNNING, and calls stopCheckpointScheduler for any other status.
  • CheckpointCoordinator’s startCheckpointScheduler mainly registers the ScheduledTrigger task, whose run method calls triggerCheckpoint. triggerCheckpoint performs a series of checks before actually triggering the checkpoint and fails fast if any of them is not satisfied; the possible decline reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS and NOT_ALL_REQUIRED_TASKS_RUNNING. If the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint on each, which uses TaskManagerGateway.triggerCheckpoint to invoke TaskExecutor’s triggerCheckpoint method over RPC.
  • TaskExecutor’s triggerCheckpoint mainly calls Task’s triggerCheckpointBarrier method, which runs a Runnable asynchronously; its run method calls invokable.triggerCheckpoint. The invokable here is a SourceStreamTask, which delegates to the triggerCheckpoint method of its parent class StreamTask. The main logic of that method is in performCheckpoint. When isRunning is true, performCheckpoint proceeds in three steps: first operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, and finally checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().
  • The executeCheckpointing method of CheckpointingOperation calls checkpointStreamOperator on every StreamOperator, and checkpointStreamOperator calls the StreamOperator’s snapshotState method. The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does work if keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots() returns true.
  • AbstractUdfStreamOperator overrides the snapshotState method of its parent class AbstractStreamOperator and adds a call to StreamingFunctionUtils.snapshotFunctionState, which dispatches on the type of the user function: if it implements CheckpointedFunction, CheckpointedFunction.snapshotState is called; if it implements ListCheckpointed, ListCheckpointed.snapshotState is called.

doc