A look at the FreshCollector used by Storm's WindowTridentProcessor

Preface

This article looks at the FreshCollector used by Storm's WindowTridentProcessor.

Example

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .partitionBy(new Fields("user"))
                .window(windowConfig,windowsStoreFactory,new Fields("user","score"),new UserCountAggregator(),new Fields("aggData"))
                .parallelismHint(1)
                .each(new Fields("aggData"), new PrintEachFunc(),new Fields());
  • In this example, the window operation is followed by an each operation.

WindowTridentProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public class WindowTridentProcessor implements TridentProcessor {
    
    private FreshCollector collector;

    //......

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
}
  • WindowTridentProcessor creates a FreshCollector in prepare.
  • In finishBatch, it calls FreshCollector.emit to pass on the window's aggregated results.
  • The emitted value is a ConsList, an AbstractList implementation made up of a first element of type Object followed by _elems, a List<Object>. Here the first element is a TriggerInfo and _elems holds the aggregated values.
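
To make the shape of the emitted value concrete, here is a minimal sketch of a cons-style list in plain Java. ConsListSketch is a hypothetical stand-in written for this article, not Storm's ConsList class, and the string "triggerInfo" merely stands in for a TriggerInfo object.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration; not Storm's ConsList.
class ConsListSketch extends AbstractList<Object> {
    private final Object first;       // the prepended head element
    private final List<Object> elems; // the remaining elements

    ConsListSketch(Object first, List<Object> elems) {
        this.first = first;
        this.elems = elems;
    }

    @Override
    public Object get(int i) {
        // Index 0 is the head; every other index is shifted by one.
        return i == 0 ? first : elems.get(i - 1);
    }

    @Override
    public int size() {
        return elems.size() + 1;
    }

    public static void main(String[] args) {
        List<Object> aggregated = Arrays.<Object>asList("userA", 42);
        List<Object> emitted = new ConsListSketch("triggerInfo", aggregated);
        System.out.println(emitted); // prints [triggerInfo, userA, 42]
    }
}
```

The head is stored separately rather than copied into a new list, so prepending costs nothing beyond one small wrapper object.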

FreshCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/FreshCollector.java

public class FreshCollector implements TridentCollector {
    FreshOutputFactory _factory;
    TridentContext _triContext;
    ProcessorContext context;
    
    public FreshCollector(TridentContext context) {
        _triContext = context;
        _factory = new FreshOutputFactory(context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc) {
        this.context = pc;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create(values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }            
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 

    public Factory getOutputFactory() {
        return _factory;
    }    
}
  • In its constructor, FreshCollector builds a FreshOutputFactory from the context's selfOutputFields (the first field is always _task_info; the remaining fields are the functionFields the user passes to the window method).
  • The emit method first uses FreshOutputFactory to build a TridentTupleView over those output fields, then fetches the TupleReceivers from the context and calls each one's execute method with the TridentTupleView.
  • The receivers here include ProjectedProcessor and PartitionPersistProcessor.
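
The build-once, forward-to-all pattern in emit can be sketched without Storm on the classpath. Receiver and Collector below are simplified, hypothetical types invented for this illustration; they only mimic the control flow and are not Storm's TupleReceiver or FreshCollector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified types for illustration; not Storm's interfaces.
class FanOutSketch {
    interface Receiver {
        void execute(String streamId, List<Object> tuple);
    }

    static class Collector {
        private final String outStreamId;
        private final List<Receiver> receivers;

        Collector(String outStreamId, List<Receiver> receivers) {
            this.outStreamId = outStreamId;
            this.receivers = receivers;
        }

        // Build the tuple once, then hand the same instance to every receiver.
        void emit(List<Object> values) {
            List<Object> toEmit = new ArrayList<>(values);
            for (Receiver r : receivers) {
                r.execute(outStreamId, toEmit);
            }
        }
    }

    public static void main(String[] args) {
        final List<String> log = new ArrayList<>();
        Receiver projected = (stream, tuple) -> log.add("projected@" + stream + ":" + tuple);
        Receiver persist = (stream, tuple) -> log.add("persist@" + stream + ":" + tuple);

        Collector collector = new Collector("s1", Arrays.asList(projected, persist));
        collector.emit(Arrays.<Object>asList("triggerInfo", 42));
        System.out.println(log);
    }
}
```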

TridentTupleView.FreshOutputFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

    public static class FreshOutputFactory  implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;

        public FreshOutputFactory(Fields selfFields) {
            _fieldIndex = new HashMap<>();
            for(int i=0; i<selfFields.size(); i++) {
                String field = selfFields.get(i);
                _fieldIndex.put(field, new ValuePointer(0, i, field));
            }
            _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
        }
        
        public TridentTuple create(List<Object> selfVals) {
            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return 1;
        }
        
        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }        
    }
  • FreshOutputFactory is a static nested class of TridentTupleView. Its constructor mainly computes _index and _fieldIndex.
  • _fieldIndex is a map whose key is the field name and whose value is a ValuePointer recording the delegateIndex (fixed at 0 here), the index, and the field name. The first field is _task_info, with index 0; the remaining fields are the functionFields the user passes to the window method.
  • The create method builds a TridentTupleView. The constructor's first argument is an IPersistentVector holding the value list, the second is _index, and the third is _fieldIndex.
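
A small model may help show how a field name resolves to a value through such an index. The sketch below is a rough approximation with hypothetical names (FreshIndexSketch, Pointer, getValueByField); plain Java lists stand in for the IPersistentVector of delegates.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a "fresh" tuple factory; not Storm's classes.
class FreshIndexSketch {
    // Where a field's value lives: which delegate list, and which slot in it.
    static class Pointer {
        final int delegateIndex;
        final int index;
        final String field;

        Pointer(int delegateIndex, int index, String field) {
            this.delegateIndex = delegateIndex;
            this.index = index;
            this.field = field;
        }
    }

    // Every field of a fresh tuple points into delegate 0, at its own position.
    static Map<String, Pointer> buildFieldIndex(List<String> selfFields) {
        Map<String, Pointer> fieldIndex = new HashMap<>();
        for (int i = 0; i < selfFields.size(); i++) {
            String field = selfFields.get(i);
            fieldIndex.put(field, new Pointer(0, i, field));
        }
        return fieldIndex;
    }

    // Resolve a field by following its pointer into the delegate lists.
    static Object getValueByField(List<List<Object>> delegates,
                                  Map<String, Pointer> fieldIndex,
                                  String field) {
        Pointer p = fieldIndex.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }

    public static void main(String[] args) {
        Map<String, Pointer> fieldIndex =
                buildFieldIndex(Arrays.asList("_task_info", "aggData"));
        // A fresh tuple has exactly one delegate: the emitted value list itself.
        List<List<Object>> delegates =
                Collections.singletonList(Arrays.<Object>asList("triggerInfo", 42));
        System.out.println(getValueByField(delegates, fieldIndex, "aggData")); // prints 42
    }
}
```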

ValuePointer

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/ValuePointer.java

public class ValuePointer {
    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
        for(ValuePointer ptr: pointers) {
            ret.put(ptr.field, ptr);
        }
        return ret;        
    }

    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
        if(fieldsOrder.size()!=pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointer[] ret = new ValuePointer[pointers.size()];
        for(int i=0; i<fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }    
    
    public int delegateIndex;
    protected int index;
    protected String field;
    
    public ValuePointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }    
}
  • buildIndex returns the ValuePointer array ordered by the given fields, which here are selfOutputFields.
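
The ordering step is easy to try in isolation. This sketch mirrors the logic of buildIndex with a generic pointer type and strings as placeholder pointers; BuildIndexSketch and the "ptr(...)" values are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical re-creation of the ordering logic; not Storm's ValuePointer.
class BuildIndexSketch {
    // Return the map's values arranged in the given field order; sizes must match.
    static <P> List<P> buildIndex(List<String> fieldsOrder, Map<String, P> pointers) {
        if (fieldsOrder.size() != pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        List<P> ret = new ArrayList<>(fieldsOrder.size());
        for (String field : fieldsOrder) {
            ret.add(pointers.get(field));
        }
        return ret;
    }

    public static void main(String[] args) {
        // A HashMap has no defined iteration order, which is why the explicit order is needed.
        Map<String, String> pointers = new HashMap<>();
        pointers.put("aggData", "ptr(0,1)");
        pointers.put("_task_info", "ptr(0,0)");
        System.out.println(buildIndex(Arrays.asList("_task_info", "aggData"), pointers));
        // prints [ptr(0,0), ptr(0,1)]
    }
}
```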

ProjectedProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/ProjectedProcessor.java

public class ProjectedProcessor implements TridentProcessor {
    Fields _projectFields;
    ProjectionFactory _factory;
    TridentContext _context;
    
    public ProjectedProcessor(Fields projectFields) {
        _projectFields = projectFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        if(tridentContext.getParentTupleFactories().size()!=1) {
            throw new RuntimeException("Projection processor can only have one parent");
        }
        _context = tridentContext;
        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        TridentTuple toEmit = _factory.create(tuple);
        for(TupleReceiver r: _context.getReceivers()) {
            r.execute(processorContext, _context.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • ProjectedProcessor creates a ProjectionFactory in prepare; its _projectFields are the functionFields passed to the window method. The parent Factory is obtained with tridentContext.getParentTupleFactories().get(0). Because the tuples come from FreshCollector, that parent is a TridentTupleView.FreshOutputFactory.
  • In execute, ProjectionFactory.create is called first to project fields out of the incoming TridentTupleView. toEmit is a TridentTupleView restricted to the functionFields passed to the window method.
  • execute then calls the execute method of each receiver in _context.getReceivers(), passing toEmit. The receivers here are the processors that follow the window operation, such as EachProcessor.

TridentTupleView.ProjectionFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

public static class ProjectionFactory implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;
        Factory _parent;

        public ProjectionFactory(Factory parent, Fields projectFields) {
            _parent = parent;
            if(projectFields==null) projectFields = new Fields();
            Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
            _fieldIndex = new HashMap<>();
            for(String f: projectFields) {
                _fieldIndex.put(f, parentFieldIndex.get(f));
            }            
            _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
        }
        
        public TridentTuple create(TridentTuple parent) {
            if(_index.length==0) return EMPTY_TUPLE;
            else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return _parent.numDelegates();
        }

        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }
    }
  • ProjectionFactory is a static nested class of TridentTupleView. Its constructor builds _index and _fieldIndex from projectFields, reusing the parent's ValuePointers, so create can return a TridentTupleView that exposes only the required fields while sharing the parent's _delegates.
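
The point that a projection copies no values, only a narrower index, can be illustrated as follows. This is a sketch under simplifying assumptions: a two-element int array {delegateIndex, index} stands in for ValuePointer, plain lists stand in for _delegates, and the names ProjectionSketch, project, and read are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of projection over shared delegates; not Storm's ProjectionFactory.
class ProjectionSketch {
    // Keep only the requested fields, reusing the parent's pointers as-is.
    static Map<String, int[]> project(Map<String, int[]> parentFieldIndex,
                                      List<String> projectFields) {
        Map<String, int[]> fieldIndex = new HashMap<>();
        for (String f : projectFields) {
            fieldIndex.put(f, parentFieldIndex.get(f));
        }
        return fieldIndex;
    }

    // Read the given fields by following each pointer into the shared delegates.
    static List<Object> read(List<List<Object>> delegates,
                             Map<String, int[]> fieldIndex,
                             List<String> fields) {
        List<Object> out = new ArrayList<>();
        for (String f : fields) {
            int[] p = fieldIndex.get(f); // p[0] = delegateIndex, p[1] = index
            out.add(delegates.get(p[0]).get(p[1]));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, int[]> parent = new HashMap<>();
        parent.put("_task_info", new int[] {0, 0});
        parent.put("aggData", new int[] {0, 1});
        List<List<Object>> delegates =
                Collections.singletonList(Arrays.<Object>asList("triggerInfo", 42));

        List<String> projectFields = Arrays.asList("aggData");
        Map<String, int[]> projected = project(parent, projectFields);
        // The delegates are untouched; only the visible fields shrink.
        System.out.println(read(delegates, projected, projectFields)); // prints [42]
    }
}
```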

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

public class EachProcessor implements TridentProcessor {
    Function _function;
    TridentContext _context;
    AppendCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;
    
    public EachProcessor(Fields inputFields, Function function) {
        _function = function;
        _inputFields = inputFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _function.cleanup();
    }    

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }    
}
  • The execute method of EachProcessor first sets the context of _collector to the processorContext and the current tuple, then calls _function.execute.
  • _projection.create(tuple) is called to project the tuple onto the inputFields declared for _function.
  • The collector passed to _function is an AppendCollector.

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;
    
    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 
    
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • AppendCollector creates an OperationOutputFactory in its constructor. Its emit method uses that factory to build the output tuple from the current tuple and the emitted values, then calls execute on each receiver in _triContext.getReceivers(). If no operation follows each, AppendCollector's _triContext.getReceivers() is empty.
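
The excerpt above does not show OperationOutputFactory itself, so the following is only a guess at what "append" amounts to: it assumes the emitted values are added as one more delegate list after the parent's, with the new fields pointing into that extra slot. AppendSketch, its methods, and the "label" field are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of appending output fields; an assumption, not Storm's code.
class AppendSketch {
    // Extend the parent's field index: new fields point into the next delegate slot.
    static Map<String, int[]> appendIndex(Map<String, int[]> parentIndex,
                                          int parentDelegates,
                                          List<String> selfFields) {
        Map<String, int[]> index = new HashMap<>(parentIndex);
        for (int i = 0; i < selfFields.size(); i++) {
            index.put(selfFields.get(i), new int[] {parentDelegates, i});
        }
        return index;
    }

    // Keep the parent's delegates untouched and add the emitted values as one more.
    static List<List<Object>> appendValues(List<List<Object>> parentDelegates,
                                           List<Object> selfVals) {
        List<List<Object>> delegates = new ArrayList<>(parentDelegates);
        delegates.add(selfVals);
        return delegates;
    }

    static Object get(List<List<Object>> delegates, Map<String, int[]> index, String field) {
        int[] p = index.get(field); // p[0] = delegateIndex, p[1] = index
        return delegates.get(p[0]).get(p[1]);
    }

    public static void main(String[] args) {
        Map<String, int[]> parentIndex = new HashMap<>();
        parentIndex.put("aggData", new int[] {0, 0});
        List<List<Object>> parentDelegates =
                Collections.singletonList(Arrays.<Object>asList(42));

        // "label" is a made-up output field of a made-up function.
        Map<String, int[]> index =
                appendIndex(parentIndex, parentDelegates.size(), Arrays.asList("label"));
        List<List<Object>> delegates =
                appendValues(parentDelegates, Arrays.<Object>asList("big"));

        System.out.println(get(delegates, index, "aggData") + " " + get(delegates, index, "label"));
        // prints 42 big
    }
}
```

In the article's example the each call declares new Fields() as its output, so nothing would be appended there; the extra field exists only to make the effect visible.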

Summary

  • WindowTridentProcessor uses a FreshCollector. In finishBatch it drains pendingTriggers (each entry is removed once it has been read), which hold the window's aggregated results, and emits them through FreshCollector. The first emitted value is always a TriggerInfo; the values after it are the window's aggregated values.
  • FreshCollector's emit method first uses TridentTupleView.FreshOutputFactory to build a TridentTupleView from selfOutputFields (the first field is always _task_info; the remaining fields are the functionFields the user passes to the window method), then calls execute on each receiver in _triContext.getReceivers().
  • One of the downstream receivers is a ProjectedProcessor, which re-projects the TridentTupleView onto the functionFields passed to the window method. Its execute method resembles FreshCollector.emit: it first projects the required fields to build a TridentTupleView, then calls execute on each receiver in _context.getReceivers() (for example, EachProcessor.execute).
  • The collector used by EachProcessor is an AppendCollector, whose emit method resembles FreshCollector's: it first builds a TridentTupleView, then calls execute on each receiver in _triContext.getReceivers().
  • FreshCollector.emit, ProjectedProcessor.execute, and AppendCollector.emit all follow the same pattern: a Factory builds the TridentTupleView with the required fields, and execute is then called on each receiver in getReceivers(). When a context has no receivers, delivery of the tuple stops there.
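
The shared pattern can be condensed into a toy chain. Stage below is a hypothetical abstraction invented for this sketch: a function stands in for the Factory step and a list of stages stands in for getReceivers(). It is a simplification in which every stage forwards unconditionally, whereas in Storm an each function forwards only when it calls emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of the build-tuple-then-forward pattern; not Storm code.
class ChainSketch {
    static class Stage {
        final String name;
        final UnaryOperator<List<Object>> buildTuple; // stands in for the Factory step
        final List<Stage> receivers = new ArrayList<>();
        final List<String> trace;

        Stage(String name, UnaryOperator<List<Object>> buildTuple, List<String> trace) {
            this.name = name;
            this.buildTuple = buildTuple;
            this.trace = trace;
        }

        void execute(List<Object> input) {
            List<Object> toEmit = buildTuple.apply(input);
            trace.add(name + " -> " + toEmit);
            // With no receivers the loop body never runs and delivery ends here.
            for (Stage r : receivers) {
                r.execute(toEmit);
            }
        }
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Stage fresh = new Stage("fresh", in -> in, trace);
        // Projection hides the leading _task_info value.
        Stage projected = new Stage("projected", in -> in.subList(1, in.size()), trace);
        // The example's each() declares no output fields, so the tuple is unchanged.
        Stage each = new Stage("each", in -> in, trace);
        fresh.receivers.add(projected);
        projected.receivers.add(each);

        fresh.execute(Arrays.<Object>asList("triggerInfo", 42));
        return trace;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```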
