[case46] Talk about _maxTransactionActive of Storm TridentSpout


Preface

This article mainly studies the _maxTransactionActive of Storm TridentSpout.

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java


   TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();

   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        
        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

   public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }

    private static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status;
        
        public TransactionStatus(TransactionAttempt attempt) {
            this.attempt = attempt;
            this.status = AttemptStatus.PROCESSING;
        }

        @Override
        public String toString() {
            return attempt.toString() + " <" + status.toString() + ">";
        }        
    }

    private static enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING
    }
  • MasterBatchCoordinator sets _maxTransactionActive in the open method from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending); this key defaults to null in the configuration file, and _maxTransactionActive is set to 1 when the value is null (see the sketch after this list).
  • nextTuple (via sync) controls the number of batches processed at the same time; the next batch is started only after a batch in _activeTx is processed successfully or fails.
  • _activeTx is a TreeMap with transactionId as the key and TransactionStatus as the value; TransactionStatus holds the TransactionAttempt and an AttemptStatus; AttemptStatus has three states: PROCESSING, PROCESSED, COMMITTING.
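
To make the configuration path concrete, here is a minimal sketch (class name hypothetical) of setting topology.max.spout.pending on a topology config; MasterBatchCoordinator.open reads this value into _maxTransactionActive:

    import org.apache.storm.Config;

    public class MaxSpoutPendingDemo {
        public static void main(String[] args) {
            Config conf = new Config();
            // topology.max.spout.pending: MasterBatchCoordinator.open reads this
            // value into _maxTransactionActive, allowing 3 batches in flight
            conf.setMaxSpoutPending(3);
            // if the key is left unset (null), _maxTransactionActive falls back to 1
        }
    }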

TridentSpoutCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

    RotatingTransactionalState _state;

    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
                
    }
  • The execute method of TridentSpoutCoordinator looks up the previous metadata by txid, has the coordinator initialize this transaction's metadata, and then emits Values(attempt, meta) on BATCH_STREAM_ID to TridentBoltExecutor (a coordinator sketch follows).
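
For reference, a minimal sketch (names and offset arithmetic assumed) of the ITridentSpout.BatchCoordinator contract that TridentSpoutCoordinator drives; initializeTransaction receives the txid plus the previous batch's metadata and returns the metadata to store for this batch:

    import org.apache.storm.trident.spout.ITridentSpout;

    public class OffsetBatchCoordinator implements ITridentSpout.BatchCoordinator<Long> {
        @Override
        public Long initializeTransaction(long txid, Long prevMeta, Long currMeta) {
            // start this batch where the previous batch's metadata left off
            return prevMeta == null ? 0L : prevMeta + 100L;
        }

        @Override
        public void success(long txid) {
            // invoked when SUCCESS_STREAM_ID arrives for txid
        }

        @Override
        public boolean isReady(long txid) {
            return true; // always ready in this sketch
        }

        @Override
        public void close() {
        }
    }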

TridentBoltExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java


    RotatingMap<Object, TrackedBatch> _batches;


    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    public static class TrackedBatch {
        int attemptId;
        BatchInfo info;
        CoordCondition condition;
        int reportedTasks = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean failed = false;
        boolean receivedCommit;
        Tuple delayedAck = null;
        
        public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
            this.info = info;
            this.condition = condition;
            this.attemptId = attemptId;
            receivedCommit = condition.commitStream == null;
        }

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }        
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
  • TridentBoltExecutor uses a RotatingMap (_batches) to store batch information; the key is the txid and the value is a TrackedBatch.
  • When _bolt.execute(tracked.info, tuple) is called, a BatchInfo is passed whose state value is _bolt.initBatchState(batchGroup, id); initBatchState is invoked only the first time a txid is not found in _batches, i.e., when its TrackedBatch is created.
  • checkFinish likewise judges according to the TrackedBatch of the corresponding batch; finishBatch calls _bolt.finishBatch(tracked.info), passing the BatchInfo along; failBatch also operates on the TrackedBatch of the corresponding batch (see the sketch below).
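
The finish condition boils down to a small predicate; this standalone sketch (names hypothetical) restates the accounting checkFinish performs before a batch may finish:

    public class CheckFinishSketch {
        // every upstream task reports [id, count] on the coord stream; the batch
        // finishes only when all expected reports have arrived and the number of
        // tuples actually received matches the number the upstream tasks emitted
        static boolean canFinish(boolean receivedCommit,
                                 int reportedTasks, int expectedTaskReports,
                                 int receivedTuples, int expectedTupleCount) {
            return receivedCommit
                    && reportedTasks == expectedTaskReports
                    && receivedTuples == expectedTupleCount;
        }

        public static void main(String[] args) {
            // 2 upstream tasks reported, 5 tuples expected in total, 5 received
            System.out.println(canFinish(true, 2, 2, 5, 5)); // true  -> finishBatch
            System.out.println(canFinish(true, 2, 2, 4, 5)); // false -> failBatch
        }
    }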

BatchInfo

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/BatchInfo.java

public class BatchInfo {
    public IBatchID batchId;
    public Object state;
    public String batchGroup;
    
    public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
        this.batchGroup = batchGroup;
        this.batchId = batchId;
        this.state = state;
    }
}
  • BatchInfo contains batchId, state and batchGroup information.

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }

    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    public void finishBatch(BatchInfo batchInfo) {
    }
  • The execute method of TridentSpoutExecutor also distinguishes each batch's information by txid: the commit, success, and batch streams all key _activeBatches by the transaction id (an emitter sketch follows).
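
As an illustration, a minimal sketch (names assumed) of the ITridentSpout.Emitter contract that TridentSpoutExecutor drives via _emitter.emitBatch:

    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.ITridentSpout;
    import org.apache.storm.trident.topology.TransactionAttempt;
    import org.apache.storm.tuple.Values;

    public class RangeEmitter implements ITridentSpout.Emitter<Long> {
        @Override
        public void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {
            // emit the tuples that make up this batch; the txid inside tx is
            // what keeps batches distinguishable downstream
            for (long i = 0; i < 3; i++) {
                collector.emit(new Values(coordinatorMeta + i));
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
            // the batch identified by tx has been fully processed
        }

        @Override
        public void close() {
        }
    }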

SubtopologyBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;
        
        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }
        
        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }            
        }
        
        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }
        
        public Factory getOutputFactory() {
            return _project;
        }
    }
  • SubtopologyBolt creates the ProcessorContext in initBatchState with the batchId as its identifier; this way, when different batches run in parallel, their ProcessorContexts are kept apart (see the sketch after this list).
  • The execute method uses that ProcessorContext (batchInfo.state); the execute method of each TridentProcessor is called with the ProcessorContext of its own batch.
  • The same is true for the finishBatch method, which passes (ProcessorContext) batchInfo.state to TridentProcessor.finishBatch.
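
A minimal sketch of this per-batch isolation, using ProcessorContext directly (batch ids and node count are made up): each batch gets its own fresh state array, so parallel batches never share processor state.

    import org.apache.storm.trident.planner.ProcessorContext;

    public class PerBatchStateSketch {
        public static void main(String[] args) {
            int nodes = 2; // number of processors in the subtopology (assumed)
            ProcessorContext batch1 = new ProcessorContext(1L, new Object[nodes]);
            ProcessorContext batch2 = new ProcessorContext(2L, new Object[nodes]);
            // a processor writing into its state slot only affects its own batch
            batch1.state[0] = "agg-state-of-batch-1";
            System.out.println(batch2.state[0]); // null: batch 2 is untouched
        }
    }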

AggregateProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java

    public void startBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
    }    

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext);
        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
    }
    
    public void finishBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
    }
  • The startBatch, execute, and finishBatch methods of AggregateProcessor all use the state of the ProcessorContext, which is passed in from SubtopologyBolt to distinguish batches (an aggregator sketch follows).
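
To illustrate, a minimal counting Aggregator (hypothetical) whose per-batch state lives in the slot AggregateProcessor reserves in processorContext.state; init maps to startBatch, aggregate to execute, and complete to finishBatch:

    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class CountAggregator extends BaseAggregator<long[]> {
        @Override
        public long[] init(Object batchId, TridentCollector collector) {
            return new long[1]; // fresh state for this batch (startBatch)
        }

        @Override
        public void aggregate(long[] state, TridentTuple tuple, TridentCollector collector) {
            state[0]++; // invoked from execute for every tuple of this batch
        }

        @Override
        public void complete(long[] state, TridentCollector collector) {
            collector.emit(new Values(state[0])); // invoked from finishBatch
        }
    }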

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }
    
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    public void startBatch(ProcessorContext processorContext) {
    }

    public void finishBatch(ProcessorContext processorContext) {
    }
  • EachProcessor sets the ProcessorContext on _collector and then passes _collector along when it calls _function.execute; the _collector here is an AppendCollector (a function sketch follows).
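
For reference, a minimal Function (hypothetical) of the kind EachProcessor drives; everything it emits goes through the AppendCollector, which carries this batch's ProcessorContext:

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class AddOneFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // the collector is the AppendCollector holding this batch's context
            collector.emit(new Values(tuple.getLong(0) + 1));
        }
    }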

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;
    
    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 
    
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • When _function.execute emits through the AppendCollector, the AppendCollector hands those tuples to the TupleReceivers for processing, and the context it passes is the ProcessorContext set by EachProcessor, that is, each batch's own ProcessorContext. The execute method of a TupleReceiver may access that ProcessorContext, which is likewise scoped to the batch; for example, AggregateProcessor stores its aggregation results in the processorContext.state of its own batch.

Summary

  • Storm's Trident uses [id, count] tuples to tell the downstream TridentBoltExecutor to end a batch. When TridentBoltExecutor receives [id, count] data, it first checks whether tracked.reportedTasks equals cond.expectedTaskReports (this aggregates the reports when the parallelism of the upstream TridentBoltExecutor is greater than 1), and then checks whether tracked.receivedTuples equals tracked.expectedTupleCount; finishBatch is performed only if both match.
  • Storm's TridentSpout _maxTransactionActive parameter is set from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending); this key defaults to null in the configuration file, and when the value is null _maxTransactionActive is 1.
  • The MasterBatchCoordinator controls the number of batches processed at the same time; the next batch is started only after a batch in _activeTx is processed successfully or fails. When multiple transactions are active in parallel, the downstream TridentBoltExecutor can tell the batches apart and process them without confusion. For example, SubtopologyBolt creates the ProcessorContext in initBatchState with the batchId as its identifier, so parallel batches get distinct ProcessorContexts; the TridentProcessors called inside SubtopologyBolt use that ProcessorContext to store their results, e.g. AggregateProcessor stores the aggregated results in its own batch's processorContext.state.
