Talk about storm’s AggregateProcessor’s execute and finishBatch methods.

Preface

This article mainly studies the execute and finishBatch methods of storm's AggregateProcessor.

Example

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .groupBy(new Fields("user"))
                .aggregate(new Fields("user","score"),new UserCountAggregator(),new Fields("val"))
                .toStream()
                .parallelismHint(1)
                .each(new Fields("val"),new PrintEachFunc(),new Fields());
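
The example references UserCountAggregator and PrintEachFunc, which are not shown in the article. A minimal sketch of what they might look like (assumed implementations, not the article's originals; in practice two separate source files):

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// counts tuples per group; GroupedAggregator calls init once per group per batch
public class UserCountAggregator extends BaseAggregator<UserCountAggregator.CountState> {
    public static class CountState {
        long count = 0;
    }

    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count++;
    }

    @Override
    public void complete(CountState state, TridentCollector collector) {
        // runs when finishBatch finally propagates down to the user aggregator
        collector.emit(new Values(state.count));
    }
}

// prints every tuple of the aggregated stream
public class PrintEachFunc extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        System.out.println("each: " + tuple);
    }
}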

TridentBoltExecutor

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/topology/TridentBoltExecutor.java

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }

   private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }

    public static class TrackedBatch {
        int attemptId;
        BatchInfo info;
        CoordCondition condition;
        int reportedTasks = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        //......
    }
  • The user's spout and the groupBy operation are each wrapped in a TridentBoltExecutor; the groupBy's TridentBoltExecutor wraps a SubtopologyBolt.
  • TridentBoltExecutor's checkFinish triggers the finishBatch operation (in addition, finishBatch is also called directly when a REGULAR tuple is received and tracked.condition.expectedTaskReports == 0; for the spout, expectedTaskReports is 0 because it is the data source and has no upstream COORD_STREAM reports with which to update expectedTaskReports and expectedTupleCount). finishBatch emits new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)) on the COORD_STREAM, i.e. new Fields("id", "count"): the batch id plus the number of tuples sent to each target task, informing the downstream task how many tuples were sent to it (taskEmittedTuples is maintained in the emit and emitDirect methods of the CoordinatedOutputCollector).
  • The downstream component is also a TridentBoltExecutor; it updates expectedTupleCount whenever it receives data on COORD_STREAM. In the checkFinish method, each TridentBoltExecutor checks whether receivedTuples equals expectedTupleCount; if so, it has received all the tuples sent from upstream and triggers its own finishBatch (see the sketch after this list).
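
To make this coordination concrete, here is a simplified sketch of the bookkeeping described above (illustrative only, with hypothetical method names; the real logic lives in TridentBoltExecutor.execute and the CoordinatedOutputCollector):

// on a COORD_STREAM tuple carrying new Fields("id", "count")
void onCoordTuple(TrackedBatch tracked, Tuple coordTuple) {
    tracked.reportedTasks++;                              // one more upstream task has reported
    tracked.expectedTupleCount += coordTuple.getLong(1);  // add that task's emitted-tuple count
    checkFinish(tracked, coordTuple, TupleType.COORD);    // may now trigger finishBatch
}

// on a REGULAR tuple belonging to the batch
void onRegularTuple(TrackedBatch tracked, Tuple tuple) {
    tracked.receivedTuples++;            // later compared against expectedTupleCount
    _bolt.execute(tracked.info, tuple);  // hand the tuple to the wrapped bolt
    if(tracked.condition.expectedTaskReports == 0) {
        finishBatch(tracked, tuple);     // spout side: no upstream reports to wait for
    }
}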

SubtopologyBolt

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/planner/SubtopologyBolt.java

public class SubtopologyBolt implements ITridentBatchBolt {
    //......
    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    @Override
    public void cleanup() {
        for(String bg: _myTopologicallyOrdered.keySet()) {
            for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
                p.cleanup();
            }   
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for(Node n: _nodes) {
            declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
        }        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;
        
        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }
        
        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }            
        }
        
        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }
        
        public Factory getOutputFactory() {
            return _project;
        }
    }
}
  • The groupBy operation is wrapped in a SubtopologyBolt, and the first field of its output fields is $batchId.
  • The execute method looks up the InitialReceiver for the source stream and calls its receive method; receive in turn calls execute on each of the _receivers, which here is the AggregateProcessor.
  • The finishBatch method calls finishBatch on each TridentProcessor returned by _myTopologicallyOrdered.get(batchInfo.batchGroup), here the AggregateProcessor and the EachProcessor. BatchInfo carries the batchId, the processorContext, and the batchGroup; the processorContext (holding a batchId of type TransactionAttempt and the Object[] state, which contains the GroupCollector, the aggregation accumulators, etc.) is handed to finishBatch (see the condensed view after this list).
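
For reference, the ProcessorContext being passed around is just a small holder; condensed from org/apache/storm/trident/planner/ProcessorContext.java:

public class ProcessorContext {
    public Object batchId;   // here a TransactionAttempt
    public Object[] state;   // one slot per node: GroupCollector, accumulators, etc.

    public ProcessorContext(Object batchId, Object[] state) {
        this.batchId = batchId;
        this.state = state;
    }
}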

AggregateProcessor

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/planner/processor/AggregateProcessor.java

public class AggregateProcessor implements TridentProcessor {
    Aggregator _agg;
    TridentContext _context;
    FreshCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;

    public AggregateProcessor(Fields inputFields, Aggregator agg) {
        _agg = agg;
        _inputFields = inputFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Aggregate operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new FreshCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _agg.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _agg.cleanup();
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
    }    

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext);
        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
    }
    
    @Override
    public void finishBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
    }
 
    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }
}
  • AggregateProcessor creates the FreshCollector and the ProjectionFactory in prepare.
  • For a groupBy operation, the _agg here is a GroupedAggregator, and the context passed to _agg.prepare is a TridentOperationContext.
  • The finishBatch method calls _agg.complete, passing in the state array (its first element is the GroupCollector, its second element is the map of per-group accumulated values) together with the _collector, which is the FreshCollector (a standalone demo of this state-slot pattern follows this list).
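
The state-slot pattern can be shown standalone; a minimal sketch in plain Java (not Storm code) of how a processor parks its accumulator at its own index of the per-batch state array:

public class StateSlotDemo {
    public static void main(String[] args) {
        Object[] batchState = new Object[2];  // one slot per processor, as in initBatchState
        int myIndex = 0;                      // what _context.getStateIndex() would return

        // startBatch: park a fresh accumulator in our slot
        batchState[myIndex] = new long[]{0};

        // execute: fold each incoming value into the accumulator from our slot
        for (int score : new int[]{3, 5}) {
            ((long[]) batchState[myIndex])[0] += score;
        }

        // finishBatch: read the slot and "emit" the final value
        System.out.println("total = " + ((long[]) batchState[myIndex])[0]);  // total = 8
    }
}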

GroupedAggregator

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/impl/GroupedAggregator.java

public class GroupedAggregator implements Aggregator<Object[]> {
    ProjectionFactory _groupFactory;
    ProjectionFactory _inputFactory;
    Aggregator _agg;
    ComboList.Factory _fact;
    Fields _inFields;
    Fields _groupFields;
    
    public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
        _groupFields = group;
        _inFields = input;
        _agg = agg;
        int[] sizes = new int[2];
        sizes[0] = _groupFields.size();
        sizes[1] = outSize;
        _fact = new ComboList.Factory(sizes);
    }
    
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        _inputFactory = context.makeProjectionFactory(_inFields);
        _groupFactory = context.makeProjectionFactory(_groupFields);
        _agg.prepare(conf, new TridentOperationContext(context, _inputFactory));
    }

    @Override
    public Object[] init(Object batchId, TridentCollector collector) {
        return new Object[] {new GroupCollector(collector, _fact), new HashMap(), batchId};
    }

    @Override
    public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collector) {
        GroupCollector groupColl = (GroupCollector) arr[0];
        Map<List, Object> val = (Map) arr[1];
        TridentTuple group = _groupFactory.create((TridentTupleView) tuple);
        TridentTuple input = _inputFactory.create((TridentTupleView) tuple);
        Object curr;
        if(!val.containsKey(group)) {
            curr = _agg.init(arr[2], groupColl);
            val.put((List) group, curr);
        } else {
            curr = val.get(group);
        }
        groupColl.currGroup = group;
        _agg.aggregate(curr, input, groupColl);
        
    }

    @Override
    public void complete(Object[] arr, TridentCollector collector) {
        Map<List, Object> val = (Map) arr[1];        
        GroupCollector groupColl = (GroupCollector) arr[0];
        for(Entry<List, Object> e: val.entrySet()) {
            groupColl.currGroup = e.getKey();
            _agg.complete(e.getValue(), groupColl);
        }
    }

    @Override
    public void cleanup() {
        _agg.cleanup();
    }
    
}
  • In the aggregate method, arr[0] is the GroupCollector; arr[1] is a Map whose keys are the TridentTupleViews of the group fields and whose values are the accumulators returned by _agg.init; arr[2] is the batchId (a TransactionAttempt).
  • The _agg here is a ChainedAggregatorImpl. aggregate first projects out the tuple's group fields and input fields, then checks whether arr[1] already holds a value for that group; if not, it calls _agg.init to create one and puts it into the map (see the standalone sketch after this list).
  • The aggregate method finally calls _agg.aggregate to do the accumulation.
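
A standalone sketch (plain Java, illustrative only) of the per-group bookkeeping that aggregate performs with arr[1]:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupedAggDemo {
    public static void main(String[] args) {
        Map<List<Object>, long[]> val = new HashMap<>();   // plays the role of arr[1]
        Object[][] tuples = {{"alice", 3}, {"bob", 5}, {"alice", 7}};
        for (Object[] t : tuples) {
            List<Object> group = Arrays.asList(t[0]);      // like _groupFactory.create(tuple)
            long[] curr = val.computeIfAbsent(group, g -> new long[]{0});  // _agg.init on first sight
            curr[0] += (int) t[1];                         // like _agg.aggregate
        }
        // complete: one result per group
        val.forEach((g, c) -> System.out.println(g + " -> " + c[0]));
        // prints [alice] -> 10 and [bob] -> 5 (order may vary)
    }
}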

ChainedAggregatorImpl

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java

public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
    Aggregator[] _aggs;
    ProjectionFactory[] _inputFactories;
    ComboList.Factory _fact;
    Fields[] _inputFields;
    
    
    
    public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
        _aggs = aggs;
        _inputFields = inputFields;
        _fact = fact;
        if(_aggs.length!=_inputFields.length) {
            throw new IllegalArgumentException("Require input fields for each aggregator");
        }
    }
    
    public void prepare(Map conf, TridentOperationContext context) {
        _inputFactories = new ProjectionFactory[_inputFields.length];
        for(int i=0; i<_inputFields.length; i++) {
            _inputFactories[i] = context.makeProjectionFactory(_inputFields[i]);
            _aggs[i].prepare(conf, new TridentOperationContext(context, _inputFactories[i]));
        }
    }
    
    public ChainedResult init(Object batchId, TridentCollector collector) {
        ChainedResult initted = new ChainedResult(collector, _aggs.length);
        for(int i=0; i<_aggs.length; i++) {
            initted.objs[i] = _aggs[i].init(batchId, initted.collectors[i]);
        }
        return initted;
    }
    
    public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
        val.setFollowThroughCollector(collector);
        for(int i=0; i<_aggs.length; i++) {
            TridentTuple projected = _inputFactories[i].create((TridentTupleView) tuple);
            _aggs[i].aggregate(val.objs[i], projected, val.collectors[i]);
        }
    }
    
    public void complete(ChainedResult val, TridentCollector collector) {
        val.setFollowThroughCollector(collector);
        for(int i=0; i<_aggs.length; i++) {
            _aggs[i].complete(val.objs[i], val.collectors[i]);
        }
        if(_aggs.length > 1) { // otherwise, tuples were emitted directly
            int[] indices = new int[val.collectors.length];
            for(int i=0; i<indices.length; i++) {
                indices[i] = 0;
            }
            boolean keepGoing = true;
            //emit cross-join of all emitted tuples
            while(keepGoing) {
                List[] combined = new List[_aggs.length];
                for(int i=0; i< _aggs.length; i++) {
                    CaptureCollector capturer = (CaptureCollector) val.collectors[i];
                    combined[i] = capturer.captured.get(indices[i]);
                }
                collector.emit(_fact.create(combined));
                keepGoing = increment(val.collectors, indices, indices.length - 1);
            }
        }
    }
    
    //return false if can't increment anymore
    private boolean increment(TridentCollector[] lengths, int[] indices, int j) {
        if(j==-1) return false;
        indices[j]++;
        CaptureCollector capturer = (CaptureCollector) lengths[j];
        if(indices[j] >= capturer.captured.size()) {
            indices[j] = 0;
            return increment(lengths, indices, j-1);
        }
        return true;
    }
    
    public void cleanup() {
       for(Aggregator a: _aggs) {
           a.cleanup();
       } 
    } 
}
  • The init method returns a ChainedResult whose objs field stores the init result of each aggregator in _aggs.
  • If an Aggregator was passed to the aggregate method after groupBy, the element of _aggs is that Aggregator itself; a CombinerAggregator would instead be wrapped in a CombinerAggregatorCombineImpl.
  • The complete method of ChainedAggregatorImpl calls complete on each of the _aggs one by one, passing val.objs[i] as the first argument, i.e. the accumulated value belonging to each aggregator (a sketch of a multi-aggregator chain follows this list).
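
The cross-join branch in complete only matters when several aggregators are chained. Such a chain is built with Trident's chained-aggregation API; a sketch (assuming the builtin Count and Sum aggregators and an existing stream variable) of a chain that would make _aggs.length > 1:

// Count and Sum are org.apache.storm.trident.operation.builtin aggregators;
// as CombinerAggregators they get wrapped before landing in _aggs
stream.chainedAgg()
        .aggregate(new Count(), new Fields("count"))
        .aggregate(new Fields("score"), new Sum(), new Fields("total"))
        .chainEnd();  // emits the cross-join of what each chained aggregator emitted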

Summary

  • groupBy is wrapped in a SubtopologyBolt; its execute method triggers the receive method of the InitialReceiver, and receive triggers the execute method of the _receivers, the first receiver here being the AggregateProcessor.
  • The AggregateProcessor wraps the GroupedAggregator, the GroupedAggregator wraps the ChainedAggregatorImpl, and the ChainedAggregatorImpl wraps the Aggregator array; in this example there is only one Aggregator, the one passed to the aggregate method after groupBy.
  • TridentBoltExecutor receives, on the stream whose id starts with COORD_STREAM_PREFIX, the count of tuples it should receive from upstream, updates expectedTupleCount, and then runs the checkFinish judgment. When receivedTuples (updated every time a tuple of the batch is received) equals expectedTupleCount, the finishBatch operation is triggered, which calls SubtopologyBolt.finishBatch, then AggregateProcessor.finishBatch, then GroupedAggregator.complete, then ChainedAggregatorImpl.complete, and finally the complete of the user's aggregator.
  • For the TridentBoltExecutor that wraps the TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0: being the data source, it has no COORD_STREAM reports with which to update expectedTaskReports and expectedTupleCount. When it receives a tuple on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), it calls the execute method of the TridentSpoutExecutor, and then, because tracked.condition.expectedTaskReports == 0 (in this example the condition.commitStream of the TrackedBatch of both TridentBoltExecutors is null, so receivedCommit is true), it immediately calls finishBatch. finishBatch internally calls TridentSpoutExecutor.finishBatch and then sends the batchId and the taskEmittedTuples counts via COORD_STREAM to the tasks of the downstream TridentBoltExecutor; the downstream TridentBoltExecutor, whose expectedTaskReports is not 0, must instead run checkFinish on each COORD_STREAM tuple it receives to decide whether it can finishBatch.
  • The execute method of TridentSpoutExecutor calls the emitter (which ultimately invokes the user's spout) to emit a batch, while its finishBatch method is currently empty and does nothing. In other words, when the TridentBoltExecutor that wraps the TridentSpoutExecutor receives the instruction to emit a batch, it calls TridentSpoutExecutor.execute to emit the batch through the emitter and immediately performs the finishBatch operation, sending [id, count] to the downstream TridentBoltExecutor. On receiving [id, count], the downstream TridentBoltExecutor updates expectedTupleCount and runs the checkFinish judgment; if receivedTuples equals expectedTupleCount, it triggers the finishBatch operation, which in turn triggers the finishBatch of the AggregateProcessor.
