Talk about Flink’s CheckpointedFunction


Preface

This article takes a look at Flink’s CheckpointedFunction interface.

Example

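import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink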
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
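        bufferedElements.add(value);
        // Use >= rather than ==: after a restore the buffer may already hold more than threshold elements.
        if (bufferedElements.size() >= threshold) {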
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
  • BufferingSink implements the CheckpointedFunction interface. It defines checkpointedState, of type ListState, and bufferedElements, of type List.
  • In the invoke method, each value is buffered in bufferedElements; once the number of buffered elements reaches the threshold, the elements are sent to the sink and bufferedElements is cleared.
  • snapshotState clears checkpointedState and copies the current contents of bufferedElements into it. initializeState first creates a ListStateDescriptor, then obtains the ListState via context.getOperatorStateStore().getListState(descriptor); if context.isRestored() reports that the state was restored from the snapshot of a previous execution, the elements in the ListState are copied back into bufferedElements.
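
The buffer, snapshot, and restore cycle described above can be tried out without a cluster. The following is only a minimal plain-Java sketch, not Flink code: BufferingSinkSketch, restoreState, and the emitted list are hypothetical stand-ins (a plain List plays the role of ListState and of the external sink), and elements are simplified to String.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for BufferingSink; no Flink classes are used.
class BufferingSinkSketch {
    private final int threshold;
    private final List<String> bufferedElements = new ArrayList<>();
    // Hypothetical stand-in for the external system the sink writes to.
    final List<String> emitted = new ArrayList<>();

    BufferingSinkSketch(int threshold) {
        this.threshold = threshold;
    }

    // Mirrors invoke(): buffer the value, flush once the threshold is reached.
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            emitted.addAll(bufferedElements);
            bufferedElements.clear();
        }
    }

    // Mirrors snapshotState(): return a copy of whatever is still buffered.
    List<String> snapshotState() {
        return new ArrayList<>(bufferedElements);
    }

    // Mirrors the restore path of initializeState(): refill the buffer.
    void restoreState(List<String> checkpointed) {
        bufferedElements.addAll(checkpointed);
    }

    public static void main(String[] args) {
        BufferingSinkSketch sink = new BufferingSinkSketch(3);
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpoint = sink.snapshotState(); // holds a and b

        // Simulate a failure: a fresh instance restores from the checkpoint.
        BufferingSinkSketch recovered = new BufferingSinkSketch(3);
        recovered.restoreState(checkpoint);
        recovered.invoke("c"); // the third element triggers the flush
        System.out.println(recovered.emitted); // prints [a, b, c]
    }
}
```

The point of the sketch is that elements which were buffered but not yet flushed at checkpoint time are not lost: the recovered instance starts with them in its buffer again.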

CheckpointedFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java

@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;

}
  • CheckpointedFunction is the core interface for stateful transformation functions, i.e. functions that maintain state across individual stream records.
  • snapshotState is called when a checkpoint is taken. It is where the function snapshots its state, and it is typically also used to flush, commit, or synchronize with external systems.
  • initializeState is called when a parallel function instance is initialized, either for the first time or when recovering from an earlier checkpoint. It is typically used to set up the state and to handle the state recovery logic.

FunctionSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionSnapshotContext.java

/**
 * This interface provides a context in which user functions that use managed state (i.e. state that is managed by state
 * backends) can participate in a snapshot. As snapshots of the backends themselves are taken by the system, this
 * interface mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface FunctionSnapshotContext extends ManagedSnapshotContext {
}
  • FunctionSnapshotContext inherits the ManagedSnapshotContext interface.

ManagedSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedSnapshotContext.java

/**
 * This interface provides a context in which operators that use managed state (i.e. state that is managed by state
 * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface
 * mainly provides meta information about the checkpoint.
 */
@PublicEvolving
public interface ManagedSnapshotContext {

    /**
     * Returns the ID of the checkpoint for which the snapshot is taken.
     * 
     * <p>The checkpoint ID is guaranteed to be strictly monotonously increasing across checkpoints.
     * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
     * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
     * than checkpoint <i>A</i>.
     */
    long getCheckpointId();

    /**
     * Returns timestamp (wall clock time) when the master node triggered the checkpoint for which
     * the state snapshot is taken.
     */
    long getCheckpointTimestamp();
}
  • ManagedSnapshotContext defines two methods, getCheckpointId and getCheckpointTimestamp.
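
One way the checkpoint ID could be put to use is to file pending output under the ID of the checkpoint that covers it, and to release it once that checkpoint or a later one has completed. The sketch below is plain Java under stated assumptions: the nested SnapshotContext interface is a hypothetical local stand-in that only mirrors the two methods above, and PendingByCheckpointSketch, commitUpTo, and pendingBatches are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class PendingByCheckpointSketch {
    // Hypothetical local stand-in mirroring the two ManagedSnapshotContext methods.
    interface SnapshotContext {
        long getCheckpointId();
        long getCheckpointTimestamp();
    }

    private List<String> current = new ArrayList<>();
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    void add(String value) {
        current.add(value);
    }

    // On snapshot, file the open batch under the checkpoint's ID and start a new one.
    void snapshotState(SnapshotContext context) {
        pending.put(context.getCheckpointId(), current);
        current = new ArrayList<>();
    }

    // IDs increase strictly and a later checkpoint subsumes earlier ones,
    // so completing checkpoint N allows every batch with ID <= N to be committed.
    List<String> commitUpTo(long completedCheckpointId) {
        NavigableMap<Long, List<String>> done = pending.headMap(completedCheckpointId, true);
        List<String> committed = new ArrayList<>();
        for (List<String> batch : done.values()) {
            committed.addAll(batch);
        }
        done.clear(); // clearing the view also removes the entries from pending
        return committed;
    }

    int pendingBatches() {
        return pending.size();
    }

    static SnapshotContext context(long id, long timestamp) {
        return new SnapshotContext() {
            public long getCheckpointId() { return id; }
            public long getCheckpointTimestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        PendingByCheckpointSketch sketch = new PendingByCheckpointSketch();
        sketch.add("a");
        sketch.snapshotState(context(1L, 1000L));
        sketch.add("b");
        sketch.snapshotState(context(2L, 2000L));
        sketch.add("c");
        sketch.snapshotState(context(3L, 3000L));
        System.out.println(sketch.commitUpTo(2L)); // prints [a, b]
    }
}
```

The sorted map is what makes the subsumption rule cheap to apply: everything at or below the completed ID is a single head view of the map.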

FunctionInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionInitializationContext.java

/**
 * This interface provides a context in which user functions can initialize by registering to managed state (i.e. state
 * that is managed by state backends).
 *
 * <p>
 * Operator state is available to all functions, while keyed state is only available for functions after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty or was restored from a previous
 * execution.
 *
 */
@PublicEvolving
public interface FunctionInitializationContext extends ManagedInitializationContext {
}
  • FunctionInitializationContext inherits the ManagedInitializationContext interface.

ManagedInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedInitializationContext.java

/**
 * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that
 * is managed by state backends).
 *
 * <p>
 * Operator state is available to all operators, while keyed state is only available for operators after keyBy.
 *
 * <p>
 * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from
 * a previous execution of this operator.
 *
 */
public interface ManagedInitializationContext {

    /**
     * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for
     * stateless tasks.
     */
    boolean isRestored();

    /**
     * Returns an interface that allows for registering operator state with the backend.
     */
    OperatorStateStore getOperatorStateStore();

    /**
     * Returns an interface that allows for registering keyed state with the backend.
     */
    KeyedStateStore getKeyedStateStore();

}
  • The ManagedInitializationContext interface defines three methods: isRestored, getOperatorStateStore, and getKeyedStateStore.

Summary

  • Flink has two basic kinds of state: Keyed State and Operator State (non-keyed state). Keyed State can only be used in functions and operators on a KeyedStream. Each Operator State is bound to one parallel operator instance, and Operator State supports redistribution when the parallelism changes.
  • Keyed State and Operator State each come in two forms, managed and raw. Managed state is handled by the Flink runtime, which is responsible for encoding it and writing it to checkpoints. Raw state is handled by the operators themselves; the Flink runtime knows nothing about its data structures and treats it as raw bytes. All DataStream functions can use managed state, while raw state can generally only be used when implementing custom operators.
  • A stateful function can use managed Operator State through the CheckpointedFunction interface or the ListCheckpointed interface. CheckpointedFunction defines two methods, snapshotState and initializeState. snapshotState is called whenever a checkpoint is taken. initializeState is called each time the user-defined function is initialized (either for the first time or when recovering from an earlier checkpoint), so it is used not only to initialize the state but also to handle the state recovery logic.
  • Managed Operator State currently supports only a list-style form: the state must be a list of serializable objects, which makes it easy to redistribute when rescaling. There are currently two redistribution schemes: even-split redistribution (on restore/redistribution, each operator gets a sublist of the whole state) and union redistribution (on restore/redistribution, each operator gets the complete list of the whole state).
  • FunctionSnapshotContext inherits the ManagedSnapshotContext interface, which defines the getCheckpointId and getCheckpointTimestamp methods. FunctionInitializationContext inherits the ManagedInitializationContext interface, which defines the isRestored, getOperatorStateStore, and getKeyedStateStore methods. These can be used to determine whether the state was restored from the snapshot of a previous execution, and to obtain the OperatorStateStore and KeyedStateStore objects.
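
The difference between the two redistribution schemes is easiest to see on concrete lists. The following plain-Java sketch only simulates the outcome and is not how the Flink runtime implements it: RedistributionSketch, evenSplit, and union are hypothetical names, and the round-robin assignment used for the even split is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RedistributionSketch {
    // Even-split: pool all elements, then give each new subtask a sublist.
    // Round-robin assignment is an illustrative assumption, not Flink's exact scheme.
    static <T> List<List<T>> evenSplit(List<List<T>> oldSubtasks, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int n = 0; n < newParallelism; n++) {
            result.add(new ArrayList<>());
        }
        int i = 0;
        for (List<T> state : oldSubtasks) {
            for (T element : state) {
                result.get(i % newParallelism).add(element);
                i++;
            }
        }
        return result;
    }

    // Union: every new subtask receives the complete list of all elements.
    static <T> List<List<T>> union(List<List<T>> oldSubtasks, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> state : oldSubtasks) {
            all.addAll(state);
        }
        List<List<T>> result = new ArrayList<>();
        for (int n = 0; n < newParallelism; n++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State of two old subtasks, rescaled to a parallelism of three.
        List<List<Integer>> old = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        System.out.println(evenSplit(old, 3)); // prints [[1, 4], [2, 5], [3]]
        System.out.println(union(old, 3));     // prints [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
    }
}
```

With the even split every element ends up on exactly one subtask, whereas with the union scheme each subtask sees everything and has to decide for itself which elements to keep.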
