Talk about storm trident's coordinator.

Preface

This article mainly studies the coordinator of storm trident.

Example

Code sample

    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.junit.Test;

    @Test
    public void testDebugTopologyBuild(){
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4),
                new Values("nickt2", 7),
                new Values("nickt3", 8),
                new Values("nickt4", 9),
                new Values("nickt5", 7),
                new Values("nickt6", 11),
                new Values("nickt7", 5)
        );
        spout.setCycle(false);
        TridentTopology topology = new TridentTopology();
        Stream stream1 = topology.newStream("spout1",spout)
                .each(new Fields("user", "score"), new BaseFunction() {
                    @Override
                    public void execute(TridentTuple tuple, TridentCollector collector) {
                        System.out.println("tuple:"+tuple);
                    }
                },new Fields());

        topology.build();
    }
  • The spout used here is FixedBatchSpout, which is of the IBatchSpout type.

Topology diagram

(figure omitted)

MasterBatchCoordinator

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

public class MasterBatchCoordinator extends BaseRichSpout { 
    public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
    
    public static final long INIT_TXID = 1L;
    
    
    public static final String BATCH_STREAM_ID = "$batch";
    public static final String COMMIT_STREAM_ID = "$commit";
    public static final String SUCCESS_STREAM_ID = "$success";

    private static final String CURRENT_TX = "currtx";
    private static final String CURRENT_ATTEMPTS = "currattempts";
    
    private List<TransactionalState> _states = new ArrayList();
    
    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
    TreeMap<Long, Integer> _attemptIds;
    
    private SpoutOutputCollector _collector;
    Long _currTransaction;
    int _maxTransactionActive;
    
    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
    
    
    List<String> _managedSpoutIds;
    List<ITridentSpout> _spouts;
    WindowedTimeThrottler _throttler;
    
    boolean _active = true;
    
    public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
        if(spoutIds.isEmpty()) {
            throw new IllegalArgumentException("Must manage at least one spout");
        }
        _managedSpoutIds = spoutIds;
        _spouts = spouts;
        LOG.debug("Created {}", this);
    }

    public List<String> getManagedSpoutIds(){
        return _managedSpoutIds;
    }

    @Override
    public void activate() {
        _active = true;
    }

    @Override
    public void deactivate() {
        _active = false;
    }
        
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        
        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    @Override
    public void close() {
        for(TransactionalState state: _states) {
            state.close();
        }
        LOG.debug("Closed {}", this);
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
        // when it sees the earlier txid it should know to emit nothing
        declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
        declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
        declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        ret.registerSerialization(TransactionAttempt.class);
        return ret;
    }

    //......
}
  • The open method first reads Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS (topology.trident.batch.emit.interval.millis, which defaults to 500 in defaults.yaml), the interval that throttles how often batches are emitted, and creates a WindowedTimeThrottler with a maxAmt of 1.
  • TransactionalState is used here to maintain the transactional state on zookeeper, one per managed spout id.
  • Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending, which is null by default in defaults.yaml) sets _maxTransactionActive, which falls back to 1 when the value is null (see the sketch below).
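
As a stand-alone illustration of this defaulting logic (plain Java, not Storm code; only the config key name is taken from the source above):

    import java.util.HashMap;
    import java.util.Map;

    public class MaxActiveResolutionDemo {
        // mirrors open(): topology.max.spout.pending == null means at most 1 active tx
        static int resolveMaxActive(Map<String, Object> conf) {
            Number active = (Number) conf.get("topology.max.spout.pending");
            return active == null ? 1 : active.intValue();
        }

        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>();
            System.out.println(resolveMaxActive(conf)); // 1 (unset)
            conf.put("topology.max.spout.pending", 3);
            System.out.println(resolveMaxActive(conf)); // 3
        }
    }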

MasterBatchCoordinator.nextTuple

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
  • nextTuple just calls the sync method, which is also called from ack and fail. sync first checks whether the current transaction has reached AttemptStatus.PROCESSED and, if so, marks it AttemptStatus.COMMITTING and emits a tuple on MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). After that, subject to the _maxTransactionActive limit and the WindowedTimeThrottler, it starts a new TransactionAttempt whenever the conditions are met, emitting the new TransactionAttempt on MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and marking an event on the WindowedTimeThrottler. Attempt ids increase monotonically per transaction, as modeled in the sketch below.
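
The comment in sync() notes that monotonically increasing attempt ids let downstream tasks discard state for older attempts. A stand-alone model of that bookkeeping (plain Java, not Storm code):

    import java.util.TreeMap;

    public class AttemptIdDemo {
        public static void main(String[] args) {
            TreeMap<Long, Integer> attemptIds = new TreeMap<>();
            long txid = 1L;
            for (int retry = 0; retry < 3; retry++) {
                Integer attemptId = attemptIds.get(txid);
                attemptId = (attemptId == null) ? 0 : attemptId + 1;
                attemptIds.put(txid, attemptId); // sync() persists this map to zookeeper
                System.out.println("txid=" + txid + " attemptId=" + attemptId); // 0, 1, 2
            }
        }
    }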

MasterBatchCoordinator.ack

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus status = _activeTx.get(tx.getTransactionId());
        LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
        if(status!=null && tx.equals(status.attempt)) {
            if(status.status==AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
                LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
            } else if(status.status==AttemptStatus.COMMITTING) {
                _activeTx.remove(tx.getTransactionId());
                _attemptIds.remove(tx.getTransactionId());
                _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                _currTransaction = nextTransactionId(tx.getTransactionId());
                for(TransactionalState state: _states) {
                    state.setData(CURRENT_TX, _currTransaction);                    
                }
                LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
            }
            sync();
        }
    }
  • ack performs different operations according to the current transaction status. If the status was AttemptStatus.PROCESSING, it is updated to AttemptStatus.PROCESSED; if it was AttemptStatus.COMMITTING, the transaction is removed from _activeTx and _attemptIds, a tuple is emitted on MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), and _currTransaction is advanced to nextTransactionId and persisted. Finally, sync is called, which may trigger a new TransactionAttempt. The resulting status transitions are sketched below.
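
Putting ack and sync together, each transaction moves through a small state machine. An illustrative stand-alone sketch (the enum values mirror the Storm source; the driver sequence is a simplification):

    public class AttemptStatusDemo {
        enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

        public static void main(String[] args) {
            AttemptStatus status = AttemptStatus.PROCESSING; // set when emitted on $batch
            // ack of the batch tuple: processing is done
            if (status == AttemptStatus.PROCESSING) status = AttemptStatus.PROCESSED;
            // sync() sees the head-of-line tx PROCESSED: emit on $commit
            if (status == AttemptStatus.PROCESSED) status = AttemptStatus.COMMITTING;
            // ack of the commit tuple: emit on $success, advance _currTransaction
            System.out.println("status before removal: " + status); // COMMITTING
        }
    }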

MasterBatchCoordinator.fail

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    @Override
    public void fail(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt) msgId;
        TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
        LOG.debug("Fail. [tx_attempt = {}], [tx_status = {}], [{}]", tx, stored, this);
        if(stored!=null && tx.equals(stored.attempt)) {
            _activeTx.tailMap(tx.getTransactionId()).clear();
            sync();
        }
    }
  • The fail method removes the current transaction from _activeTx, then clears every entry in _activeTx whose txId is at or above the failed txId, and finally calls sync to decide whether a new TransactionAttempt should be triggered. (Note that _currTransaction is not changed here, so the txId of the new TransactionAttempt that sync triggers is still the failed _currTransaction.) The tailMap pruning is demonstrated below.
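
A stand-alone demonstration of that tailMap pruning (plain Java, not Storm code):

    import java.util.TreeMap;

    public class FailPruneDemo {
        public static void main(String[] args) {
            TreeMap<Long, String> activeTx = new TreeMap<>();
            activeTx.put(1L, "attempt-1");
            activeTx.put(2L, "attempt-2");
            activeTx.put(3L, "attempt-3");
            long failedTxid = 2L;
            activeTx.remove(failedTxid);          // fail(): drop the failed attempt
            activeTx.tailMap(failedTxid).clear(); // and every later in-flight txid
            System.out.println(activeTx);         // {1=attempt-1}
        }
    }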

TridentSpoutCoordinator

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

public class TridentSpoutCoordinator implements IBasicBolt {
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
    private static final String META_DIR = "meta";

    ITridentSpout<Object> _spout;
    ITridentSpout.BatchCoordinator<Object> _coord;
    RotatingTransactionalState _state;
    TransactionalState _underlyingState;
    String _id;

    
    public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
        _spout = spout;
        _id = id;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
                
    }

    @Override
    public void cleanup() {
        _coord.close();
        _underlyingState.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(MasterBatchCoordinator.BATCH_STREAM_ID, new Fields("tx", "metadata"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }   
}
  • The execute method of TridentSpoutCoordinator handles tuples differently according to the source streamId.
  • If the stream is MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), the master has successfully received the ack; the coordinator cleans up the state before that txId and then calls back the success method of ITridentSpout.BatchCoordinator.
  • If the stream is MasterBatchCoordinator.BATCH_STREAM_ID ($batch), a new TransactionAttempt is starting; the coordinator initializes the transaction metadata and emits a tuple on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), which will be received by the downstream bolt (in this example, a TridentBoltExecutor wrapping the user spout via TridentSpoutExecutor). A minimal BatchCoordinator implementation is sketched below.
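
The callbacks driven here belong to ITridentSpout.BatchCoordinator. A minimal sketch of an implementation whose metadata is just a running offset (illustrative; only the interface itself is from Storm):

    import org.apache.storm.trident.spout.ITridentSpout;

    public class OffsetBatchCoordinator implements ITridentSpout.BatchCoordinator<Long> {
        @Override
        public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
            // prevMetadata is the previous batch's metadata; derive this batch from it
            return prevMetadata == null ? 0L : prevMetadata + 1;
        }

        @Override
        public void success(long txid) {
            // invoked via the $success stream once the batch is fully committed
        }

        @Override
        public boolean isReady(long txid) {
            return true; // a new batch may start at any time
        }

        @Override
        public void close() {
        }
    }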

TridentBoltExecutor

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

public class TridentBoltExecutor implements IRichBolt {
    public static final String COORD_STREAM_PREFIX = "$coord-";
    
    public static String COORD_STREAM(String batch) {
        return COORD_STREAM_PREFIX + batch;
    }

    RotatingMap<Object, TrackedBatch> _batches;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {        
        _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
        _lastRotate = System.currentTimeMillis();
        _batches = new RotatingMap<>(2);
        _context = context;
        _collector = collector;
        _coordCollector = new CoordinatedOutputCollector(collector);
        _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));
                
        _coordConditions = (Map) context.getExecutorData("__coordConditions");
        if(_coordConditions==null) {
            _coordConditions = new HashMap<>();
            for(String batchGroup: _coordSpecs.keySet()) {
                CoordSpec spec = _coordSpecs.get(batchGroup);
                CoordCondition cond = new CoordCondition();
                cond.commitStream = spec.commitStream;
                cond.expectedTaskReports = 0;
                for(String comp: spec.coords.keySet()) {
                    CoordType ct = spec.coords.get(comp);
                    if(ct.equals(CoordType.single())) {
                        cond.expectedTaskReports+=1;
                    } else {
                        cond.expectedTaskReports+=context.getComponentTasks(comp).size();
                    }
                }
                cond.targetTasks = new HashSet<>();
                for(String component: Utils.get(context.getThisTargets(),
                                        COORD_STREAM(batchGroup),
                                        new HashMap<String, Grouping>()).keySet()) {
                    cond.targetTasks.addAll(context.getComponentTasks(component));
                }
                _coordConditions.put(batchGroup, cond);
            }
            context.setExecutorData("__coordConditions", _coordConditions);
        }
        _bolt.prepare(conf, context, _coordOutputCollector);
    }

    //......

    @Override
    public void cleanup() {
        _bolt.cleanup();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);
        for(String batchGroup: _coordSpecs.keySet()) {
            declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count"));
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> ret = _bolt.getComponentConfiguration();
        if(ret==null) ret = new HashMap<>();
        ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
        return ret;
    }
}
  • In prepare, a CoordinatedOutputCollector is created first, then wrapped with an OutputCollector, and finally wrapped with a BatchOutputCollectorImpl, which is passed to the prepare method of ITridentBatchBolt; the ITridentBatchBolt implementation here is TridentSpoutExecutor.
  • prepare also initializes RotatingMap<Object, TrackedBatch> _batches = new RotatingMap<>(2).
  • The main remaining work of prepare is to build a CoordCondition for each batch group, which comes down to computing expectedTaskReports and the target tasks (see the sketch below).
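
A stand-alone model of how expectedTaskReports is accumulated (plain Java; the component names are made up, and the rule mirrors the loop above: CoordType.single() contributes one report, otherwise one report per task of the component):

    import java.util.HashMap;
    import java.util.Map;

    public class ExpectedTaskReportsDemo {
        public static void main(String[] args) {
            // component -> task count, and whether it coordinates as a single task
            Map<String, Integer> taskCounts = new HashMap<>();
            taskCounts.put("spout-coordinator", 1);
            taskCounts.put("upstream-bolt", 4);
            Map<String, Boolean> single = new HashMap<>();
            single.put("spout-coordinator", true);   // CoordType.single(): 1 report
            single.put("upstream-bolt", false);      // otherwise: 1 report per task

            int expectedTaskReports = 0;
            for (String comp : taskCounts.keySet()) {
                expectedTaskReports += single.get(comp) ? 1 : taskCounts.get(comp);
            }
            System.out.println(expectedTaskReports); // 5
        }
    }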

TridentBoltExecutor.execute

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

    @Override
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
        CoordCondition cond = batch.condition;
        if(cond.commitStream!=null
                && tuple.getSourceGlobalStreamId().equals(cond.commitStream)) {
            return TupleType.COMMIT;
        } else if(cond.expectedTaskReports > 0
                && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) {
            return TupleType.COORD;
        } else {
            return TupleType.REGULAR;
        }
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
  • The execute method of TridentBoltExecutor first checks whether the tuple is a tick tuple; if it is, it checks whether the time since _lastRotate (initialized in prepare to the then-current time) exceeds _messageTimeoutMs and, if so, performs _batches.rotate(). The tick tuple frequency is Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS (topology.tick.tuple.freq.secs), which TridentBoltExecutor sets to 5 seconds; _messageTimeoutMs is context.maxTopologyMessageTimeout() * 1000L, i.e. the maximum Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, which defaults to 30 in defaults.yaml) across the topology, times 1000.
  • _batches stores TrackedBatch information keyed by the txId of the TransactionAttempt; a new TrackedBatch is created if none exists, and creating one calls back the initBatchState method of _bolt.
  • After that, the tuple type is determined, one of TupleType.COMMIT, TupleType.COORD, or TupleType.REGULAR. For TupleType.COMMIT, tracked.receivedCommit is set to true and checkFinish is called. For TupleType.COORD, the reportedTasks and expectedTupleCount counters are updated and checkFinish is called (the accounting is sketched below). For TupleType.REGULAR (the batch tuples sent by the coordinator), the receivedTuples count is updated and _bolt.execute is called (_bolt here is TridentSpoutExecutor); when tracked.condition.expectedTaskReports == 0, finishBatch is called immediately to remove the batch from _batches; if a FailedException is thrown, failBatch reports the error directly; finally the tuple is acked or failed. Note that if a downstream each operation throws a FailedException for part of a batch, the failure reaches the master with some lag: the remaining tuples of the batch are still executed, and only when a TupleType.COORD tuple triggers the checkFinish detection is the fail propagated. For example, if a batch has 3 tuples and the second throws a FailedException, the third is still executed, and only after the whole batch is processed does the TupleType.COORD tuple trigger checkFinish.
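
A stand-alone model of that $coord accounting (plain Java; the numbers are made up): each upstream task counts what it emits for a batch and reports (id, count) on the $coord- stream, and the downstream executor compares the summed counts against what it actually received:

    public class CoordAccountingDemo {
        // state kept per TrackedBatch in the downstream executor
        static int reportedTasks = 0, expectedTupleCount = 0, receivedTuples = 0;
        static final int expectedTaskReports = 2; // from CoordCondition

        static void onRegularTuple() { receivedTuples++; }

        static void onCoordTuple(int countFromUpstreamTask) {
            reportedTasks++;
            expectedTupleCount += countFromUpstreamTask;
        }

        public static void main(String[] args) {
            onRegularTuple(); onRegularTuple(); onRegularTuple();
            onCoordTuple(2);  // upstream task A emitted 2 tuples for this batch
            onCoordTuple(1);  // upstream task B emitted 1
            boolean allReported = reportedTasks == expectedTaskReports;
            boolean allReceived = receivedTuples == expectedTupleCount;
            System.out.println(allReported && allReceived); // true -> batch can finish
        }
    }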

TridentBoltExecutor.checkFinish

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

   private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }

    private void failBatch(TrackedBatch tracked) {
        failBatch(tracked, null);
    }

  • TridentBoltExecutor calls checkFinish from execute when the tuple is of type TupleType.COMMIT or TupleType.COORD.
  • Once the _bolt.execute(tracked.info, tuple) method throws a FailedException, failBatch is called, which marks tracked.failed as true.
  • When checkFinish finds tracked.failed to be true, it calls _collector.fail(tuple), which then calls back the fail method of MasterBatchCoordinator. The complete decision is summarized in the sketch below.
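
The decision checkFinish makes can be condensed into a small predicate (illustrative sketch, not Storm code):

    public class CheckFinishDemo {
        enum Outcome { WAIT, FINISH, FAIL }

        static Outcome decide(boolean failed, boolean receivedCommit,
                              int reportedTasks, int expectedTaskReports,
                              int receivedTuples, int expectedTupleCount) {
            if (failed) return Outcome.FAIL;
            if (receivedCommit && reportedTasks == expectedTaskReports) {
                // every upstream task has reported; do the counts line up?
                return receivedTuples == expectedTupleCount ? Outcome.FINISH : Outcome.FAIL;
            }
            return Outcome.WAIT; // still missing coord reports or the commit tuple
        }

        public static void main(String[] args) {
            System.out.println(decide(false, true, 2, 2, 3, 3)); // FINISH
            System.out.println(decide(false, true, 2, 2, 2, 3)); // FAIL (lost tuples)
            System.out.println(decide(false, true, 1, 2, 2, 3)); // WAIT
        }
    }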

TridentSpoutExecutor

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java

public class TridentSpoutExecutor implements ITridentBatchBolt {
    public static final String ID_FIELD = "$tx";
    
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);

    AddIdCollector _collector;
    ITridentSpout<Object> _spout;
    ITridentSpout.Emitter<Object> _emitter;
    String _streamName;
    String _txStateId;
    
    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();

    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
        _txStateId = txStateId;
        _spout = spout;
        _streamName = streamName;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    @Override
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void cleanup() {
        _emitter.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
        fields.add(0, ID_FIELD);
        declarer.declareStream(_streamName, new Fields(fields));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
}
  • The BatchOutputCollector used by TridentSpoutExecutor is the one constructed by TridentBoltExecutor in its prepare method, wrapped in several layers: the innermost is the CoordinatedOutputCollector and the outermost is the BatchOutputCollectorImpl. The most important layer is the CoordinatedOutputCollector, which maintains the count of tuples emitted to each taskId. In its own prepare method, TridentSpoutExecutor wraps this collector once more into an AddIdCollector, which mainly adds the batchId (i.e. TransactionAttempt) information.
  • The ITridentSpout held by TridentSpoutExecutor is the wrapped original spout (assuming the original spout is of type IBatchSpout, it has been wrapped into an ITridentSpout by BatchSpoutExecutor). Its execute method processes tuples differently by stream type. For MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), the emitter's commit method is called to commit the current TransactionAttempt (the example in this article has no commit stream), and the tx is then removed from _activeBatches. For MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), the TransactionAttempts with txId smaller than the acked txId are first removed from _activeBatches (see the pruning demo below), and then the emitter's success method is called to mark the TransactionAttempt as successful; that method calls back the ack method of the original spout (the IBatchSpout).
  • Tuples on neither MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) nor MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) are the messages that start a batch: the batchId is set on the collector and the emitter's emitBatch is called to emit the data (the batchId passed here carries the txId of the TransactionAttempt), while the TransactionAttempt is put into _activeBatches (a batch here corresponds to a TransactionAttempt).
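
A stand-alone demonstration of the $success pruning (plain Java, not Storm code): headMap(txid).clear() drops everything strictly before the acknowledged txid, the mirror image of fail's tailMap pruning in MasterBatchCoordinator:

    import java.util.TreeMap;

    public class SuccessPruneDemo {
        public static void main(String[] args) {
            TreeMap<Long, String> activeBatches = new TreeMap<>();
            activeBatches.put(1L, "attempt-1");
            activeBatches.put(2L, "attempt-2");
            activeBatches.put(3L, "attempt-3");
            long succeededTxid = 3L;
            activeBatches.headMap(succeededTxid).clear(); // txids < 3 will never replay
            System.out.println(activeBatches);            // {3=attempt-3}
        }
    }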

FixedBatchSpout

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/testing/FixedBatchSpout.java

public class FixedBatchSpout implements IBatchSpout {

    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;
    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
    
    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields;
        this.outputs = outputs;
        this.maxBatchSize = maxBatchSize;
    }
    
    int index = 0;
    boolean cycle = false;
    
    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }
    
    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batch = this.batches.get(batchId);
        if(batch == null){
            batch = new ArrayList<List<Object>>();
            if(index>=outputs.length && cycle) {
                index = 0;
            }
            for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
                batch.add(outputs[index]);
            }
            this.batches.put(batchId, batch);
        }
        for(List<Object> list : batch){
            collector.emit(list);
        }
    }

    @Override
    public void ack(long batchId) {
        this.batches.remove(batchId);
    }

    @Override
    public void close() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return fields;
    }
    
}
  • The spout used by the user is of type IBatchSpout; the tuple data for each batchId is cached so that a replayed batchId emits exactly the same tuples, implementing the semantics of a transactional spout (modeled in the sketch below).
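
A stand-alone model of that replay cache (plain Java, not Storm code): the first request for a batchId slices fresh tuples; a replay of the same batchId returns the cached slice unchanged:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReplayCacheDemo {
        static final List<String> source = Arrays.asList("a", "b", "c", "d", "e");
        static final Map<Long, List<String>> batches = new HashMap<>();
        static final int maxBatchSize = 2;
        static int index = 0;

        static List<String> emitBatch(long batchId) {
            List<String> batch = batches.get(batchId);
            if (batch == null) {                      // first time: slice new tuples
                batch = new ArrayList<>();
                for (int i = 0; index < source.size() && i < maxBatchSize; index++, i++) {
                    batch.add(source.get(index));
                }
                batches.put(batchId, batch);
            }
            return batch;                             // replay: same tuples as before
        }

        public static void main(String[] args) {
            System.out.println(emitBatch(1)); // [a, b]
            System.out.println(emitBatch(2)); // [c, d]
            System.out.println(emitBatch(1)); // [a, b]  (replayed from the cache)
        }
    }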

TridentTopology.newStream

storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java

     public Stream newStream(String txId, IRichSpout spout) {
        return newStream(txId, new RichSpoutBatchExecutor(spout));
    }
    
    public Stream newStream(String txId, IBatchSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, ITridentSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
    }
    
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }

    public Stream newStream(String txId, ITridentDataSource dataSource) {
        if (dataSource instanceof IBatchSpout) {
            return newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            return newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            return newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
    }
  • In TridentTopology.newStream, users can directly use a spout of the IBatchSpout type. The advantage is that TridentTopology.build wraps it into an ITridentSpout with BatchSpoutExecutor (this saves users from implementing the ITridentSpout interfaces themselves and shields them from the trident spout machinery, so users who have been writing ordinary topologies can quickly get started with trident topologies).
  • BatchSpoutExecutor implements the ITridentSpout interface, adapting IBatchSpout to ITridentSpout; the coordinator it uses is EmptyCoordinator and the emitter is BatchSpoutEmitter.
  • If the spout the user passes to TridentTopology.newStream is of type IPartitionedTridentSpout, the newStream method wraps it into an ITridentSpout with PartitionedTridentSpoutExecutor; an IOpaquePartitionedTridentSpout is likewise wrapped into an ITridentSpout with OpaquePartitionedTridentSpoutExecutor.

Summary

  • IBatchSpout (in the build method), IPartitionedTridentSpout (in the newStream method), and IOpaquePartitionedTridentSpout (in the newStream method) are adapted to the ITridentSpout type with BatchSpoutExecutor, PartitionedTridentSpoutExecutor, and OpaquePartitionedTridentSpoutExecutor respectively. When TridentTopologyBuilder builds the topology, spouts of the ITridentSpout type are first wrapped by TridentSpoutExecutor and then by TridentBoltExecutor, finally becoming bolts; the only real spout of the whole TridentTopology is the MasterBatchCoordinator. So a user's IBatchSpout is first wrapped into an ITridentSpout by BatchSpoutExecutor, then wrapped into a bolt by TridentSpoutExecutor plus TridentBoltExecutor.
  • IBatchSpout's ack is at the batch dimension, i.e. the TransactionAttempt dimension; note that it has no fail method. If the emitBatch method throws a FailedException, TridentBoltExecutor calls the failBatch method (the tuples of a batch are all executed before a TupleType.COORD tuple triggers checkFinish), which reports the error and marks the TrackedBatch's failed flag as true; when checkFinish then finds tracked.failed to be true, TridentBoltExecutor calls _collector.fail(tuple), which calls back the fail method of MasterBatchCoordinator.
  • The fail method of MasterBatchCoordinator removes the current TransactionAttempt from _activeTx, prunes the entries with txId at or above the failed txId, and finally calls the sync method to continue the TransactionAttempt (note that the value of _currTransaction is not changed here, so retrying continues from the failed txId; only in the ack method is _currTransaction advanced to nextTransactionId).
  • The execute method of TridentBoltExecutor checks whether the time since the last rotate exceeds _messageTimeoutMs (the maximum Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS across components, times 1000 to convert seconds to milliseconds); if so, it rotates, dropping the last bucket of _batches. The tick tuple frequency here is 5 seconds; with topology.message.timeout.secs at its default of 30, _messageTimeoutMs is 30*1000, so roughly every 5 seconds it checks whether more than 30 seconds have passed since the last rotate and, if so, rotates and discards the data of the oldest bucket (TrackedBatch), which effectively cleans out timed-out TrackedBatch entries.
  • There are several situations that lead to MasterBatchCoordinator's fail. One is that a downstream component actively throws a FailedException, which triggers the master's fail and a retry of the TransactionAttempt. Another is that a downstream component takes longer to process a tuple than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, which defaults to 30 in defaults.yaml); the acker then triggers the master's fail, so the TransactionAttempt keeps being retried. There is currently no limit on the number of retries, which deserves attention in production: as soon as one tuple of a batchId fails, all tuples of that batchId are re-sent, and if the downstream does not handle this properly, tuples that succeeded in an earlier attempt may be processed again while later ones fail (to cope with a batch that partially succeeds and partially fails, Trident's State needs to be used).
