[case42] Talk about Storm's ack mechanism


Preface

This article examines Storm's ack mechanism.

Example

AckSentenceSpout

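import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class AckSentenceSpout extends BaseRichSpout {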

    private ConcurrentHashMap<UUID, Values> pending;

    private SpoutOutputCollector collector;

    private int index = 0;

    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
//        this.collector.emit(values);
        //NOTE the msgId must be passed in here; without it the tuple is not tracked for acking
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    //NOTE a failed tuple tree must be retried: re-emit the cached values with the same msgId
    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
  • For the spout, a msgId must be supplied at emit time and the emitted data must be cached: the entry is deleted from the cache on ack and re-emitted on fail so the message is retried.

AckWordCountBolt

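import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckWordCountBolt extends BaseRichBolt {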
    private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        try{
            String word = tuple.getStringByField("word");
            Long count = this.counts.get(word);
            if(count == null){
                count = 0L;
            }
            count++;
            this.counts.put(word, count);

            //NOTE pass the tuple currently being processed as the anchor
            this.collector.emit(tuple, new Values(word, count));

            //NOTE the bolt must ack the input tuple itself
            this.collector.ack(tuple);
        }catch (Exception e){
            LOGGER.error(e.getMessage(),e);
            //NOTE if processing throws, fail the input tuple
            this.collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  • For the bolt, there are two things to do. First, anchor: link the input tuple to the output tuple at emit time, which builds the tuple tree. Second, ack the input tuple once it has been processed, or fail it if processing throws.

Source code analysis

SpoutOutputCollectorImpl.emit

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
        InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple =
                new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
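  • needAck is true when a messageId was given and the topology has ackers. In that case sendSpoutMsg first generates a rootId, then, for each outgoing task, generates a random id (as), builds the tuple's MessageId from rootId and as, and adds as to ackSeq. After the tuples are sent, it stores a TupleInfo in pending under rootId and calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()), where ackInitTuple contains rootId, the XOR of all ids in ackSeq (Utils.bitXorVals(ackSeq)) and the spout's task id.
  • If a messageId is given but there are no ackers, nothing is tracked: executor.ackSpoutMsg is called immediately, so the spout's ack method runs right away.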
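The init value works because XOR is its own inverse: every id folded into it must be XORed in exactly once more before the running value returns to 0. The sketch below models only that arithmetic. The class and method names are hypothetical, and it assumes Random.nextLong() as the id source, standing in for MessageId.generateId(random), whose internals are not shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical model of the spout side of sendSpoutMsg: one random id per
 * outgoing task (like ackSeq), XOR-folded into the "init-val" sent to the acker.
 */
final class AckInitSketch {
    private AckInitSketch() {}

    /** One random id per outgoing task; Random.nextLong() is an assumed id source. */
    static List<Long> edgeIds(int outTasks, Random random) {
        List<Long> ids = new ArrayList<>(outTasks);
        for (int i = 0; i < outTasks; i++) {
            ids.add(random.nextLong());
        }
        return ids;
    }

    /** XOR of all ids, the role Utils.bitXorVals(ackSeq) plays in ackInitTuple. */
    static long initVal(List<Long> ids) {
        long acc = 0L;
        for (long id : ids) {
            acc ^= id;
        }
        return acc;
    }
}
```

For example, two tasks with ids 5 and 3 give an init value of 6; once the acker later receives 5 and 3 again (from the bolts' acks), 6 ^ 5 ^ 3 is 0.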

BoltOutputCollectorImpl.ack&fail

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void ack(Tuple input) {
        if (!ackingEnabled) {
            return;
        }
        long ackValue = ((TupleImpl) input).getAckVal();
        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
            task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                                new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
                                executor.getExecutorTransfer(), executor.getPendingEmits());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }

        if (!task.getUserContext().getHooks().isEmpty()) {
            BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
            boltAckInfo.applyOn(task.getUserContext());
        }
        if (delta >= 0) {
            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                               task.getTaskMetrics().getAcked(input.getSourceStreamId()));
        }
    }

    @Override
    public void fail(Tuple input) {
        if (!ackingEnabled) {
            return;
        }
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
                                new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }
        BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
        boltFailInfo.applyOn(task.getUserContext());
        if (delta >= 0) {
            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                                task.getTaskMetrics().getFailed(input.getSourceStreamId()));
        }
    }
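  • Both ack and fail in BoltOutputCollectorImpl call task.sendUnanchored, and both return immediately if acking is disabled.
  • ack sends, for every root in the tuple's anchorsToIds, a tuple on Acker.ACKER_ACK_STREAM_ID containing the root id and Utils.bitXor(entry.getValue(), ackValue), i.e. the tuple's own id under that root XORed with its ackVal; fail sends just the root id on Acker.ACKER_FAIL_STREAM_ID for every root.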
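To see why a bolt sends its own id XORed with ackVal, the sketch below uses a simplified single-root model. It assumes, as a simplification of what Storm does internally, that anchoring a child generates a fresh edge id which is XORed into the parent's ackVal and becomes the child's id. The class AnchoredTuple is hypothetical; Storm keeps a root-to-id map per tuple rather than a single id.

```java
import java.util.Random;

/**
 * Hypothetical single-root model of a bolt-side tuple:
 * "id" is this tuple's edge id under the root, "ackVal" accumulates
 * the edge ids of children anchored to it.
 */
final class AnchoredTuple {
    final long id;
    private long ackVal = 0L;

    AnchoredTuple(long id) {
        this.id = id;
    }

    /** Anchored emit: a new edge id is XORed into ackVal and becomes the child's id. */
    AnchoredTuple emitChild(Random random) {
        long edgeId = random.nextLong();
        ackVal ^= edgeId;
        return new AnchoredTuple(edgeId);
    }

    /** Value sent on the ack stream for this root: own id XOR accumulated ackVal. */
    long ackValue() {
        return id ^ ackVal;
    }
}
```

Acking therefore removes the tuple's own id from the acker's running XOR and, in the same message, adds the ids of its children; each child later removes its own id when it is acked.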

Task.sendUnanchored

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

    // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
        Tuple tuple = getTuple(stream, values);
        List<Integer> tasks = getOutgoingTasks(stream, values);
        for (Integer t : tasks) {
            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
            transfer.tryTransfer(addressedTuple, pendingEmits);
        }
    }
  • Here ExecutorTransfer.tryTransfer is called for each target task returned by getOutgoingTasks.

ExecutorTransfer.tryTransfer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java

    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }

        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
    }

    /**
     * Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null.
     * Returns false if unable to add to localQueue.
     */
    public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
        workerData.checkSerialize(serializer, tuple);
        if (pendingEmits != null) {
            if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
                queuesToFlush.set(tuple.dest - indexingBase, localQueue);
                return true;
            } else {
                pendingEmits.add(tuple);
                return false;
            }
        } else {
            return localQueue.tryPublish(tuple);
        }
    }
  • tryTransfer first determines from the addressedTuple whether the destination queue is local. If so, it calls tryTransferLocal; otherwise it calls workerData.tryTransferRemote.
  • tryTransferLocal calls localQueue.tryPublish, which puts the tuple into the recvQueue of the destination JCQueue. If pendingEmits is not empty, or the queue is full, the tuple is appended to pendingEmits instead, so it never overtakes tuples that are already waiting.
  • workerData.tryTransferRemote hands the tuple to the WorkerTransfer; the TransferDrainer then sends it to the remote node on flush.
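The ordering rule in tryTransferLocal can be reproduced with standard collections. This is a sketch under the assumption that a bounded ArrayBlockingQueue stands in for JCQueue (offer() plays the role of tryPublish); the class name is hypothetical, and the serialization check and the queuesToFlush bookkeeping are omitted.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for the non-blocking logic of tryTransferLocal. */
final class TransferSketch {
    private TransferSketch() {}

    static <T> boolean tryTransferLocal(T tuple, BlockingQueue<T> localQueue, Queue<T> pendingEmits) {
        if (pendingEmits != null) {
            // Publish directly only if nothing is already waiting; offer() never blocks
            if (pendingEmits.isEmpty() && localQueue.offer(tuple)) {
                return true;
            }
            // Queue full or earlier tuples pending: keep order by appending
            pendingEmits.add(tuple);
            return false;
        }
        return localQueue.offer(tuple);
    }
}
```

With a queue of capacity 1, sending "a" succeeds and "b" lands in pendingEmits. Even after the consumer drains "a", a later "c" also goes to pendingEmits, because "b" must be delivered first.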

StormCommon.systemTopology

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        return _instance.systemTopologyImpl(topoConf, topology);
    }

    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        validateBasic(topology);

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);
        addMetricStreams(ret);
        addSystemStreams(ret);

        validateStructure(ret);

        return ret;
    }

    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        int ackerNum =
            ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

        for (Bolt bolt : topology.get_bolts().values()) {
            ComponentCommon common = bolt.get_common();
            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec spout : topology.get_spouts().values()) {
            ComponentCommon common = spout.get_common();
            Map<String, Object> spoutConf = componentConf(spout);
            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                          ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
            common.set_json_conf(JSONValue.toJSONString(spoutConf));
            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                                  Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
        }

        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
    }

    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        Set<String> boltIds = topology.get_bolts().keySet();
        Set<String> spoutIds = topology.get_spouts().keySet();

        for (String id : spoutIds) {
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }

        for (String id : boltIds) {
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }
        return inputs;
    }

    public static IBolt makeAckerBolt() {
        return _instance.makeAckerBoltImpl();
    }

    public IBolt makeAckerBoltImpl() {
        return new Acker();
    }
  • The WorkerState constructor calls systemTopology, which adds system components such as the Acker, MetricsConsumerBolt and SystemBolt to the topology.
  • addAcker creates the acker bolt. ackerNum is ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))); that is, if Config.TOPOLOGY_ACKER_EXECUTORS is not configured, the value of Config.TOPOLOGY_WORKERS is used.
  • For the acker, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is set to ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)), so the acker receives a tick tuple every Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds, and each tick drives its timeout handling.
  • The Acker instance passed to Thrift.prepareSerializedBoltDetails is created by makeAckerBolt().
  • The acker's inputs, built by ackerInputs, are each spout's Acker.ACKER_INIT_STREAM_ID and each bolt's Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID, all fields-grouped on "id", so every message about a given root id reaches the same acker task. Its outputs are direct streams with the same ack, fail and reset-timeout ids, and every bolt also declares these three streams as outputs.
  • For each spout, addAcker sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, declares the Acker.ACKER_INIT_STREAM_ID output stream, and subscribes the spout to the acker's Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID streams with direct grouping.
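The two configuration decisions above can be checked in isolation. The sketch reads a plain Map using the key strings behind Config.TOPOLOGY_ACKER_EXECUTORS ("topology.acker.executors"), Config.TOPOLOGY_WORKERS ("topology.workers") and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS ("topology.message.timeout.secs"). The class is hypothetical, and its getInt is a simplified stand-in for ObjectReader.getInt that only handles Number values and null.

```java
import java.util.Map;

/** Hypothetical model of the ackerNum and acker tick-frequency logic in addAcker. */
final class AckerConfigSketch {
    static final String ACKER_EXECUTORS = "topology.acker.executors";
    static final String WORKERS = "topology.workers";
    static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

    private AckerConfigSketch() {}

    /** Simplified stand-in for ObjectReader.getInt: null falls back to the default. */
    private static Integer getInt(Object value, Integer defaultValue) {
        return value == null ? defaultValue : ((Number) value).intValue();
    }

    /** Acker executors if configured, otherwise the number of workers. */
    static Integer ackerNum(Map<String, Object> conf) {
        return getInt(conf.get(ACKER_EXECUTORS), getInt(conf.get(WORKERS), null));
    }

    /** The acker's tick frequency is the message timeout, in seconds. */
    static Integer tickFreqSecs(Map<String, Object> conf) {
        return getInt(conf.get(MESSAGE_TIMEOUT_SECS), null);
    }
}
```

With 4 workers and no acker setting, ackerNum is 4; setting the acker executors to 2 overrides it.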

Acker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

public class Acker implements IBolt {
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    private OutputCollector collector;
    private RotatingMap<Object, AckObject> pending;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    private long getTimeDeltaMillis(long startTimeMillis) {
        return Time.currentTimeMillis() - startTimeMillis;
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
}
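  • For a tick tuple, the acker calls RotatingMap.rotate; the entries in the expired bucket are simply dropped, and only their count is logged at debug level.
  • For ACKER_INIT_STREAM_ID and ACKER_ACK_STREAM_ID, AckObject.updateAck XORs the incoming value into val (the init tuple also records spoutTask). For ACKER_FAIL_STREAM_ID, the AckObject is marked failed and put back into pending; if the fail arrives before the init tuple, a new AckObject is created for it.
  • Finally, once spoutTask is known (task >= 0), the acker checks the AckObject. If val is 0, the entire tuple tree has been processed successfully, and it emitDirects to the spout task on ACKER_ACK_STREAM_ID; if it has failed, on ACKER_FAIL_STREAM_ID; if resetTimeout is set, on ACKER_RESET_TIMEOUT_STREAM_ID. In the ack and fail cases the entry is also removed from pending.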
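The acker's decision logic can be simulated without a cluster. The sketch below is a hypothetical, single-threaded model of the per-root bookkeeping in Acker.execute: it keeps the same check order (val == 0 before failed, and nothing is reported until the spout task is known) but omits RotatingMap timeouts, reset-timeout and emitDirect, returning an outcome instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the Acker's per-root XOR bookkeeping. */
final class AckerSketch {
    enum Outcome { PENDING, ACKED, FAILED }

    private static final class Entry {
        long val = 0L;
        int spoutTask = -1;
        boolean failed = false;
    }

    private final Map<Long, Entry> pending = new HashMap<>();

    /** __ack_init: XOR in the spout's init value and remember the spout task. */
    Outcome init(long rootId, long initVal, int spoutTask) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.val ^= initVal;
        e.spoutTask = spoutTask;
        return check(rootId, e);
    }

    /** __ack_ack: XOR in a bolt's ack value; may arrive before the init tuple. */
    Outcome ack(long rootId, long ackVal) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.val ^= ackVal;
        return check(rootId, e);
    }

    /** __ack_fail: mark the whole tree as failed. */
    Outcome fail(long rootId) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.failed = true;
        return check(rootId, e);
    }

    /** Report only once the spout task is known; val == 0 is checked before failed. */
    private Outcome check(long rootId, Entry e) {
        if (e.spoutTask < 0) {
            return Outcome.PENDING;
        }
        if (e.val == 0L) {
            pending.remove(rootId);
            return Outcome.ACKED;
        }
        if (e.failed) {
            pending.remove(rootId);
            return Outcome.FAILED;
        }
        return Outcome.PENDING;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

For example, a spout that emits to two bolts with ids 11 and 22 sends init value 11 ^ 22; the first bolt anchors a child with id 33 and acks with 11 ^ 33, the second acks with 22, and the child acks with 33. Only the last ack brings val to 0, and the order of arrival does not matter.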

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

public class SpoutExecutor extends Executor {
    //......
    @Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
            }
            spout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            if (timeDelta != null) {
                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
                                                     taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
  • In tupleActionFn, SpoutExecutor removes the TupleInfo for the root id from pending and performs ackSpoutMsg when it receives ACKER_ACK_STREAM_ID, or failSpoutMsg when it receives ACKER_FAIL_STREAM_ID. On a system tick it rotates pending, and on ACKER_RESET_TIMEOUT_STREAM_ID it puts the entry back into pending to refresh its timeout.
  • ackSpoutMsg and failSpoutMsg call the concrete spout's ack and fail methods respectively, which reports the result back to the spout that emitted the tuple.
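Both the Acker (with TIMEOUT_BUCKET_NUM = 3) and SpoutExecutor time entries out by rotating a RotatingMap on each tick, and the reset-timeout path works by putting an entry again. The sketch below is a simplified re-implementation of that bucket idea, not Storm's RotatingMap class: put() moves a key into the newest bucket, and rotate() drops and returns the oldest one. With N buckets, an untouched entry expires on the N-th rotate after it was put.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bucket-based timeout map modeled on the RotatingMap idea. */
final class BucketTimeoutMap<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    BucketTimeoutMap(int numBuckets) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addFirst(new HashMap<>());
        }
    }

    /** Insert or refresh: the key is moved into the newest bucket. */
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    /** One tick: return the oldest bucket as expired and start a fresh newest bucket. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }
}
```

With 3 buckets, an entry put once survives two rotations and is returned by the third; re-putting it before then, as the reset-timeout handling does, restarts its countdown.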

Summary

  • Storm provides at-least-once processing semantics through the ack mechanism.
  • In the WorkerState constructor, Storm calls systemTopology, which adds system components such as the Acker, MetricsConsumerBolt and SystemBolt to the submitted topology. The Acker is added in addAcker, which also applies the ack-related configuration to the spouts.
  • If a spout's emit call carries a messageId (and the topology has ackers), acking is required, and taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()) is triggered.
  • Bolts send ack information through BoltOutputCollectorImpl's ack or fail method, which calls task.sendUnanchored; that in turn calls ExecutorTransfer.tryTransfer to deliver the addressedTuple to the destination queue (over the network if the destination is on a remote node), on Acker.ACKER_ACK_STREAM_ID or Acker.ACKER_FAIL_STREAM_ID respectively.
  • On Acker.ACKER_ACK_STREAM_ID the Acker calls AckObject.updateAck; on Acker.ACKER_FAIL_STREAM_ID it marks the AckObject failed and puts it back into pending. It then checks the AckObject: if val is 0, the whole tuple tree has succeeded and it emitDirects ACKER_ACK_STREAM_ID to the corresponding spout task; if it has failed, it emitDirects ACKER_FAIL_STREAM_ID; if resetTimeout is set, it emitDirects ACKER_RESET_TIMEOUT_STREAM_ID.
  • SpoutExecutor performs ackSpoutMsg when it receives ACKER_ACK_STREAM_ID and failSpoutMsg when it receives ACKER_FAIL_STREAM_ID; these call the concrete spout's ack and fail methods respectively to report the result back to the original spout.
