Talking about the splitting and aggregation of storm trident batches



This article mainly studies how a storm trident batch is split and aggregated.


        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .partitionBy(new Fields("user"))
                .partitionAggregate(new Fields("user", "score", "batchId"), new OriginUserCountAggregator(), new Fields("result", "aggBatchId"))
                .aggregate(new Fields("result", "aggBatchId"), new AggAgg(), new Fields("agg"))
                .each(new Fields("agg"), new PrintEachFunc(), new Fields());
  • In the end, three bolts are constructed here: b-0, b-1 and b-2.
  • b-0 mainly performs the partitionAggregate, and its parallelismHint is 3.
  • b-1 mainly handles the init of the CombinerAggregator, and its parallelismHint is 1. Since its upstream bolt has 3 tasks, its TridentBoltExecutor's tracked.condition.expectedTaskReports is 3: it cannot finishBatch until the aggregated data of all three tasks has arrived.
  • b-2 mainly handles the combine and each operations of the CombinerAggregator.
  • The whole data stream starts as one batch at the spout, is split into 3 sub-batches at b-0 via partitionBy, finishes the batch at b-1 after the 3 sub-batches are aggregated, and finally combines the results at b-2 after b-1's aggregation.
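The split-then-combine flow above can be illustrated with a minimal, Storm-free sketch. The class and method names below are our own illustrative stand-ins, not Trident's API: a batch of users is routed into 3 sub-batches by field hash (partitionBy), each sub-batch is counted on its own (partitionAggregate at b-0), and the partial maps are then merged (aggregate at b-1/b-2).

```java
import java.util.*;

// Illustrative sketch of partitionBy + partitionAggregate + aggregate;
// all names here are ours, not Trident's.
public class BatchSplitDemo {
    // partitionBy("user"): route a tuple to one of `tasks` sub-batches by field hash
    static int partitionFor(String user, int tasks) {
        return Math.floorMod(user.hashCode(), tasks);
    }

    // partitionAggregate: each task counts the users in its own sub-batch
    static Map<String, Integer> aggregatePartition(List<String> users) {
        Map<String, Integer> counts = new HashMap<>();
        for (String u : users) counts.merge(u, 1, Integer::sum);
        return counts;
    }

    // aggregate (like CombinerAggregator.combine): merge the partial maps of all tasks
    static Map<String, Integer> combine(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> p : partials)
            p.forEach((k, v) -> result.merge(k, v, Integer::sum));
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("nickt1", "nickt2", "nickt3");
        int tasks = 3;
        List<List<String>> subBatches = new ArrayList<>();
        for (int i = 0; i < tasks; i++) subBatches.add(new ArrayList<>());
        for (String u : batch) subBatches.get(partitionFor(u, tasks)).add(u);

        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> sub : subBatches) partials.add(aggregatePartition(sub));

        System.out.println(combine(partials)); // each of the three users has count 1
    }
}
```

This mirrors what the sample log below shows: three b-0 tasks each aggregate one sub-batch, and b-1 combines the three partial results.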

Sample log

23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
  • Storm's thread names include the bolt name, such as b-0, b-1, b-2, which makes it easy to see which bolt each log line comes from.


storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/

    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
    }

  • In the execute method, a TrackedBatch is created when one does not yet exist, and _bolt.initBatchState is called at creation time.
  • From the code we can see that when a normal tuple is received, it first calls _bolt.execute(tracked.info, tuple), then calls _collector.ack. If _bolt.execute throws a FailedException, it directly calls failBatch, which marks tracked.failed as true; checkFinish is called once all the tuples of the batch have been sent and received, and when it finds tracked.failed is true, it calls _collector.fail.
  • There are two types of _bolt here: TridentSpoutExecutor and SubtopologyBolt. For TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0, so every tuple (which is actually an instruction to emit a batch) is followed immediately by finishBatch after _bolt.execute. For SubtopologyBolt, tracked.condition.expectedTaskReports is not 0, and it has to wait until the final [id,count] coordination tuple before checkFinish can complete the batch.
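The [id,count] bookkeeping that checkFinish relies on can be sketched with a small standalone class (our own illustrative class, not Storm's; the commit-stream condition is deliberately omitted here): each upstream task reports how many tuples it sent, and the batch can only finish when all expected reports have arrived and the received tuple count matches.

```java
// Minimal sketch of how TridentBoltExecutor decides a batch is complete
// from the [id,count] coordination tuples; illustrative class, not Storm's.
public class CoordTrackerDemo {
    int expectedTaskReports;   // how many upstream tasks must report [id,count]
    int reportedTasks;         // [id,count] reports received so far
    int expectedTupleCount;    // sum of the counts carried by those reports
    int receivedTuples;        // normal tuples actually processed

    CoordTrackerDemo(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // a normal data tuple was processed
    void onTuple() { receivedTuples++; }

    // an [id,count] coordination tuple arrived from one upstream task
    void onCoord(int count) {
        reportedTasks++;
        expectedTupleCount += count;
    }

    // mirrors the condition checked in checkFinish (commit stream omitted)
    boolean canFinish() {
        return reportedTasks == expectedTaskReports
                && receivedTuples == expectedTupleCount;
    }

    public static void main(String[] args) {
        CoordTrackerDemo tracked = new CoordTrackerDemo(3); // like b-1 waiting for 3 b-0 tasks
        tracked.onTuple();
        tracked.onCoord(1);                      // first task reports: it sent 1 tuple
        System.out.println(tracked.canFinish()); // false: two reports still missing
        tracked.onTuple(); tracked.onCoord(1);
        tracked.onTuple(); tracked.onCoord(1);
        System.out.println(tracked.canFinish()); // true: 3 reports, 3 of 3 tuples seen
    }
}
```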


storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    public void finishBatch(BatchInfo batchInfo) {
    }

    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
  • TridentSpoutExecutor uses an AddIdCollector, and its initBatchState and finishBatch methods are no-ops.
  • The execute method distinguishes three cases: COMMIT_STREAM_ID, SUCCESS_STREAM_ID, and the normal stream.
  • The tuple sent on the normal stream is the instruction to emit a batch; here _emitter.emitBatch is called to emit the batch's tuples.
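The normal-stream branch can be pictured with a tiny Storm-free sketch (the class and method here are our own, not Trident's): the "emit batch" instruction fans out into data tuples, and an AddIdCollector-style wrapper prepends the batch id to each tuple so downstream TridentBoltExecutors can group tuples by batch.

```java
import java.util.*;

// Sketch of emitBatch + AddIdCollector behavior; illustrative names only.
public class EmitBatchDemo {
    // turn one batch-emit instruction into tuples tagged with the batch id
    static List<List<Object>> emitBatch(long txId, List<Object> values) {
        List<List<Object>> out = new ArrayList<>();
        for (Object v : values) {
            List<Object> tuple = new ArrayList<>();
            tuple.add(txId);   // batch id prepended, as AddIdCollector does
            tuple.add(v);
            out.add(tuple);
        }
        return out;
    }

    public static void main(String[] args) {
        // one instruction for batch 1 becomes two id-tagged tuples
        System.out.println(emitBatch(1L, Arrays.asList("nickt1", "nickt2")));
    }
}
```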


storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/

    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;

        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }

        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }
        }

        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }

        public Factory getOutputFactory() {
            return _project;
        }
    }
  • Its initBatchState method creates a ProcessorContext and then calls the startBatch method of each TridentProcessor (for example AggregateProcessor, EachProcessor).
  • The execute method calls receive on the matching InitialReceiver, which in turn calls the execute method of each TridentProcessor (for example AggregateProcessor).
  • finishBatch calls the finishBatch method of each TridentProcessor (for example AggregateProcessor, EachProcessor) in topological order.
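The per-batch lifecycle SubtopologyBolt drives (startBatch, per-tuple execute, finishBatch) can be sketched without Storm; the interface and class below are simplified stand-ins for Trident's, with a list playing the role of ProcessorContext.state:

```java
import java.util.*;

// Sketch of the startBatch / execute / finishBatch lifecycle;
// all names are simplified stand-ins for Trident's.
public class LifecycleDemo {
    interface Processor {
        void startBatch(List<String> state);
        void execute(List<String> state, String tuple);
        void finishBatch(List<String> state);
    }

    static class CollectingProcessor implements Processor {
        final List<String> finished = new ArrayList<>();
        public void startBatch(List<String> state) { state.clear(); }        // fresh state per batch
        public void execute(List<String> state, String tuple) { state.add(tuple); }
        public void finishBatch(List<String> state) { finished.addAll(state); } // emit at batch end
    }

    public static void main(String[] args) {
        CollectingProcessor p = new CollectingProcessor();
        List<String> state = new ArrayList<>();   // plays ProcessorContext.state
        p.startBatch(state);                      // called from initBatchState
        p.execute(state, "nickt1");               // per-tuple execute
        p.execute(state, "nickt2");
        p.finishBatch(state);                     // called from finishBatch
        System.out.println(p.finished);           // the whole batch, emitted at once
    }
}
```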


storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/

    public void startBatch(ProcessorContext processorContext) {
        // initialize state for batch
        processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
    }

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
    }
  • It can be seen that in startBatch, WindowTridentProcessor assigns a new list to processorContext.state[tridentContext.getStateIndex()].
  • In execute, each received tuple is saved into processorContext.state[tridentContext.getStateIndex()].
  • In finishBatch, the data in processorContext.state[tridentContext.getStateIndex()] is added to the windowStore and to the windowManager's ConcurrentLinkedQueue.
  • The window trigger fetches the window data from the ConcurrentLinkedQueue and adds it to pendingTriggers; WindowTridentProcessor then removes that data from pendingTriggers in finishBatch and emits it through the FreshCollector.
  • The data emitted by the FreshCollector is received and processed by its TupleReceivers (for example ProjectedProcessor, PartitionPersistProcessor). PartitionPersistProcessor stores data in state, while ProjectedProcessor extracts fields according to the window's outputFields and then passes the data to the processors downstream, such as EachProcessor.
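The buffer-then-flush pattern described above can be sketched with a small standalone class (our own simplified stand-in, not Storm's window manager): batches hand their tuples to a ConcurrentLinkedQueue at finishBatch, and the window trigger later drains whatever has accumulated, which may span several batches.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the window accumulate/flush pattern; simplified, not Storm's classes.
public class WindowBufferDemo {
    final ConcurrentLinkedQueue<List<String>> tuplesQueue = new ConcurrentLinkedQueue<>();

    // finishBatch: hand the batch's buffered tuples to the window manager
    void addTuplesBatch(List<String> batchTuples) {
        tuplesQueue.add(new ArrayList<>(batchTuples));
    }

    // window trigger: drain everything accumulated so far (possibly many batches)
    List<String> onTrigger() {
        List<String> window = new ArrayList<>();
        List<String> batch;
        while ((batch = tuplesQueue.poll()) != null) window.addAll(batch);
        return window;
    }

    public static void main(String[] args) {
        WindowBufferDemo wm = new WindowBufferDemo();
        wm.addTuplesBatch(Arrays.asList("nickt1"));   // batch 1 finishes
        wm.addTuplesBatch(Arrays.asList("nickt2"));   // batch 2 finishes
        System.out.println(wm.onTrigger()); // one window spanning both batches
        System.out.println(wm.onTrigger()); // empty: nothing pending between triggers
    }
}
```

The second trigger returning an empty list is exactly why most finishBatch calls downstream see no window data, as discussed in the summary below.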


  • The trident spout emits one batch of data and then waits for the downstream to finishBatch that batch before emitting the next: batches proceed one by one. Between bolts, the ack interval of tuples depends on the processing time of each tuple (TridentBoltExecutor acks automatically for you after the tuple is processed). If the overall processing time is too long, tuple processing in the whole topology will time out, triggering the spout's fail operation, and the batchId will be emitted again. If the spout is transactional, the tuples corresponding to that batchId will be the same on re-emission.
  • The window operation disrupts the original batch rhythm of the trident spout. A batch's data is first accumulated in the state of the ProcessorContext (WindowTridentProcessor resets this state on every startBatch); at finishBatch the data is copied into the windowStore and the windowManager's ConcurrentLinkedQueue, where it waits for the window trigger to fire, compute the window data, and put it into pendingTriggers. In the bolt's finishBatch, the window data is removed from pendingTriggers, sent to the FreshCollector, and then processed by the downstream processors; note that the startBatch and finishBatch of those downstream processors still follow the rhythm of the original spout batches instead of being driven by the window.
  • Assuming a steady stream of data, the speed at which the spout emits batches depends on the Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS parameter (topology.trident.batch.emit.interval.millis, which defaults to 500 in defaults.yaml). The window interval is usually larger than the batch interval, so a window typically aggregates data from multiple batches. Also, since data is only added to the windowManager's ConcurrentLinkedQueue at finishBatch, pendingTriggers has no data at that moment, so the window data obtained during the preceding finishBatch calls is usually empty and the downstream processors have nothing to process; take care to check for empty values to prevent null pointer exceptions.
  • For groupBy/partitionBy: when parallelism is 1, groupBy/partitionBy operates per batch. When parallelism is greater than 1, the batch emitted by the original spout is distributed across multiple partitions/tasks, so the original batch's data stream is split. Each task executes its own finishBatch operation (tuples arrive in emit order, and the last one is [id,count], which acts as an end-of-batch instruction used to detect and trigger the completion of the batch), then sends the new batch's data downstream followed by its own [id,count], and the downstream bolts complete the batch in turn. The global operation distributes all data to the same partition/task. batchGlobal has the same effect as global when parallelism is 1; when parallelism is greater than 1, data is distributed to different partitions/tasks according to batchId.
  • The aggregate operation aggregates data, generally in cooperation with groupBy or partitionBy: the upstream batch is split again and then aggregated per sub-batch. If parallelism is greater than 1, the aggregation is split across tasks; to aggregate those tasks' results together afterwards, you can follow with a global().aggregate() operation. As long as there is no window operation in between, the final aggregate still corresponds to the original batch, because TridentBoltExecutor's tracked.condition.expectedTaskReports records which tasks the bolt has to wait for to report [id,count]. On receiving an [id,count] tuple, it first checks whether tracked.reportedTasks equals cond.expectedTaskReports; if so, it checks whether tracked.receivedTuples equals tracked.expectedTupleCount; if so, it calls finishBatch to complete the current batch and then sends [id,count] downstream. Thanks to expectedTaskReports, the whole batch can indeed be finally aggregated back along the original batch boundary even after being split across multiple tasks. Note, however, that the window operation disrupts the trident spout's original batches during the window phase.
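Given the empty-window caveat above, downstream code should tolerate null or empty input at finishBatch. A hedged sketch of such a defensive check (plain Java with our own method name, not Trident's BaseFunction API):

```java
import java.util.*;

// Sketch of the null/empty guard suggested above: most finishBatch calls
// deliver no window data, so downstream code must tolerate empty input.
public class EmptyWindowGuardDemo {
    static Map<String, Integer> safeAggregate(List<Map<String, Integer>> windowResults) {
        Map<String, Integer> merged = new HashMap<>();
        if (windowResults == null || windowResults.isEmpty()) {
            return merged;   // nothing triggered this batch: empty result, no NPE
        }
        for (Map<String, Integer> partial : windowResults) {
            if (partial == null) continue;   // skip absent partial results
            partial.forEach((k, v) -> merged.merge(k, v, Integer::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(safeAggregate(null));   // empty map, no exception
        System.out.println(safeAggregate(Arrays.asList(
                Collections.singletonMap("nickt1", 1))));
    }
}
```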