A Look at Flink’s ListCheckpointed

Preface

This article examines Flink’s ListCheckpointed interface.

Example

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly-once semantics; initialized so a fresh run does not unbox null */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        // the snapshot holds a single element; an empty list leaves offset unchanged
        for (Long s : state) {
            offset = s;
        }
    }
}
  • CounterSource is a stateful RichParallelSourceFunction that implements the ListCheckpointed interface. Its snapshotState method returns the current offset, and its restoreState method resets the local offset from the state passed in. Note that to achieve exactly-once semantics across failure and recovery, emitting a record and updating the offset must happen together under the lock returned by SourceContext.getCheckpointLock().
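
To see why the emit and the offset update share one lock, here is a minimal sketch in plain Java, with no Flink classes. The class CheckpointLockSketch, its lock field and the emitted list are hypothetical stand-ins for the source task, the checkpoint lock and the downstream output. A snapshot taken under the same lock should never observe an offset that disagrees with the number of records already emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source task; this is not Flink code.
class CheckpointLockSketch {
    private final Object lock = new Object();              // plays the role of ctx.getCheckpointLock()
    private final List<Long> emitted = new ArrayList<>();  // plays the role of the downstream output
    private long offset = 0L;

    // Emit one record and advance the offset as a single atomic step.
    void emitOne() {
        synchronized (lock) {
            emitted.add(offset);
            offset += 1;
        }
    }

    // Snapshot under the same lock; returns {offset, number of emitted records}.
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {offset, emitted.size()};
        }
    }

    // Emits on one thread while snapshotting on another.
    // Returns true if every snapshot saw offset == emitted count.
    static boolean runDemo(final int records, int snapshots) {
        final CheckpointLockSketch source = new CheckpointLockSketch();
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                source.emitOne();
            }
        });
        emitter.start();

        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }

        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for emitter", e);
        }
        long[] last = source.snapshot();
        return consistent && last[0] == records && last[1] == records;
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + runDemo(100_000, 1_000));
    }
}
```

If the synchronized block in emitOne were removed, a snapshot could fall between the emit and the increment, and a restore from it would replay a record that had already gone downstream.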

ListCheckpointed

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java

@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {

    /**
     * Gets the current state of the function. The state must reflect the result of all prior
     * invocations to this function.
     *
     * <p>The returned list should contain one entry for redistributable unit of state. See
     * the {@link ListCheckpointed class docs} for an illustration how list-style state
     * redistribution works.
     *
     * <p>As special case, the returned list may be null or empty (if the operator has no state)
     * or it may contain a single element (if the operator state is indivisible).
     *
     * @param checkpointId The ID of the checkpoint - a unique and monotonously increasing value.
     * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
     *
     * @return The operator state in a list of redistributable, atomic sub-states.
     *         Should not return null, but empty list instead.
     *
     * @throws Exception Thrown if the creation of the state object failed. This causes the
     *                   checkpoint to fail. The system may decide to fail the operation (and trigger
     *                   recovery), or to discard this checkpoint attempt and to continue running
     *                   and to try again with the next checkpoint attempt.
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the state of the function or operator to that of a previous checkpoint.
     * This method is invoked when the function is executed after a failure recovery.
     * The state list may be empty if no state is to be recovered by the particular parallel instance
     * of the function.
     *
     * <p>The given state list will contain all the <i>sub states</i> that this parallel
     * instance of the function needs to handle. Refer to the  {@link ListCheckpointed class docs}
     * for an illustration how list-style state redistribution works.
     *
     * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
     * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
     *
     * @param state The state to be restored as a list of atomic sub-states.
     *
     * @throws Exception Throwing an exception in this method causes the recovery to fail.
     *                   The exact consequence depends on the configured failure handling strategy,
     *                   but typically the system will re-attempt the recovery, or try recovering
     *                   from a different checkpoint.
     */
    void restoreState(List<T> state) throws Exception;
}
  • ListCheckpointed defines two methods: snapshotState and restoreState.
  • snapshotState takes a checkpointId, which is a unique and monotonically increasing ID, and a timestamp, which is the wall-clock time at which the master triggered the checkpoint. It returns the current state as a List.
  • restoreState is called when the function runs again after failure recovery. It receives the state as a List and rebuilds the local state from it; the list may be empty if this parallel instance has nothing to restore.
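
The snapshot and restore contract can be tried out without a Flink runtime. The sketch below is an illustration only: ListCheckpointedSketch is a local stand-in that copies the two method signatures quoted above, and OffsetCounter and roundTrip are hypothetical names. It simulates a failure by throwing away one instance and restoring a fresh one from the last snapshot.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Local stand-in mirroring the two signatures quoted above; not the Flink interface itself.
interface ListCheckpointedSketch<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// Hypothetical counter whose only state is its offset.
class OffsetCounter implements ListCheckpointedSketch<Long> {
    private long offset = 0L;

    // Returns the current value and advances the offset.
    long next() {
        return offset++;
    }

    long currentOffset() {
        return offset;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // The state is indivisible, so the list holds a single element.
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        // An empty list means nothing to restore; the default offset stays in place.
        for (Long s : state) {
            offset = s;
        }
    }

    // Emits n values, snapshots, then restores a brand-new instance from the snapshot.
    // Returns the first value the restored instance produces.
    static long roundTrip(int n) {
        OffsetCounter original = new OffsetCounter();
        for (int i = 0; i < n; i++) {
            original.next();
        }
        List<Long> snapshot = original.snapshotState(1L, System.currentTimeMillis());

        OffsetCounter restored = new OffsetCounter();  // simulates the restarted task
        restored.restoreState(snapshot);
        return restored.next();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(5));  // prints 5
    }
}
```

The restored instance continues at 5 because the first instance emitted 0 through 4 before the snapshot was taken.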

Summary

  • A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. Managed operator state currently supports only list-style state: the state must be a List of serializable objects, so that it can be redistributed when the job is rescaled. There are currently two redistribution schemes: even-split redistribution (on restore or redistribution, each operator instance gets only a sublist of the whole state) and union redistribution (on restore or redistribution, each operator instance gets the complete list of the whole state).
  • ListCheckpointed is a restricted variant of CheckpointedFunction: it supports only list-style state with the even-split redistribution scheme.
  • ListCheckpointed defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint, and the implementation must return the current state. restoreState is called on failure recovery; it receives the state as a List, from which the local state can be restored.
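
The two redistribution schemes mentioned in the summary can be illustrated in plain Java. This is a sketch under stated assumptions and not Flink's implementation: the class RedistributionSketch and its methods are hypothetical, and the round-robin assignment used for the even split is only one possible way to divide the concatenated list among the new instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of list-style state redistribution; not Flink code.
class RedistributionSketch {

    // The whole state is logically the concatenation of every instance's list.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: each new instance receives a sublist of the whole state.
    // Assumption: elements are dealt out round-robin.
    static <T> List<List<T>> evenSplit(List<List<T>> oldStates, int newParallelism) {
        List<T> all = concat(oldStates);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<T>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    // Union: each new instance receives the complete list of the whole state.
    static <T> List<List<T>> union(List<List<T>> oldStates, int newParallelism) {
        List<T> all = concat(oldStates);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old instances checkpointed [1, 2] and [3].
        List<List<Long>> old = Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L));
        System.out.println(evenSplit(old, 2));  // prints [[1, 3], [2]]
        System.out.println(union(old, 2));      // prints [[1, 2, 3], [1, 2, 3]]
    }
}
```

With more new instances than state elements, the even split leaves some instances with an empty list, which is why a restoreState implementation should tolerate an empty argument.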
