A Look at Storm’s CheckpointSpout

Preface

This article takes a look at Storm’s CheckpointSpout.

TopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(_workerHooks);

        if (!_componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
            stormTopology.set_shared_memory(_sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

    /**
     * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }

    private void maybeAddCheckpointInputs(ComponentCommon common) {
        if (hasStatefulBolt) {
            addCheckPointInputs(common);
        }
    }

    /**
     * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
     * checkpoint tuples can flow through the topology.
     */
    private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
        if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
            bolt = new CheckpointTupleForwarder(bolt);
        }
        return bolt;
    }

    /**
     * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
     * bolts, add checkpoint stream from the previous bolt to its input.
     */
    private void addCheckPointInputs(ComponentCommon component) {
        Set<GlobalStreamId> checkPointInputs = new HashSet<>();
        for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
            String sourceId = inputStream.get_componentId();
            if (_spouts.containsKey(sourceId)) {
                checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
            } else {
                checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
            }
        }
        for (GlobalStreamId streamId : checkPointInputs) {
            component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
        }
    }
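  • When TopologyBuilder.createTopology runs, it calls maybeAddCheckpointSpout, which adds a CheckpointSpout (with a parallelism hint of 1) under CHECKPOINT_COMPONENT_ID if hasStatefulBolt is true.
  • If hasStatefulBolt is true and a bolt is not a StatefulBoltExecutor, the bolt is wrapped in a CheckpointTupleForwarder.
  • If hasStatefulBolt is true, addCheckPointInputs is called to add the checkpoint stream to each bolt’s inputs: bolts fed by a spout subscribe to the CheckpointSpout, and other bolts subscribe to the checkpoint stream of their upstream bolt, in both cases with an all grouping.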
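
The input wiring described in the last bullet can be tried out without Storm on the classpath. The following is only a minimal sketch under that assumption: the class CheckpointWiring, the method checkpointSources and the component names are hypothetical, and plain strings stand in for GlobalStreamId. It shows which component a bolt would listen to on the $checkpoint stream.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Storm-free model of TopologyBuilder.addCheckPointInputs.
class CheckpointWiring {
    static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";

    // For each upstream component of a bolt, decide where its checkpoint tuples come from:
    // the checkpoint spout if the upstream is a spout, otherwise the upstream bolt itself.
    static Set<String> checkpointSources(Set<String> spoutIds, List<String> inputSourceIds) {
        Set<String> sources = new LinkedHashSet<>();
        for (String sourceId : inputSourceIds) {
            sources.add(spoutIds.contains(sourceId) ? CHECKPOINT_COMPONENT_ID : sourceId);
        }
        return sources;
    }

    public static void main(String[] args) {
        Set<String> spouts = new HashSet<>(Arrays.asList("wordSpout"));
        // A root bolt fed by a spout listens to the checkpoint spout.
        System.out.println(checkpointSources(spouts, Arrays.asList("wordSpout")));
        // A downstream bolt listens to the checkpoint stream of the bolt before it.
        System.out.println(checkpointSources(spouts, Arrays.asList("splitBolt")));
    }
}
```

Because the result is a set, a bolt that reads from several spouts still gets a single subscription to the checkpoint spout, which matches the use of a HashSet in the original method.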

CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java

/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress &&
               (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //......
}
  • CheckpointSpout reads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms). The default in defaults.yaml is 1000. If the key is absent from the configuration, 100 is used, and any configured value below 100 is raised to 100.
  • nextTuple first checks shouldRecover; if recovery is needed, it calls handleRecovery and then startProgress. Otherwise, if shouldCheckpoint returns true, it calls doCheckpoint and then startProgress. If neither applies, it sleeps for sleepInterval before the next check.
  • shouldCheckpoint returns true only when the spout is not recovering, no checkpoint step is in progress, and either the current state is not COMMITTED or more than checkpointInterval has passed since the last checkpoint. doCheckpoint then emits the next action on CHECKPOINT_STREAM_ID.
  • After receiving an ack whose msgId matches the current txid, CheckpointSpout calls handleRecoveryAck or handleCheckpointAck; the state is saved through saveTxState, which calls checkpointState.commit to commit the checkpoint, and resetProgress then resets the in-progress flags.
  • On fail, the spout sets recovering to true if it was not already recovering, so that recovery is triggered, and then calls resetProgress.
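
The timing rules in these bullets can be checked in isolation. The sketch below is a Storm-free model, not the spout itself: CheckpointTiming and its parameter names are hypothetical, and the two methods only mirror loadCheckpointInterval and shouldCheckpoint as quoted above, with the spout’s fields passed in as arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of the timing rules in CheckpointSpout.
class CheckpointTiming {
    // Key named in the article; in Storm it is the value of Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL.
    static final String INTERVAL_KEY = "topology.state.checkpoint.interval.ms";

    // Mirrors loadCheckpointInterval: a missing or too-small value is raised to 100 ms.
    static int loadCheckpointInterval(Map<String, Object> conf) {
        int interval = 0;
        if (conf.containsKey(INTERVAL_KEY)) {
            interval = ((Number) conf.get(INTERVAL_KEY)).intValue();
        }
        return Math.max(100, interval);
    }

    // Mirrors shouldCheckpoint: nothing new starts while recovering or while a step is in flight.
    static boolean shouldCheckpoint(boolean recovering, boolean stepInProgress, boolean committed,
                                    long nowMs, long lastCheckpointMs, int intervalMs) {
        boolean intervalElapsed = (nowMs - lastCheckpointMs) > intervalMs;
        return !recovering && !stepInProgress && (!committed || intervalElapsed);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(INTERVAL_KEY, 1000);
        int interval = loadCheckpointInterval(conf);
        System.out.println("interval=" + interval + " ms, sleep=" + (interval / 10) + " ms");
        // The interval has elapsed, but a step is still in progress, so no new checkpoint starts.
        System.out.println(shouldCheckpoint(false, true, true, 5000, 0, interval));
    }
}
```

The second call illustrates why the interval behaves like a delay between checkpoints rather than a fixed rate: an elapsed interval alone is not enough while a previous step has not been acked or failed.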

CheckPointState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java

    /**
     * Get the next state based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next checkpoint state based on this state.
     */
    public CheckPointState nextState(boolean recovering) {
        CheckPointState nextState;
        switch (state) {
            case PREPARING:
                nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
                break;
            case COMMITTING:
                nextState = new CheckPointState(txid, COMMITTED);
                break;
            case COMMITTED:
                nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return nextState;
    }

    /**
     * Get the next action to perform based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next action to perform based on this state
     */
    public Action nextAction(boolean recovering) {
        Action action;
        switch (state) {
            case PREPARING:
                action = recovering ? Action.ROLLBACK : Action.PREPARE;
                break;
            case COMMITTING:
                action = Action.COMMIT;
                break;
            case COMMITTED:
                action = recovering ? Action.INITSTATE : Action.PREPARE;
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return action;
    }
  • CheckPointState provides nextState to compute the next state and nextAction to return the action that corresponds to the current state; both take a recovering flag that selects the recovery path.
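
To see the transitions in sequence, the two methods can be copied into a small stand-alone class. This is only a sketch for experimenting: MiniCheckPointState is a hypothetical, trimmed-down stand-in that keeps the same switch logic as the excerpt above and nothing else from the real class.

```java
// Hypothetical, trimmed-down copy of CheckPointState for experimenting without Storm.
class MiniCheckPointState {
    enum State { PREPARING, COMMITTING, COMMITTED }
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    final long txid;
    final State state;

    MiniCheckPointState(long txid, State state) {
        this.txid = txid;
        this.state = state;
    }

    // Same transitions as nextState in the excerpt.
    MiniCheckPointState nextState(boolean recovering) {
        switch (state) {
            case PREPARING:
                return recovering ? new MiniCheckPointState(txid - 1, State.COMMITTED)
                                  : new MiniCheckPointState(txid, State.COMMITTING);
            case COMMITTING:
                return new MiniCheckPointState(txid, State.COMMITTED);
            case COMMITTED:
                return recovering ? this : new MiniCheckPointState(txid + 1, State.PREPARING);
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    // Same mapping as nextAction in the excerpt.
    Action nextAction(boolean recovering) {
        switch (state) {
            case PREPARING:
                return recovering ? Action.ROLLBACK : Action.PREPARE;
            case COMMITTING:
                return Action.COMMIT;
            case COMMITTED:
                return recovering ? Action.INITSTATE : Action.PREPARE;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    @Override
    public String toString() {
        return state + "(txid=" + txid + ")";
    }

    public static void main(String[] args) {
        MiniCheckPointState s = new MiniCheckPointState(0, State.COMMITTED);
        // Normal cycle: three transitions bring the state back to COMMITTED with txid + 1.
        for (int i = 0; i < 3; i++) {
            s = s.nextState(false);
            System.out.println(s + " -> next action " + s.nextAction(false));
        }
    }
}
```

In the normal path a checkpoint moves COMMITTED, PREPARING, COMMITTING and back to COMMITTED with the txid incremented once. In the recovery path a state stuck in PREPARING falls back to the previous txid as COMMITTED, and the corresponding action is ROLLBACK.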

BaseStatefulBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java

    public void execute(Tuple input) {
        if (CheckpointSpout.isCheckpoint(input)) {
            processCheckpoint(input);
        } else {
            handleTuple(input);
        }
    }

    /**
     * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
     */
    private void processCheckpoint(Tuple input) {
        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
        if (shouldProcessTransaction(action, txid)) {
            LOG.debug("Processing action {}, txid {}", action, txid);
            try {
                if (txid >= lastTxid) {
                    handleCheckpoint(input, action, txid);
                    if (action == ROLLBACK) {
                        lastTxid = txid - 1;
                    } else {
                        lastTxid = txid;
                    }
                } else {
                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                    collector.ack(input);
                }
            } catch (Throwable th) {
                LOG.error("Got error while processing checkpoint tuple", th);
                collector.fail(input);
                collector.reportError(th);
            }
        } else {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                      "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
            collector.ack(input);
        }
    }

    /**
     * Checks if check points have been received from all tasks across all input streams to this component
     */
    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
        TransactionRequest request = new TransactionRequest(action, txid);
        Integer count;
        if ((count = transactionRequestCount.get(request)) == null) {
            transactionRequestCount.put(request, 1);
            count = 1;
        } else {
            transactionRequestCount.put(request, ++count);
        }
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }
  • BaseStatefulBoltExecutor’s execute method first uses CheckpointSpout.isCheckpoint(input) to determine whether the tuple arrived on the checkpoint stream; if so, it calls processCheckpoint, otherwise handleTuple.
  • processCheckpoint first calls shouldProcessTransaction to determine whether the checkpoint tuple has been received from every task of all input checkpoint streams; until then, it just acks the tuple and waits.
  • Once all copies have arrived, if txid is greater than or equal to lastTxid, handleCheckpoint (implemented by subclasses) is called; older transactions are acked and ignored.
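
The counting in shouldProcessTransaction works like a barrier: only the copy that completes the set triggers processing. Below is a minimal sketch of that idea without Storm. CheckpointBarrier is a hypothetical name, and a plain "action:txid" string replaces the TransactionRequest key used by the real code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of the counting done in shouldProcessTransaction.
class CheckpointBarrier {
    private final int checkPointInputTaskCount;
    // Key is "action:txid"; the real code uses a TransactionRequest object instead.
    private final Map<String, Integer> transactionRequestCount = new HashMap<>();

    CheckpointBarrier(int checkPointInputTaskCount) {
        this.checkPointInputTaskCount = checkPointInputTaskCount;
    }

    // Returns true only for the copy that completes the set from all upstream tasks.
    boolean shouldProcessTransaction(String action, long txid) {
        String request = action + ":" + txid;
        int count = transactionRequestCount.merge(request, 1, Integer::sum);
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointBarrier barrier = new CheckpointBarrier(3);
        for (int task = 1; task <= 3; task++) {
            System.out.println("copy " + task + " -> " + barrier.shouldProcessTransaction("PREPARE", 7));
        }
    }
}
```

With three upstream tasks, the first two copies of (PREPARE, 7) are only counted, and the third one returns true and clears the counter, so a later transaction starts from zero again.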

StatefulBoltExecutor.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java

public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //......

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //......
}
  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint.
  • The method dispatches on the action. For PREPARE, it calls the bolt’s prePrepare and the state’s prepareCommit (or fails the tuple if the bolt state is not yet initialized). For COMMIT, it calls the bolt’s preCommit and the state’s commit. For ROLLBACK, it calls the bolt’s preRollback and the state’s rollback. For INITSTATE, it calls the bolt’s initState if the bolt is not yet initialized.
  • After handling the action, it emits the checkpoint tuple downstream and then acks it with collector.delegate.ack(checkpointTuple).
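
The order of the hook calls is easier to follow when it is written down as a trace. The sketch below is a hypothetical model (CheckpointDispatch is not a Storm class): it records the names of the calls that handleCheckpoint makes for each action as strings instead of invoking a real bolt or state, and it leaves out the handling of prepared and pending tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Storm-free model of the dispatch in StatefulBoltExecutor.handleCheckpoint.
class CheckpointDispatch {
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    private boolean boltInitialized = false;
    final List<String> calls = new ArrayList<>();

    // Records which hooks would run for an action; returns false when the tuple would be failed.
    boolean handle(Action action, long txid) {
        switch (action) {
            case PREPARE:
                if (!boltInitialized) {
                    calls.add("fail(checkpointTuple)");
                    return false; // not forwarded; the failure makes the spout start recovery
                }
                calls.add("bolt.prePrepare(" + txid + ")");
                calls.add("state.prepareCommit(" + txid + ")");
                break;
            case COMMIT:
                calls.add("bolt.preCommit(" + txid + ")");
                calls.add("state.commit(" + txid + ")");
                break;
            case ROLLBACK:
                calls.add("bolt.preRollback()");
                calls.add("state.rollback()");
                break;
            case INITSTATE:
                if (!boltInitialized) {
                    calls.add("bolt.initState(state)");
                    boltInitialized = true;
                }
                break;
        }
        // Every handled action ends with forwarding the checkpoint tuple and acking it.
        calls.add("emit+ack(" + action + ", " + txid + ")");
        return true;
    }

    public static void main(String[] args) {
        CheckpointDispatch d = new CheckpointDispatch();
        d.handle(Action.PREPARE, 1);   // too early: state not initialized yet
        d.handle(Action.INITSTATE, 0);
        d.handle(Action.PREPARE, 1);
        d.handle(Action.COMMIT, 1);
        d.calls.forEach(System.out::println);
    }
}
```

The first PREPARE in main is failed because no INITSTATE has been seen yet, which corresponds to the "fail fast and trigger recovery" comment in the original method.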

CheckpointTupleForwarder.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java

/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //......
    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //......
}
  • CheckpointTupleForwarder wraps non-stateful bolts so that checkpoint tuples can flow through the entire topology DAG; its handleCheckpoint simply re-emits the checkpoint tuple and acks it.

Summary

  • If a topology contains an IStatefulBolt (IStatefulBolt gives a bolt an abstraction for accessing state; state is persisted through the checkpoint mechanism, and at-least-once semantics are provided through the ack mechanism), TopologyBuilder automatically adds a CheckpointSpout and wraps every bolt that is not a StatefulBoltExecutor in a CheckpointTupleForwarder, so that checkpoint tuples flow through the whole topology DAG.
  • In nextTuple, CheckpointSpout first checks whether it needs to recover, then whether it needs to checkpoint; if neither, it sleeps for sleepInterval, which is checkpointInterval/10. checkpointInterval is read from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, defaults to 1000, and is at least 100. Note that this does not mean a checkpoint is started every checkpointInterval: the behavior resembles fixedDelay rather than fixedRate, because no new checkpoint is started while the current one is still in progress.
  • Both recovery and checkpointing emit tuples on CHECKPOINT_STREAM_ID. BaseStatefulBoltExecutor handles checkpoint tuples in its execute method and delegates non-checkpoint tuples to the abstract handleTuple method implemented by subclasses. handleCheckpoint is also implemented by subclasses; BaseStatefulBoltExecutor only checks the preconditions, namely that the checkpoint tuple has been received from the tasks of all input streams and that txid >= lastTxid.
  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint, which handles the PREPARE, COMMIT, ROLLBACK and INITSTATE actions (similar to a three-phase commit protocol), then forwards the checkpoint tuple downstream and acks it.
  • CheckpointSpout emits checkpoint tuples reliably, using txid as the msgId. Once the checkpoint tuple has been acked throughout the topology DAG, the spout receives the ack and calls checkpointState.commit to commit the checkpoint; on fail, it resets the relevant state. In general, Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (default 1000 ms, i.e. 1 second) is smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (default 30 seconds). If checkpointInterval is set too large, the state restored after a worker crash may be quite stale, which defeats the purpose of checkpointing.
