Talk about storm trident's operations

  storm

Preface

This article mainly studies the operations of storm trident.

function, filter, projection

Function

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/Function.java

public interface Function extends EachOperation {
    /**
     * Performs the function logic on an individual tuple and emits 0 or more tuples.
     *
     * @param tuple The incoming tuple
     * @param collector A collector instance that can be used to emit tuples
     */
    void execute(TridentTuple tuple, TridentCollector collector);
}
  • Function defines the execute method; the fields it emits are appended to the input tuple (see the sketch below).
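
As a quick illustration (a sketch in the style of the Split example from the Trident documentation, not part of the quoted source), a custom Function usually extends BaseFunction and emits zero or more tuples per input tuple:

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // emit one "word" tuple per token; the emitted field gets appended to the input tuple
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));
        }
    }
}

Wired into a stream this would look like stream.each(new Fields("sentence"), new Split(), new Fields("word")), producing tuples that carry both the original sentence and the word field.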

Filter

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/Filter.java

public interface Filter extends EachOperation {

    /**
     * Determines if a tuple should be filtered out of a stream
     *
     * @param tuple the tuple being evaluated
     * @return `false` to drop the tuple, `true` to keep the tuple
     */
    boolean isKeep(TridentTuple tuple);
}
  • Filter provides an isKeep method to decide whether a tuple is kept, i.e. emitted downstream (see the sketch below).
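
A minimal Filter sketch (field names are hypothetical), again following the style of the Trident documentation:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class KeepEvens extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        // keep the tuple only when its first field is even; returning false drops it
        return tuple.getInteger(0) % 2 == 0;
    }
}

Usage would be stream.each(new Fields("a"), new KeepEvens()), which keeps only tuples whose "a" field is an even number.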

projection

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
     *
     * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling
     *
     * ```java
     * mystream.project(new Fields("b", "d"))
     * ```
     *
     * would produce a stream containing only the fields `["b", "d"]`.
     *
     *
     * @param keepFields The fields in the Stream to keep
     * @return
     */
    public Stream project(Fields keepFields) {
        projectionValidation(keepFields);
        return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
    }
  • The ProjectedProcessor is used here for the projection operation.

repartitioning operations

partition

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @param partitioner
     * @return
     */
    public Stream partition(CustomStreamGrouping partitioner) {
        return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
    }
  • A user-supplied CustomStreamGrouping is serialized and used here.

partitionBy

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @param fields
     * @return
     */
    public Stream partitionBy(Fields fields) {
        projectionValidation(fields);
        return partition(Grouping.fields(fields.toList()));
    }
  • Grouping.fields is used here: tuples with the same values of the given fields are routed to the same partition (see the snippet below).
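
For example (a hypothetical snippet assuming a stream with a "word" field; Count is the built-in CombinerAggregator from org.apache.storm.trident.operation.builtin):

// tuples with the same "word" value land in the same partition, so the following
// partitionAggregate sees every occurrence of a given word within the batch
stream.partitionBy(new Fields("word"))
      .partitionAggregate(new Fields("word"), new Count(), new Fields("count"));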

identityPartition

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @return
     */
    public Stream identityPartition() {
        return partition(new IdentityGrouping());
    }
  • IdentityGrouping is used here.

shuffle

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions
     *
     * @return
     */
    public Stream shuffle() {
        return partition(Grouping.shuffle(new NullStruct()));
    }
  • Grouping.shuffle is used here.

localOrShuffle

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
     * for local tasks.
     *
     * @return
     */
    public Stream localOrShuffle() {
        return partition(Grouping.local_or_shuffle(new NullStruct()));
    }
  • Grouping.local_or_shuffle is used here.

global

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
     * @return
     */
    public Stream global() {
        // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
        // without knowledge of storm's internals
        return partition(new GlobalGrouping());
    }
  • GlobalGrouping is used here.

batchGlobal

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     *  All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
     *  partitions.
     *
     * @return
     */
    public Stream batchGlobal() {
        // the first field is the batch id
        return partition(new IndexHashGrouping(0));
    }
  • IndexHashGrouping on field 0 (the batch id) is used here, so the repartitioning happens at whole-batch granularity.

broadcast

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do
     * a stateQuery on every partition of data.
     *
     * @return
     */
    public Stream broadcast() {
        return partition(Grouping.all(new NullStruct()));
    }
  • Grouping.all is used here.

groupBy

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    /**
     * ## Grouping Operation
     *
     * @param fields
     * @return
     */
    public GroupedStream groupBy(Fields fields) {
        projectionValidation(fields);
        return new GroupedStream(this, fields);
    }
  • Note that the return type here is GroupedStream, not Stream (see the word-count example below).
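
GroupedStream is what makes the classic word count work; this is essentially the example from the Trident tutorial (the spout is assumed to be defined elsewhere, and Split is the Function sketched earlier):

import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.tuple.Fields;

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
    topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            // aggregate each "word" group and persist the running counts in state
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));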

aggregators

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/Stream.java

    //partition aggregate
    public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    //aggregate
    public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    //persistent aggregate
    public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        // replaces normal aggregation here with a global grouping because it needs to be consistent across batches 
        return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
                .aggregate(inputFields, agg, functionFields)
                .chainEnd()
               .partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
    }
  • Trident’s aggregators fall into three main categories: partitionAggregate, aggregate and persistentAggregate. Unlike a Function, an aggregation operation replaces the stream’s tuples with its output.
  • partitionAggregate works at the granularity of each partition, not the whole batch.
  • aggregate works at the granularity of a batch: for each batch it uses the global operation to merge the batch’s tuples from all partitions into a single partition and then aggregates the batch there. Three kinds of aggregator parameters are accepted: Aggregator, CombinerAggregator and ReducerAggregator. Calling stream.aggregate amounts to a global aggregation of the batch. With an Aggregator or a ReducerAggregator, the stream first repartitions the tuples into a single partition and then runs the aggregation; with a CombinerAggregator, trident optimizes by first doing a partial aggregation on each partition, then repartitioning the partial results into a single partition and finishing the aggregation there, which saves network transfer compared with Aggregator or ReducerAggregator.
  • persistentAggregate aggregates the tuples of all batches on the stream and stores the result in state. The sketch below contrasts the three categories.
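
To make the three categories concrete, here is a hedged sketch using the built-in Count aggregator (field names are hypothetical; MemoryMapState is the in-memory state implementation from storm's testing package):

// one count per partition per batch
stream.partitionAggregate(new Fields("word"), new Count(), new Fields("partitionCount"));

// one count per batch: the batch's tuples are first brought together into a single partition
stream.aggregate(new Fields("word"), new Count(), new Fields("batchCount"));

// a running count across all batches, persisted into state
TridentState counts =
    stream.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("totalCount"));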

Aggregator

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/Aggregator.java

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}
  • An Aggregator first calls init to create the per-batch state; the returned value is then passed to the aggregate and complete methods.
  • aggregate is invoked once for each tuple in the batch partition; complete is invoked after every tuple of the batch partition has gone through aggregate.
  • Suppose a custom Aggregator performs accumulation over the tuples [4], [7], [8]: init returns 0; for [4], val=0 and 0+4 = 4; for [7], val=4 and 4+7 = 11; for [8], val=11 and 11+8 = 19. Then the batch ends, complete runs with val=19, and the collector can be used to emit the result (see the sketch below).
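
The accumulation example above could be written as the following sketch (class and field names are made up; BaseAggregator supplies defaults for the remaining Operation methods):

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumAggregator extends BaseAggregator<SumAggregator.State> {
    static class State {
        long sum = 0;  // per batch-partition accumulator, created by init
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();  // val starts at 0 for every batch partition
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        state.sum += tuple.getLong(0);  // called once per tuple: 0+4=4, 4+7=11, 11+8=19
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.sum));  // batch partition finished: emit 19
    }
}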

CombinerAggregator

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/CombinerAggregator.java

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
  • For every tuple it receives, a CombinerAggregator calls init to extract the tuple’s value, then calls combine with the result of the previous combine (or the value of zero if there is none) and the value returned by init. If the partition contains no tuples at all, the value of the zero method is returned.
  • Suppose combine performs accumulation and zero returns 0. For the tuples [4], [7], [8], init returns 4, 7 and 8 respectively. For [4] there is no previous combine result, so val1=0, val2=4 and the result is 4; for [7], val1=4, val2=7 and the result is 11; for [8], val1=11, val2=8 and the result is 19 (see the sketch below).
  • A CombinerAggregator incurs relatively little network overhead, so it performs better than the other two aggregator types.
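
The same accumulation as a CombinerAggregator sketch (a simplified cousin of the built-in Sum, which works on Number):

import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class SumCombiner implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return tuple.getLong(0);  // the current tuple's value: 4, 7, 8
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;  // 0+4=4, then 4+7=11, then 11+8=19
    }

    @Override
    public Long zero() {
        return 0L;  // used when there is no previous combine result, or no tuples at all
    }
}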

ReducerAggregator

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/operation/ReducerAggregator.java

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}
  • When a ReducerAggregator processes a batch of tuples, it first calls init to obtain the initial value and then calls reduce for each tuple; curr is the result of the previous reduce call, or the init value for the first tuple.
  • Suppose reduce performs accumulation and init returns 0. For the tuples [4], [7], [8]: for [4], curr=0 and 0+4 = 4; for [7], curr=4 and 4+7 = 11; for [8], curr=11 and finally 11+8 = 19 (see the sketch below).
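
And the same accumulation as a ReducerAggregator sketch:

import org.apache.storm.trident.operation.ReducerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class SumReducer implements ReducerAggregator<Long> {
    @Override
    public Long init() {
        return 0L;  // initial curr value for the first reduce call
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);  // 0+4=4, 4+7=11, 11+8=19
    }
}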

topology stream operations

join

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/TridentTopology.java

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
        return join(streams, joinFields, outFields, JoinType.INNER);        
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type));
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
        return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, JoinType.INNER, mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);

    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        switch (mode) {
            case COMPACT:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
                        outFields);

            case PRESERVE:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
                                getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
                        outFields);

            default:
                throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
        }
    }
  • You can see that join ultimately calls multiReduce: in COMPACT mode the GroupedMultiReducer used is a JoinerMultiReducer, and in PRESERVE mode it is a PreservingFieldsOrderJoinerMultiReducer. A usage example follows.
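
For reference, the join example from the Trident documentation: given stream1 with fields ["key", "val1", "val2"] and stream2 with fields ["x", "val1"], joining stream1's "key" against stream2's "x":

// output fields: the join field first ("key"), then the remaining fields of each
// stream in the order the streams were passed ("a", "b" from stream1, "c" from stream2)
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
              new Fields("key", "a", "b", "c"));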

merge

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/TridentTopology.java

    public Stream merge(Fields outputFields, Stream... streams) {
        return merge(outputFields, Arrays.asList(streams));
    }
    
    public Stream merge(Stream... streams) {
        return merge(Arrays.asList(streams));
    }
    
    public Stream merge(List<Stream> streams) {
        return merge(streams.get(0).getOutputFields(), streams);
    } 

    public Stream merge(Fields outputFields, List<Stream> streams) {
        return multiReduce(streams, new IdentityMultiReducer(), outputFields);
    }
  • It can be seen that merge ultimately calls multiReduce with an IdentityMultiReducer (see the snippet below).
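
A minimal usage sketch (stream names are hypothetical):

// merge streams that share the same output fields
Stream merged = topology.merge(stream1, stream2);

// or rename the output fields while merging
Stream renamed = topology.merge(new Fields("a", "b"), stream1, stream2);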

multiReduce

storm-core-1.2.2-sources.jar! /org/apache/storm/trident/TridentTopology.java

    public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }

    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    }    
    
    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }
    
    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    } 
    
    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
    }
        
    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);        
    }    
    
    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> fullInputFields = new ArrayList<>();
        List<Stream> streams = new ArrayList<>();
        List<Fields> fullGroupFields = new ArrayList<>();
        for(int i=0; i<groupedStreams.size(); i++) {
            GroupedStream gs = groupedStreams.get(i);
            Fields groupFields = gs.getGroupFields();
            fullGroupFields.add(groupFields);
            streams.add(gs.toStream().partitionBy(groupFields));
            fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
            
        }
        return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
    }

    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<String> names = new ArrayList<>();
        for(Stream s: streams) {
            if(s._name!=null) {
                names.add(s._name);
            }
        }
        Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
        return addSourcedNode(streams, n);
    }
  • The multiReduce method takes a MultiReducer parameter; join and merge both call multiReduce, but they pass different MultiReducer implementations.

Summary

  • Trident’s operations fall into several types: the basic function, filter and projection operations; repartitioning operations, which are essentially groupings; aggregation operations, namely aggregate, partitionAggregate and persistentAggregate; and the join and merge operations on streams in a topology.
  • For a Function, any emitted fields are appended to the original tuple; a Filter decides whether a tuple is kept; projection extracts the specified fields.
  • The repartitioning operations include Grouping.local_or_shuffle, Grouping.shuffle, Grouping.all, GlobalGrouping, CustomStreamGrouping, IdentityGrouping, IndexHashGrouping, etc. A partition operation can be understood as assigning input tuples to tasks, i.e. grouping the stream.
  • For aggregation operations there are three common aggregator interfaces: Aggregator, CombinerAggregator and ReducerAggregator. Aggregator is the most general; it extends the Operation interface and receives a TridentCollector in its method parameters, which CombinerAggregator and ReducerAggregator do not. CombinerAggregator differs from Aggregator and ReducerAggregator in that, when stream.aggregate is called, trident first performs a partial aggregation on each partition and only then repartitions into a single partition for the final aggregation, saving network transfer; if a CombinerAggregator is chained together with a non-CombinerAggregator, however, this optimization is lost. partitionAggregate works on the partition dimension, while persistentAggregate works on the tuples of all batches across the whole stream and persists the result in state.
  • The join and merge operations on streams are both implemented with multiReduce in the end, but they pass different MultiReducer implementations. A join requires the join fields to match (the field names may differ) and a JoinType of INNER or OUTER can be chosen; note that a join applies within the small batches emitted by the spouts. A merge is simply the union of several streams.
