[case49] Talk about flink’s checkpoint Configuration

  flink

Order

This paper mainly studies the checkpoint configuration of flink

Example

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
  • Enable checkpoint;ing method to set open checkpoint; You can use enablecheckpointing (longinterval) or enablecheckpointing (longinterval, checkpointing mode); Interval is used to specify the trigger interval of checkpoint (Milliseconds), and CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE, or it can be specified as checkpointing mode. at _ lease _ once
  • You can also set CheckpointingMode through StreamExecutionEnvironment. GetCheckPointConfig (). SetCheckPointingMode. For ultra-low latency applications (About a few milliseconds) Checkpointing Mode. AT _ LEAST _ ONCE can be used, while CheckpointingMode.EXACTLY_ONCE can be used for most other applications
  • CheckpointTimeout specifies the timeout period (Milliseconds), timeout is not completed will be abort
  • MinPauseBetweenCheckpoints is used to specify the minimum waiting time for starting another checkpoint after the completion of the previous checkpoint on the checkpoint coordinator. When this parameter is specified, the value of maxConcurrentCheckpoints is 1
  • MaxConcurrentCheckpoints is used to specify the maximum number of checkpoints in operation. It will not spend too much time on checkpoints for packaging topology. If minPauseBetweenCheckpoints is set, the parameter maxConcurrentCheckpoints will not work (Values greater than 1 have no effect)
  • EnableExternalizedCheckpoints is used to open the external persistence of checkpoints, but it will not automatically clean up when the job fails, and it needs to manually clean up the state; itself. ExternalizedCheckpointCleanup is used to specify how the externalized checkpoint should be cleaned when jobcancelled. if delete _ on _ cancela tion, the externalized state will be automatically deleted when jobcancelled, but if it is FAILED, it will be retained. RETAIN _ ON _ CANCELATION will retain externalized checkpoint state when jobcancelled
  • FailOnCheckpointingErrors is used to specify whether the task should fail when an exception occurs at checkpoint. the default value is true. if it is s et to false, the task will reject checkpoint and continue running.

Flink-conf.yaml related configuration

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false
  • State.backend is used to specify backend stored by checkpoint state, which defaults to none.
  • Async is used to specify whether backend uses asynchronous snapshot (The default is true), some state backend that does not support async or only supports async may ignore this parameter.
  • State.backend.fs.memory-threshold, which defaults to 1024, is used to specify the state size threshold stored in files. if it is less than this value, it will be stored in root checkpoint metadata file.
  • State.backend.incremental, which defaults to false, is used to specify whether incremental checkpoint is adopted. some backend that do not support incremental checkpoint ignore this configuration.
  • Local-recovery is false by default.
  • State.checkpoints.dir, which defaults to none, is used to specify the directories of data files and meta data storage of checkpoint, which must be visible to all participating TaskManagers and JobManagers.
  • State.checkpoints.num-retained, which defaults to 1 and specifies the number of completed checkpoints retained
  • State.savepoints.dir, which defaults to none, is used to specify the default directory for savepoints.
  • Taskmanager.state.local.root-dirs, default is none

Summary

  • The checkpoint; can be set to open by using the streamexecutionenvironment; enablecheckpointing method; You can use enablecheckpointing (longinterval) or enablecheckpointing (longinterval, checkpointing mode)
  • The advanced configuration of checkpoint can configure checkpointTimeout (Used to specify the timeout period for checkpoint execution, in milliseconds.),minPauseBetweenCheckpoints(Used to specify the minimum waiting time for another checkpoint after the completion of the previous checkpoint on the checkpoint coordinator.),maxConcurrentCheckpoints(It is used to specify the maximum number of checkpoints in operation. If minPauseBetweenCheckpoints is set, the value greater than 1 for maxConcurrentCheckpoints has no effect.),enableExternalizedCheckpoints(Used to turn on external persistence of checkpoints. Externally checkpointstate cannot be cleaned automatically when job failed, but it can be configured to delete or retain State when jobcancelled.)
  • In flink-conf.yaml, there are also checkpoint related configurations, mainly state backend configurations, such as state.backend.async, state.backend.incremental, state.checkpoints.dir, state.savepoints.dir, etc.

doc