A look at Storm’s ICommitterTridentSpout


Preface

This article takes a look at Storm’s ICommitterTridentSpout.

ICommitterTridentSpout

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java

public interface ICommitterTridentSpout<X> extends ITridentSpout<X> {
    public interface Emitter extends ITridentSpout.Emitter {
        void commit(TransactionAttempt attempt);
    } 
    
    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context);    
}
  • ICommitterTridentSpout extends ITridentSpout. Its main change is overriding getEmitter to return an extended Emitter, which extends ITridentSpout.Emitter and adds one more method, commit.
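The relationship between the two emitter interfaces can be shown with a small, self-contained sketch. It does not use storm-core: Attempt, BatchEmitter, CommitterEmitter and RecordingEmitter are hypothetical stand-ins that only mirror the shape of TransactionAttempt, ITridentSpout.Emitter and ICommitterTridentSpout.Emitter, with the coordinator metadata, the collector and close() left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionAttempt: a (txid, attemptId) pair.
final class Attempt {
    final long txid;
    final int attemptId;

    Attempt(long txid, int attemptId) {
        this.txid = txid;
        this.attemptId = attemptId;
    }

    @Override
    public String toString() {
        return txid + ":" + attemptId;
    }
}

// Mirrors the role of ITridentSpout.Emitter (metadata/collector parameters omitted).
interface BatchEmitter {
    void emitBatch(Attempt attempt);

    void success(Attempt attempt);
}

// Mirrors ICommitterTridentSpout.Emitter: the same contract plus one extra callback.
interface CommitterEmitter extends BatchEmitter {
    void commit(Attempt attempt);
}

// A toy committer emitter that records the callbacks it receives, in order.
final class RecordingEmitter implements CommitterEmitter {
    final List<String> calls = new ArrayList<>();

    @Override
    public void emitBatch(Attempt a) { calls.add("emitBatch " + a); }

    @Override
    public void commit(Attempt a) { calls.add("commit " + a); }

    @Override
    public void success(Attempt a) { calls.add("success " + a); }
}
```

Code that only holds the base emitter type has to check the type before calling commit; this is the same kind of instanceof check that buildTopology performs when deciding whether to subscribe to the commit stream.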

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        
        //......

        return builder.createTopology();
    }
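  • When TridentTopologyBuilder.buildTopology processes a user spout, it checks whether the spout is an ICommitterTridentSpout; if so, it additionally configures allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID), so every task of the spout's bolt receives the commit stream.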
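The wiring decision boils down to which streams the spout's TridentBoltExecutor subscribes to. A minimal sketch: SpoutWiring and subscribedStreams are hypothetical names, the component names in the test are arbitrary, and the stream id strings "$batch", "$success" and "$commit" are assumed values for the MasterBatchCoordinator constants.

```java
import java.util.ArrayList;
import java.util.List;

final class SpoutWiring {
    // Assumed values of MasterBatchCoordinator.BATCH/SUCCESS/COMMIT_STREAM_ID.
    static final String BATCH = "$batch";
    static final String SUCCESS = "$success";
    static final String COMMIT = "$commit";

    // Hypothetical helper: the "component/stream" pairs subscribed with allGrouping,
    // mirroring the branch in buildTopology.
    static List<String> subscribedStreams(String spoutCoordinator, String masterCoordinator,
                                          boolean isCommitterSpout) {
        List<String> subs = new ArrayList<>();
        subs.add(spoutCoordinator + "/" + BATCH);      // batches arrive via the spout coordinator
        subs.add(masterCoordinator + "/" + SUCCESS);   // every Trident spout sees success
        if (isCommitterSpout) {
            subs.add(masterCoordinator + "/" + COMMIT); // only committer spouts see commit
        }
        return subs;
    }
}
```

For a non-committer spout nothing subscribes to the commit stream at all, which is exactly the situation the summary analyzes.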

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus status = _activeTx.get(tx.getTransactionId());
        LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
        if(status!=null && tx.equals(status.attempt)) {
            if(status.status==AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
                LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
            } else if(status.status==AttemptStatus.COMMITTING) {
                _activeTx.remove(tx.getTransactionId());
                _attemptIds.remove(tx.getTransactionId());
                _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                _currTransaction = nextTransactionId(tx.getTransactionId());
                for(TransactionalState state: _states) {
                    state.setData(CURRENT_TX, _currTransaction);                    
                }
                LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
            }
            sync();
        }
    }
  • When MasterBatchCoordinator receives an ack for the attempt it is tracking, it changes an AttemptStatus.PROCESSING status to AttemptStatus.PROCESSED. If the status is AttemptStatus.COMMITTING, it instead removes the transaction from _activeTx and _attemptIds, emits a tuple on SUCCESS_STREAM_ID and advances _currTransaction to the next transaction id. In both cases it then calls the sync method.
  • The nextTuple method also calls sync. If the current transaction is in AttemptStatus.PROCESSED, sync changes its status to AttemptStatus.COMMITTING and emits a tuple on COMMIT_STREAM_ID, using the attempt as the message id.
  • So the status first moves from AttemptStatus.PROCESSING to AttemptStatus.PROCESSED (on the first ack); sync then changes it to AttemptStatus.COMMITTING and emits a tuple on COMMIT_STREAM_ID; finally, when an ack arrives in AttemptStatus.COMMITTING (the second ack), a tuple is emitted on SUCCESS_STREAM_ID.
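These transitions can be replayed with a small simplified model. It is a sketch, not the Storm class: CoordinatorModel and TxStatus are hypothetical names; attempt ids, isReady, throttling and the persisted transactional state are left out; and emits are recorded as strings such as "commit:1" instead of going to a collector. It keeps the two rules from the source: only the current transaction can be committed, and the same ack method handles both the "processed" ack and the "committed" ack.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors AttemptStatus; a new transaction starts in PROCESSING.
enum TxStatus { PROCESSING, PROCESSED, COMMITTING }

// Hypothetical, simplified model of MasterBatchCoordinator's status handling.
final class CoordinatorModel {
    final Map<Long, TxStatus> active = new HashMap<>();
    final List<String> emitted = new ArrayList<>(); // e.g. "batch:1", "commit:1", "success:1"
    final int maxActive;
    long currTx = 1;

    CoordinatorModel(int maxActive) { this.maxActive = maxActive; }

    // Like sync(): commit the current tx if it is processed, then open new batches.
    void sync() {
        if (active.get(currTx) == TxStatus.PROCESSED) {
            active.put(currTx, TxStatus.COMMITTING);
            emitted.add("commit:" + currTx);
        }
        if (active.size() < maxActive) {
            for (long tx = currTx; tx < currTx + maxActive; tx++) {
                if (!active.containsKey(tx)) {
                    active.put(tx, TxStatus.PROCESSING);
                    emitted.add("batch:" + tx);
                }
            }
        }
    }

    // Like ack(): the same callback handles "batch processed" and "commit finished".
    void ack(long tx) {
        TxStatus status = active.get(tx);
        if (status == null) {
            return;
        }
        if (status == TxStatus.PROCESSING) {
            active.put(tx, TxStatus.PROCESSED);  // first ack
        } else if (status == TxStatus.COMMITTING) {
            active.remove(tx);                   // second ack
            emitted.add("success:" + tx);
            currTx = tx + 1;                     // like nextTransactionId
        }
        sync();
    }
}
```

With three batches in flight, acking tx 2 first does not produce a commit for tx 2, because tx 1 has not been committed yet; this is the scenario described in the comment at the top of sync().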

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }
  • In execute, if the tuple comes from MasterBatchCoordinator.COMMIT_STREAM_ID and its TransactionAttempt equals the attempt recorded in _activeBatches for the same transaction id, TridentSpoutExecutor calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt) and removes the entry; otherwise it throws a FailedException.
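The three-way dispatch can be sketched without Storm types. SpoutExecutorModel is a hypothetical name; the stream names "commit" and "success" are shorthand for the MasterBatchCoordinator streams; a TreeMap from txid to attempt id stands in for _activeBatches; and IllegalStateException stands in for FailedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of TridentSpoutExecutor.execute's dispatch.
final class SpoutExecutorModel {
    // txid -> attempt id of the batch most recently emitted for that transaction
    final TreeMap<Long, Integer> activeBatches = new TreeMap<>();
    final List<String> calls = new ArrayList<>();

    void execute(String stream, long txid, int attemptId) {
        if (stream.equals("commit")) {
            Integer seen = activeBatches.get(txid);
            if (seen != null && seen == attemptId) {
                calls.add("commit " + txid);     // emitter.commit(attempt)
                activeBatches.remove(txid);
            } else {
                throw new IllegalStateException("Received commit for different transaction attempt");
            }
        } else if (stream.equals("success")) {
            // Batches older than txid will never be accessed again.
            activeBatches.headMap(txid).clear();
            calls.add("success " + txid);
        } else {
            calls.add("emitBatch " + txid);
            activeBatches.put(txid, attemptId);  // a replay overwrites the older attempt
        }
    }
}
```

Because headMap(txid) excludes txid itself, a success for transaction N clears only the entries below N.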

TridentBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if (now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if (batchGroup == null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there


        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        //        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //                    + " (" + _batches.size() + ")" +
        //                    "\ntuple: " + tuple +
        //                    "\nwith tracked " + tracked +
        //                    "\nwith id " + id +
        //                    "\nwith group " + batchGroup
        //                    + "\n");
        //
        //        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if (tracked != null) {
            if (id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if (id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if (tracked == null) {
            tracked =
                new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup),
                                 id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if (t == TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if (t == TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount += count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if (tracked.condition.expectedTaskReports == 0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch (FailedException e) {
                failBatch(tracked, e);
            }
            if (success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }
  • After calling _bolt.execute(tracked.info, tuple), it calls _collector.ack(tuple) to ack the tuple (it calls _collector.fail(tuple) instead only when finishBatch returns false).
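The excerpt also shows how TridentBoltExecutor keeps at most one attempt per batch, so replays do not pile up state. A minimal sketch of just that rule, assuming a plain map from batch id to attempt id in place of the rotating map of TrackedBatch objects; AttemptTracker and accept are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the "one attempt per batch" rule.
final class AttemptTracker {
    // batch id -> attempt id currently tracked for it
    final Map<Long, Integer> tracked = new HashMap<>();

    // Returns true if the tuple should be processed, false if it belongs to a stale attempt.
    boolean accept(long batchId, int attemptId) {
        Integer current = tracked.get(batchId);
        if (current != null) {
            if (attemptId > current) {
                tracked.remove(batchId);  // newer attempt: drop the old attempt's state
                current = null;
            } else if (attemptId < current) {
                return false;             // older attempt: ignore the tuple
            }
        }
        if (current == null) {
            tracked.put(batchId, attemptId);
        }
        return true;
    }
}
```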

SpoutOutputCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/spout/SpoutOutputCollector.java

    /**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. 
     * Note that Storm's event logging functionality will only work if the messageId
     * is serializable via Kryo or the Serializable interface. The emitted values must be immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }
  • This simply delegates to _delegate.emit; here _delegate is a SpoutOutputCollectorImpl.

SpoutOutputCollectorImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
        InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple =
                new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
  • When needAck is true (the messageId is non-null and the topology has ackers), taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()) is called.
  • Note that ackInitTuple here is new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId); its second value is the XOR of all elements of the List<Long> ackSeq.
  • ackSeq receives one random id per outgoing task, so when there is no outgoing task it stays empty and Utils.bitXorVals returns 0.
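The second field of the ack-init tuple can be reproduced in isolation. A sketch, assuming java.util.Random.nextLong as a stand-in for MessageId.generateId; AckInit and initValue are hypothetical names. It makes the edge case explicit: with no outgoing tasks the loop never runs and the value is 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified model of the ack-init value computed in sendSpoutMsg.
final class AckInit {
    // One random edge id per outgoing task (like ackSeq), then the XOR of all of them
    // (like Utils.bitXorVals).
    static long initValue(List<Integer> outTasks, Random random) {
        List<Long> ackSeq = new ArrayList<>();
        for (int i = 0; i < outTasks.size(); i++) {
            ackSeq.add(random.nextLong());
        }
        long xor = 0;
        for (long id : ackSeq) {
            xor ^= id;
        }
        return xor;
    }
}
```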

Utils

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java

    public static long bitXorVals(List<Long> coll) {
        long result = 0;
        for (Long val : coll) {
            result ^= val;
        }
        return result;
    }

    public static long bitXor(Long a, Long b) {
        return a ^ b;
    }
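  • The XOR operation (bitXor) is the core of Storm’s ack mechanism: every tuple id is XORed into the acker’s value once when the tuple is emitted and once when it is acked, so the value becomes 0 once every tuple in the tree has been acked.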
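To see why XOR works as a completion check, here is the bookkeeping for one small tuple tree. Assumptions: the spout sends one tuple to bolt A, bolt A emits one anchored child to bolt B, and bolt B emits nothing; ids are fixed numbers instead of random ones, and XorTracking and xorAll are hypothetical names (xorAll plays the role of Utils.bitXorVals).

```java
import java.util.Arrays;
import java.util.List;

final class XorTracking {
    // Same idea as Utils.bitXorVals: XOR of every element.
    static long xorAll(List<Long> ids) {
        long result = 0;
        for (long id : ids) {
            result ^= id;
        }
        return result;
    }

    // Returns the acker's value after each step: init, bolt A's ack, bolt B's ack.
    static long[] trace() {
        long e1 = 0x1111L;
        long e2 = 0x2222L;
        long val = 0;
        val ^= xorAll(Arrays.asList(e1));      // ack init: one tuple sent with edge id e1
        long afterInit = val;
        val ^= xorAll(Arrays.asList(e1, e2));  // bolt A acks e1 and anchors child e2
        long afterBoltA = val;
        val ^= xorAll(Arrays.asList(e2));      // bolt B acks e2 and emits nothing
        return new long[] {afterInit, afterBoltA, val};
    }
}
```

Each id appears exactly twice, so the value ends at 0 regardless of the order in which the acks reach the acker.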

Acker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
  • When the Acker receives a tuple on ACKER_INIT_STREAM_ID and the current AckObject is null, it creates one whose val defaults to 0. It then calls curr.updateAck(input.getLong(1)), which XORs val with the second value of the tuple, and records the spout task id.
  • The tuple sent by SpoutOutputCollectorImpl is new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId), so its second value is Utils.bitXorVals(ackSeq). ackSeq is a List<Long>; when there is no outgoing task it is empty and Utils.bitXorVals returns 0, so after curr.updateAck(0) val is still 0.
  • At the end of execute, if curr.val == 0 (and the spout task is known), the Acker calls collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple).
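The init/ack/fail handling can be condensed into a small model. This is a sketch, not Storm's Acker: AckerModel is a hypothetical name, the streams are shortened to "init", "ack" and "fail", emitDirect calls are recorded as strings, and tick rotation, reset-timeout and flush handling are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Acker.execute for the init/ack/fail streams.
final class AckerModel {
    static final class Entry {
        long val = 0L;
        int spoutTask = -1;
        boolean failed = false;
    }

    final Map<Long, Entry> pending = new HashMap<>();
    final List<String> toSpout = new ArrayList<>(); // emitDirect calls as "stream:rootId->task"

    void execute(String stream, long rootId, long value, int spoutTask) {
        Entry curr = pending.get(rootId);
        if (curr == null) {
            curr = new Entry();
            pending.put(rootId, curr);
        }
        switch (stream) {
            case "init": curr.val ^= value; curr.spoutTask = spoutTask; break;
            case "ack":  curr.val ^= value; break;
            case "fail": curr.failed = true; break;
            default: throw new IllegalArgumentException(stream);
        }
        // The spout can only be notified once the init tuple has told us its task id.
        if (curr.spoutTask >= 0 && (curr.val == 0 || curr.failed)) {
            pending.remove(rootId);
            toSpout.add((curr.val == 0 ? "ack" : "fail") + ":" + rootId + "->" + curr.spoutTask);
        }
    }
}
```

An init value of 0, which is what an empty ackSeq produces, triggers the ack immediately, without any bolt ever seeing the tuple.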

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
  • When SpoutExecutor receives a tuple on Acker.ACKER_ACK_STREAM_ID, it removes the root id from pending and calls the ackSpoutMsg method, which calls back the ack method of the original spout, namely spout.ack(tupleInfo.getMessageId()).
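On the spout side, the essential step is mapping the acker's root id back to the user's message id and calling the spout. A sketch under simplifying assumptions: SpoutSideModel and Callbacks are hypothetical names, the pending map stores only the message id instead of a TupleInfo, and task-id checks, hooks and stats are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the spout side of acking.
final class SpoutSideModel {
    interface Callbacks {
        void ack(Object msgId);

        void fail(Object msgId);
    }

    final Map<Long, Object> pending = new HashMap<>(); // rootId -> user message id
    final Callbacks spout;

    SpoutSideModel(Callbacks spout) { this.spout = spout; }

    void emitted(long rootId, Object msgId) { pending.put(rootId, msgId); }

    // Mirrors the last branch of tupleActionFn: remove the pending entry, then call back.
    void onAckerTuple(String stream, long rootId) {
        Object msgId = pending.remove(rootId);
        if (msgId == null) {
            return; // unknown, already handled or timed out: nothing to call back
        }
        if (stream.equals("ack")) {
            spout.ack(msgId);
        } else if (stream.equals("fail")) {
            spout.fail(msgId);
        }
    }
}
```

Removing the entry before the callback means a duplicate ack for the same root id is ignored.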

Summary

  • When MasterBatchCoordinator receives the ack for a given attempt for the first time (the first call), the status changes from the initial AttemptStatus.PROCESSING to AttemptStatus.PROCESSED. In the sync call that follows, AttemptStatus.PROCESSED is changed to AttemptStatus.COMMITTING and a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID.
  • When the user’s spout is an ICommitterTridentSpout, TridentTopologyBuilder.buildTopology configures allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID). TridentSpoutExecutor therefore receives the tuple from MasterBatchCoordinator.COMMIT_STREAM_ID and calls ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt). Once TridentSpoutExecutor.execute returns, TridentBoltExecutor automatically acks the tuple, which leads to MasterBatchCoordinator’s ack method being called (the second call), which then runs _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).
  • When the user’s spout is not an ICommitterTridentSpout, no component in the topology subscribes to MasterBatchCoordinator.COMMIT_STREAM_ID, so outgoingTasks is empty. Because needAck holds, SpoutOutputCollectorImpl still sends a tuple to Acker.ACKER_INIT_STREAM_ID whose second value is Utils.bitXorVals(ackSeq); ackSeq is built from outgoingTasks and is therefore empty, so the value is 0. When the Acker receives this tuple, curr.val is still 0 after curr.updateAck(input.getLong(1)), so at the end of execute it sends a tuple to Acker.ACKER_ACK_STREAM_ID. On receiving it, SpoutExecutor calls ackSpoutMsg, which calls back the original spout’s ack method, spout.ack(tupleInfo.getMessageId()). In other words, a tuple emitted on a stream that no component consumes is acked automatically. Thus, for a spout that is not an ICommitterTridentSpout, emitting a tuple on MasterBatchCoordinator.COMMIT_STREAM_ID leads straight to MasterBatchCoordinator’s ack method being called (the second call), which runs _collector.emit(SUCCESS_STREAM_ID, new Values(tx)).

The difference between the two cases is this. If the spout is not an ICommitterTridentSpout, then after a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID the Acker acks it automatically and MasterBatchCoordinator’s ack method is called (the second call). If the spout is an ICommitterTridentSpout, ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt) runs first, then TridentBoltExecutor acks the tuple, and only then is MasterBatchCoordinator’s ack method called (the second call). In the success case, both paths eventually emit a tuple on SUCCESS_STREAM_ID.
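Both paths can be compared side by side in one toy model of the commit step. It is only a sketch: CommitFlow is a hypothetical name, the acker, the collectors and the coordinator are collapsed into direct calls on a single XOR value, and the edge ids are fixed numbers standing in for random ones. It reflects the two points above: with allGrouping every task of a committer spout receives the commit tuple, and with no subscribers the ack-init value is 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one transaction's commit step.
final class CommitFlow {
    final List<String> log = new ArrayList<>();

    void run(boolean committerSpout, int spoutParallelism) {
        log.add("coordinator: emit commit");
        // allGrouping: every task of a committer spout gets the commit tuple; otherwise none.
        int subscribers = committerSpout ? spoutParallelism : 0;
        long[] edges = new long[subscribers];
        long val = 0L;
        for (int i = 0; i < subscribers; i++) {
            edges[i] = 0x1000L + i;  // fixed stand-ins for random edge ids
            val ^= edges[i];         // ack-init value: XOR of ackSeq
        }
        if (val == 0L) {
            log.add("acker: init value 0, acked immediately");
        }
        for (int i = 0; i < subscribers; i++) {
            log.add("task " + i + ": emitter.commit(attempt)");
            val ^= edges[i];         // TridentBoltExecutor acks after execute returns
        }
        if (val == 0L) {
            log.add("coordinator: second ack, emit success");
        }
    }
}
```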

doc