Talking about the construction of Storm TridentTopology

  storm

Preface

This article mainly studies how Storm constructs a TridentTopology.

Example

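    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.junit.Test;

    @Test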
    public void testDebugTopologyBuild(){
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4),
                new Values("nickt2", 7),
                new Values("nickt3", 8),
                new Values("nickt4", 9),
                new Values("nickt5", 7),
                new Values("nickt6", 11),
                new Values("nickt7", 5)
        );
        spout.setCycle(false);
        TridentTopology topology = new TridentTopology();
        Stream stream1 = topology.newStream("spout1",spout)
                .each(new Fields("user", "score"), new BaseFunction() {
                    @Override
                    public void execute(TridentTuple tuple, TridentCollector collector) {
                        System.out.println("tuple:"+tuple);
                    }
                },new Fields());

        topology.build();
    }
  • For simplicity, much of the analysis below is based on this example.

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IRichSpout spout) {
        return newStream(txId, new RichSpoutBatchExecutor(spout));
    }
    
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
    }
    
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }

    public Stream newStream(String txId, ITridentDataSource dataSource) {
        if (dataSource instanceof IBatchSpout) {
            return newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            return newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            return newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
    }

    public Stream newStream(String txId, IBatchSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, ITridentSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }

    protected Stream addNode(Node n) {
        registerNode(n);
        return new Stream(this, n.name, n);
    }

    protected void registerNode(Node n) {
        _graph.addVertex(n);
        if(n.stateInfo!=null) {
            String id = n.stateInfo.id;
            if(!_colocate.containsKey(id)) {
                _colocate.put(id, new ArrayList());
            }
            _colocate.get(id).add(n);
        }
    }
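  • The first parameter of newStream is txId. The second parameter is the data source: an ITridentDataSource, or an IRichSpout.
  • ITridentDataSource has several subtypes: IBatchSpout, ITridentSpout, IPartitionedTridentSpout and IOpaquePartitionedTridentSpout. Three overloads wrap their argument in an executor:
    - IRichSpout is wrapped in RichSpoutBatchExecutor.
    - IPartitionedTridentSpout is wrapped in PartitionedTridentSpoutExecutor.
    - IOpaquePartitionedTridentSpout is wrapped in OpaquePartitionedTridentSpoutExecutor.
    These executors are ITridentSpout implementations, so every call ends in either the IBatchSpout overload or the ITridentSpout overload.
  • Finally, a SpoutNode is created, and registerNode adds it to _graph. A node whose stateInfo is not null is also added to _colocate, but a SpoutNode's stateInfo is null. Note that both overloads create the SpoutNode with SpoutNode.SpoutType.BATCH.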
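Whichever overload is used, the call ends in registerNode. The sketch below is a simplified, hypothetical model of that last step. MiniTridentTopology and MiniNode are illustrative stand-ins, not Storm API, and the model keeps only what registerNode looks at:
- Each newStream call draws a stream id of the form s<N>, in the style of UniqueIdGen.getUniqueStreamId.
- It then registers a node with no state, so only the graph grows and the colocation map stays empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of TridentTopology.newStream + registerNode.
// MiniNode stands in for Node/SpoutNode; only the fields registerNode reads are kept.
class MiniTridentTopology {
    static class MiniNode {
        final String streamId;
        final String stateId; // plays the role of stateInfo.id; null for spout nodes

        MiniNode(String streamId, String stateId) {
            this.streamId = streamId;
            this.stateId = stateId;
        }
    }

    private int streamCounter = 0;
    final Set<MiniNode> graph = new LinkedHashSet<>();            // stand-in for _graph's vertices
    final Map<String, List<MiniNode>> colocate = new HashMap<>(); // stand-in for _colocate

    // Like UniqueIdGen.getUniqueStreamId: "s" followed by an incremented counter.
    String getUniqueStreamId() {
        streamCounter++;
        return "s" + streamCounter;
    }

    // Like newStream(txId, IBatchSpout/ITridentSpout): create a stateless spout node and register it.
    // txId only mirrors the real signature; this model does not store it.
    MiniNode newStream(String txId) {
        MiniNode n = new MiniNode(getUniqueStreamId(), null);
        registerNode(n);
        return n;
    }

    // Like registerNode: always add the vertex; only nodes with state are colocated.
    void registerNode(MiniNode n) {
        graph.add(n);
        if (n.stateId != null) {
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }
}
```

Calling newStream("spout1") on a fresh instance yields a node with stream id s1. Registering a node that carries a state id is the only way to populate the colocation map.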

Node

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/Node.java

public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
    private static final AtomicInteger INDEX = new AtomicInteger(0);
    private String nodeId;

    public String name = null;
    public Fields allOutputFields;
    public String streamId;
    public Integer parallelismHint = null;
    public NodeStateInfo stateInfo = null;
    public int creationIndex;

    public Node(String streamId, String name, Fields allOutputFields) {
        this.nodeId = UUID.randomUUID().toString();
        this.allOutputFields = allOutputFields;
        this.streamId = streamId;
        this.name = name;
        this.creationIndex = INDEX.incrementAndGet();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return nodeId.equals(((Node) o).nodeId);
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
    }

    public String shortString() {
        return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
    }
}
  • Node extends DefaultResourceDeclarer<Node>, which implements the resource-related interfaces ResourceDeclarer and ITridentResource.
  • Node has several subclasses, namely SpoutNode, ProcessorNode and PartitionNode.
  • SpoutNode describes a spout. ProcessorNode generally describes a Trident operation such as each, map, aggregate, reduce or project. PartitionNode describes a partitioning step.
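Node's equals and hashCode use only the random nodeId. As a result, two nodes with identical fields are still different vertices in the graph, and creationIndex records the order in which they were created.

The hypothetical IdentityNode below keeps just that identity logic so it can run without storm-core. It is a sketch, not the real class. Unlike the real Node.equals, which casts without checking, it checks the type before casting.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.apache.storm.trident.planner.Node that keeps only
// the identity logic: equality depends on a random UUID, not on the node's fields.
class IdentityNode {
    private static final AtomicInteger INDEX = new AtomicInteger(0);
    private final String nodeId = UUID.randomUUID().toString();
    final String streamId;
    final int creationIndex;

    IdentityNode(String streamId) {
        this.streamId = streamId;
        this.creationIndex = INDEX.incrementAndGet(); // global creation order
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IdentityNode)) { // the real Node casts without this check
            return false;
        }
        return nodeId.equals(((IdentityNode) o).nodeId);
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }
}
```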

TridentTopology.build

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public StormTopology build() {
        DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
        
        //......
        
        List<SpoutNode> spoutNodes = new ArrayList<>();
        
        // can be regular nodes (static state) or processor nodes
        Set<Node> boltNodes = new LinkedHashSet<>();
        for(Node n: graph.vertexSet()) {
            if(n instanceof SpoutNode) {
                spoutNodes.add((SpoutNode) n);
            } else if(!(n instanceof PartitionNode)) {
                boltNodes.add(n);
            }
        }
        
        Set<Group> initialGroups = new LinkedHashSet<>();

        //......

        for(Node n: boltNodes) {
            initialGroups.add(new Group(graph, n));
        }
        
        
        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
        grouper.mergeFully();
        Collection<Group> mergedGroups = grouper.getAllGroups();
        
        
        
        // add identity partitions between groups
        for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
            if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
                Group g1 = grouper.nodeGroup(e.source);
                Group g2 = grouper.nodeGroup(e.target);
                // g1 being null means the source is a spout node
                if(g1==null && !(e.source instanceof SpoutNode))
                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
                if(g1==null || !g1.equals(g2)) {
                    graph.removeEdge(e);
                    PartitionNode pNode = makeIdentityPartition(e.source);
                    graph.addVertex(pNode);
                    graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                    graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    
                }
            }
        }
        
        //......
        
        // add in spouts as groups so we can get parallelisms
        for(Node n: spoutNodes) {
            grouper.addGroup(new Group(graph, n));
        }
        
        grouper.reindex();
        mergedGroups = grouper.getAllGroups();
                
        
        Map<Node, String> batchGroupMap = new HashMap<>();
        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
        for(int i=0; i<connectedComponents.size(); i++) {
            String groupId = "bg" + i;
            for(Node n: connectedComponents.get(i)) {
                batchGroupMap.put(n, groupId);
            }
        }
        
//        System.out.println("GRAPH:");
//        System.out.println(graph);
        
        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

        TridentTopologyBuilder builder = new TridentTopologyBuilder();

        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
        Map<Group, String> boltIds = genBoltIds(mergedGroups);

        for(SpoutNode sn: spoutNodes) {
            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

            Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
            spoutRes.putAll(sn.getResources());

            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

            SpoutDeclarer spoutDeclarer = null;

            if(sn.type == SpoutNode.SpoutType.DRPC) {

                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
            } else {
                ITridentSpout s;
                if(sn.spout instanceof IBatchSpout) {
                    s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
                } else if(sn.spout instanceof ITridentSpout) {
                    s = (ITridentSpout) sn.spout;
                } else {
                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                }
                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
            }

            if(onHeap != null) {
                if(offHeap != null) {
                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    spoutDeclarer.setMemoryLoad(onHeap);
                }
            }

            if(cpuLoad != null) {
                spoutDeclarer.setCPULoad(cpuLoad);
            }
        }

        for(Group g: mergedGroups) {
            if(!isSpoutGroup(g)) {
                Integer p = parallelisms.get(g);
                Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
                Map<String, Number> groupRes = g.getResources(_resourceDefaults);

                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

                BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                                                 committerBatches(g, batchGroupMap), streamToGroup);

                if(onHeap != null) {
                    if(offHeap != null) {
                        d.setMemoryLoad(onHeap, offHeap);
                    }
                    else {
                        d.setMemoryLoad(onHeap);
                    }
                }

                if(cpuLoad != null) {
                    d.setCPULoad(cpuLoad);
                }

                Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                for(PartitionNode n: inputs) {
                    Node parent = TridentUtils.getParent(graph, n);
                    String componentId = parent instanceof SpoutNode ?
                            spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                    d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
                }
            }
        }
        HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
        combinedMasterCoordResources.putAll(_masterCoordResources);
        return builder.buildTopology(combinedMasterCoordResources);
    }
  • Here a TridentTopologyBuilder is created. For each SpoutNode, build calls TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) to add the spout. DRPC spouts go through setBatchPerTupleSpout instead.
  • A spout of type IBatchSpout is first wrapped into an ITridentSpout by BatchSpoutExecutor.
  • The setSpout arguments come from the following places:
    - streamName is the node's streamId, generated by UniqueIdGen.getUniqueStreamId. It is the letter s followed by the incremented _streamCounter, for example s1.
    - txStateId is the txId passed in by the user.
    - batchGroup is bg followed by the index of the node's set in connectedComponents, for example bg0.
    - parallelism comes from getGroupParallelisms, that is, from the parallelism hints the user set while building the topology.
  • After each spout is declared, its resource settings (memory load and CPU load) are applied. Bolts are declared next. Each merged Group that is not a spout group becomes a SubtopologyBolt, its resource settings are applied, and it subscribes to its input PartitionNodes.
  • Finally, TridentTopologyBuilder.buildTopology is called.
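build() assigns batch groups by connected component: every node in the i-th set returned by ConnectivityInspector.connectedSets() gets the id bg<i>.

The sketch below reproduces that idea with a breadth-first search over an undirected view of a small string graph. The class BatchGroupIds and its input format (node names plus from/to pairs) are hypothetical. The component numbering here follows the input node order, which may differ from the order in which JGraphT returns its sets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: label each weakly connected component "bg" + index,
// the way build() fills batchGroupMap from connectedSets().
class BatchGroupIds {
    static Map<String, String> assign(List<String> nodes, List<String[]> edges) {
        // Undirected adjacency list: edge direction does not matter for connectivity.
        Map<String, List<String>> adj = new LinkedHashMap<>();
        for (String n : nodes) {
            adj.put(n, new ArrayList<>());
        }
        for (String[] e : edges) { // e[0] -> e[1]; both must appear in nodes
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }
        Map<String, String> batchGroupMap = new LinkedHashMap<>();
        int component = 0;
        for (String start : nodes) {
            if (batchGroupMap.containsKey(start)) {
                continue; // already labelled via an earlier component
            }
            String groupId = "bg" + component++;
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            batchGroupMap.put(start, groupId);
            while (!queue.isEmpty()) {
                String cur = queue.poll();
                for (String next : adj.get(cur)) {
                    if (!batchGroupMap.containsKey(next)) {
                        batchGroupMap.put(next, groupId);
                        queue.add(next);
                    }
                }
            }
        }
        return batchGroupMap;
    }
}
```

In the example there is a single connected component: the spout, the identity PartitionNode and the each node. Every node therefore ends up in bg0. Two independent newStream chains would produce bg0 and bg1.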

TridentTopologyBuilder.setSpout

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, TransactionalSpoutComponent> _spouts = new HashMap();

    public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
        Map<String, String> batchGroups = new HashMap();
        batchGroups.put(streamName, batchGroup);
        markBatchGroups(id, batchGroups);

        TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
        _spouts.put(id, c);
        return new SpoutDeclarerImpl(c);
    }

    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }
  • setSpout calls markBatchGroups to record the batch group of the (component, stream) pair in _batchIds. It then adds the TransactionalSpoutComponent to _spouts.
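_batchIds is keyed by the pair (component id, stream id), which is what GlobalStreamId represents.

The hypothetical BatchIdRegistry below mirrors markBatchGroups and the bookkeeping part of setSpout. StreamKey is a hand-written stand-in for Storm's Thrift-generated GlobalStreamId, not the real class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of TridentTopologyBuilder's _batchIds bookkeeping.
class BatchIdRegistry {
    // Stand-in for GlobalStreamId: a stream is identified by (componentId, streamId).
    static final class StreamKey {
        final String componentId;
        final String streamId;

        StreamKey(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StreamKey)) {
                return false;
            }
            StreamKey k = (StreamKey) o;
            return componentId.equals(k.componentId) && streamId.equals(k.streamId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(componentId, streamId);
        }
    }

    final Map<StreamKey, String> batchIds = new HashMap<>();

    // Like markBatchGroups: remember the batch group of every stream the component emits.
    void markBatchGroups(String component, Map<String, String> batchGroups) {
        for (Map.Entry<String, String> entry : batchGroups.entrySet()) {
            batchIds.put(new StreamKey(component, entry.getKey()), entry.getValue());
        }
    }

    // The bookkeeping part of setSpout: a spout has one stream in one batch group.
    void setSpout(String id, String streamName, String batchGroup) {
        markBatchGroups(id, Collections.singletonMap(streamName, batchGroup));
    }

    String batchGroupOf(String component, String stream) {
        return batchIds.get(new StreamKey(component, stream));
    }
}
```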

TridentTopologyBuilder.setBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, Component> _bolts = new HashMap();

    // map from stream name to batch id
    public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
        markBatchGroups(id, batchGroups);
        Component c = new Component(bolt, parallelism, committerBatches);
        _bolts.put(id, c);
        return new BoltDeclarerImpl(c);
        
    }

    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }
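  • setBolt likewise calls markBatchGroups to record the batch group of each output stream in _batchIds, and then adds the Component to _bolts. In Trident, each such bolt is a SubtopologyBolt wrapping one merged group of nodes, mostly ProcessorNodes. PartitionNodes sit between these groups and determine how each bolt subscribes to its inputs.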

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        
        //......

        Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        for(String batch: batchesToCommitIds.keySet()) {
            List<String> commitIds = batchesToCommitIds.get(batch);
            SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));

            if(onHeap != null) {
                if(offHeap != null) {
                    masterCoord.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    masterCoord.setMemoryLoad(onHeap);
                }
            }

            if(cpuLoad != null) {
                masterCoord.setCPULoad(cpuLoad);
            }
        }
                
        for(String id: _bolts.keySet()) {
            Component c = _bolts.get(id);
            
            Map<String, CoordSpec> specs = new HashMap<>();
            
            for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
                String batch = batchIdsForBolts.get(s);
                if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
                CoordSpec spec = specs.get(batch);
                CoordType ct;
                if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
                    ct = CoordType.single();
                } else {
                    ct = CoordType.all();
                }
                spec.coords.put(s.get_componentId(), ct);
            }
            
            for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
            for(Map<String, Object> conf: c.componentConfs) {
                d.addConfigurations(conf);
            }
            
            for(InputDeclaration inputDecl: c.declarations) {
                inputDecl.declare(d);
            }
            
            Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
            for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
                for(String comp: entry.getValue()) {
                    d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
                }
            }
            
            for(String b: c.committerBatches) {
                d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
        }

        return builder.createTopology();
    }
  • For each spout that is not an IRichSpout, buildTopology creates two bolts:
    - A TridentSpoutCoordinator bolt. It subscribes with globalGrouping to two streams of the MasterBatchCoordinator: MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and MasterBatchCoordinator.SUCCESS_STREAM_ID ($success).
    - A TridentBoltExecutor bolt that wraps a TridentSpoutExecutor around the spout. It subscribes with allGrouping to the TridentSpoutCoordinator's $batch stream and to the MasterBatchCoordinator's $success stream. If the spout is an ICommitterTridentSpout, it also subscribes with allGrouping to MasterBatchCoordinator.COMMIT_STREAM_ID ($commit).
    Note that a spout that is not an IRichSpout is therefore converted into a bolt.
  • Then, for each batch group in batchesToCommitIds, a MasterBatchCoordinator spout is created. This is the component that the TridentSpoutCoordinator and TridentBoltExecutor above subscribe to.
  • Each bolt in _bolts (a SubtopologyBolt wrapping ProcessorNodes) is wrapped in a TridentBoltExecutor and added with its input declarations. It also subscribes with directGrouping to the TridentBoltExecutor.COORD_STREAM ($coord-<batchGroup>) stream of each upstream component in its batch group. For each of its committer batches, it subscribes with allGrouping to MasterBatchCoordinator.COMMIT_STREAM_ID ($commit).
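The subscriptions that buildTopology declares can be summarized as data.

The sketch below lists them as strings so the groupings are easy to compare with the code above. SpoutWiring is hypothetical. The names $mastercoord-, $spoutcoord-, $batch, $success, $commit and $coord- follow the example topology, and each entry reads "<subscriber> <- <source>:<stream> (<grouping>)".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the subscriptions buildTopology declares around one
// non-IRichSpout spout and one downstream bolt, written out as strings.
class SpoutWiring {
    static String masterCoordinator(String batchGroup) {
        return "$mastercoord-" + batchGroup;
    }

    static String spoutCoordinator(String spoutId) {
        return "$spoutcoord-" + spoutId;
    }

    static List<String> spoutSubscriptions(String spoutId, String batchGroup, boolean committer) {
        String master = masterCoordinator(batchGroup);
        String coord = spoutCoordinator(spoutId);
        List<String> out = new ArrayList<>();
        // The TridentSpoutCoordinator listens to the master with globalGrouping.
        out.add(coord + " <- " + master + ":$batch (global)");
        out.add(coord + " <- " + master + ":$success (global)");
        // The TridentBoltExecutor that wraps the spout uses allGrouping.
        out.add(spoutId + " <- " + coord + ":$batch (all)");
        out.add(spoutId + " <- " + master + ":$success (all)");
        if (committer) {
            // Only ICommitterTridentSpout implementations also receive $commit.
            out.add(spoutId + " <- " + master + ":$commit (all)");
        }
        return out;
    }

    // A bolt listens to $coord-<batchGroup> of an upstream component with directGrouping.
    static String coordSubscription(String boltId, String upstream, String batchGroup) {
        return boltId + " <- " + upstream + ":$coord-" + batchGroup + " (direct)";
    }
}
```

For the example, spoutSubscriptions("spout-spout1", "bg0", false) yields four entries. This is because BatchSpoutExecutor, which wraps the FixedBatchSpout, is not a committer spout.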

TopologyBuilder.createTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/topology/TopologyBuilder.java

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for(String boltId: _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try{
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            }catch(RuntimeException wrapperCause){
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for(String spoutId: _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try{
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            }catch(RuntimeException wrapperCause){
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                boltSpecs,
                new HashMap<String, StateSpoutSpec>());

        stormTopology.set_worker_hooks(_workerHooks);

        return Utils.addVersions(stormTopology);
    }

    /**
     * If the topology has at least one stateful bolt
     * add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }
  • In createTopology, if the topology contains a stateful bolt, a CheckpointSpout is added. In that case, every bolt that is not already a StatefulBoltExecutor is also wrapped in a CheckpointTupleForwarder.
  • For the example, by the time createTopology runs, buildTopology has already registered three bolts:
    - the TridentBoltExecutor wrapping the ProcessorNodes;
    - the TridentSpoutCoordinator;
    - the TridentBoltExecutor wrapping the original spout.
  • There is only one spout, the MasterBatchCoordinator. During buildTopology, a spout that is not an IRichSpout is converted into a TridentBoltExecutor bolt, accompanied by a TridentSpoutCoordinator bolt.
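The checkpoint rule in createTopology depends on the whole topology, not on each bolt alone.

The hypothetical CheckpointPlan below models that rule on bolt class names. In the real code, hasStatefulBolt is set when an IStatefulBolt is registered (it is then wrapped as a StatefulBoltExecutor), and the checks run against IRichBolt instances. This sketch only illustrates the decision logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the checkpoint handling in TopologyBuilder.createTopology.
// Bolts are represented by class-name strings instead of IRichBolt instances.
class CheckpointPlan {
    static final String STATEFUL_EXECUTOR = "StatefulBoltExecutor";

    // Like maybeAddCheckpointSpout: a CheckpointSpout is needed only if some bolt is stateful.
    static boolean needsCheckpointSpout(Map<String, String> boltClassById) {
        return boltClassById.containsValue(STATEFUL_EXECUTOR);
    }

    // Like maybeAddCheckpointTupleForwarder, applied to every bolt of the topology.
    static Map<String, String> wrapBolts(Map<String, String> boltClassById) {
        boolean hasStatefulBolt = needsCheckpointSpout(boltClassById);
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : boltClassById.entrySet()) {
            String cls = e.getValue();
            if (hasStatefulBolt && !cls.equals(STATEFUL_EXECUTOR)) {
                cls = "CheckpointTupleForwarder(" + cls + ")";
            }
            result.put(e.getKey(), cls);
        }
        return result;
    }
}
```

For the example topology, none of the three bolts is stateful. No CheckpointSpout is added and nothing is wrapped.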

Topological structure


  • In the earlier example, the topology produced by TridentTopologyBuilder.buildTopology has one spout and three bolts:
    - the spout is the MasterBatchCoordinator ($mastercoord-bg0);
    - the TridentSpoutCoordinator ($spoutcoord-spout-spout1);
    - the TridentBoltExecutor wrapping the non-IRichSpout spout (spout-spout1);
    - the TridentBoltExecutor wrapping the ProcessorNodes (b-0).
    The streams involved are MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), MasterBatchCoordinator.BATCH_STREAM_ID ($batch), TridentBoltExecutor.COORD_STREAM ($coord-bg0), s1 and s2.
  • $mastercoord-bg0 declares three streams, $success, $commit and $batch. The output fields of all three are [tx].
  • $spoutcoord-spout-spout1 subscribes to the $success and $batch streams of $mastercoord-bg0. It declares its own $batch stream, whose output fields are [tx, metadata].
  • spout-spout1 subscribes with allGrouping to the $success stream of $mastercoord-bg0 and to the $batch stream of $spoutcoord-spout-spout1. It sends [id, count] data on $coord-bg0 and data tuples on stream s1.
  • b-0 subscribes to the $coord-bg0 and s1 streams of spout-spout1. It sends data on stream s2 (output fields: [$batchId, user, score]) and also sends [id, count] data on $coord-bg0.
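The declarations and subscriptions listed above can be cross-checked mechanically.

The hypothetical ExampleTopologyModel below encodes them as plain data, with inputs written as "component:stream". It checks that every subscription refers to a stream its source declares. It covers only what the bullets state; for example, output fields are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, hand-written model of the example topology described above.
class ExampleTopologyModel {
    static final Map<String, Set<String>> DECLARED = new LinkedHashMap<>();
    static final Map<String, List<String>> INPUTS = new LinkedHashMap<>();

    static {
        DECLARED.put("$mastercoord-bg0", new HashSet<>(Arrays.asList("$batch", "$commit", "$success")));
        DECLARED.put("$spoutcoord-spout-spout1", new HashSet<>(Arrays.asList("$batch")));
        DECLARED.put("spout-spout1", new HashSet<>(Arrays.asList("$coord-bg0", "s1")));
        DECLARED.put("b-0", new HashSet<>(Arrays.asList("$coord-bg0", "s2")));

        INPUTS.put("$spoutcoord-spout-spout1", Arrays.asList("$mastercoord-bg0:$batch", "$mastercoord-bg0:$success"));
        INPUTS.put("spout-spout1", Arrays.asList("$mastercoord-bg0:$success", "$spoutcoord-spout-spout1:$batch"));
        INPUTS.put("b-0", Arrays.asList("spout-spout1:$coord-bg0", "spout-spout1:s1"));
    }

    // Returns every input whose source component does not declare that stream.
    static List<String> unresolvedInputs() {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : INPUTS.entrySet()) {
            for (String input : e.getValue()) {
                int sep = input.lastIndexOf(':');
                String source = input.substring(0, sep);
                String stream = input.substring(sep + 1);
                Set<String> streams = DECLARED.get(source);
                if (streams == null || !streams.contains(stream)) {
                    missing.add(e.getKey() + " <- " + input);
                }
            }
        }
        return missing;
    }

    // Returns the components that subscribe to the given component's stream.
    static List<String> consumersOf(String component, String stream) {
        List<String> consumers = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : INPUTS.entrySet()) {
            if (e.getValue().contains(component + ":" + stream)) {
                consumers.add(e.getKey());
            }
        }
        return consumers;
    }
}
```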

Summary

  • During TridentTopologyBuilder.buildTopology, a spout that is not an IRichSpout is converted into a TridentBoltExecutor bolt, and a TridentSpoutCoordinator bolt is added alongside it. ProcessorNodes are also wrapped in a TridentBoltExecutor bolt. In other words, Trident wraps the user's spout as a bolt so that it can manage it, and creates the MasterBatchCoordinator as the real spout.
  • The TridentBoltExecutor.COORD_STREAM ($coord-<batchGroup>) stream carries [id, count] tuples between components. This lets each component tell whether it has received every tuple of a batch. Both the wrapped spout and the bolts send [id, count] on this stream.
  • MasterBatchCoordinator, TridentSpoutCoordinator and the TridentBoltExecutor (spout-spout1) relate to each other as follows:
    - the master sends $success tuples to spout-spout1;
    - the master sends $batch and $success tuples to the coordinator;
    - the coordinator sends $batch tuples to spout-spout1.
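The [id, count] exchange on $coord-<batchGroup> amounts to counting. Each upstream task reports how many tuples of a batch it sent to this task. The batch is complete once every expected upstream task has reported and the received tuple count matches the reported total.

The hypothetical BatchTracker below sketches only that bookkeeping. It is a simplified model, not TridentBoltExecutor itself, which also handles commits, failures and timeouts.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of batch completion tracking via [id, count] tuples.
class BatchTracker {
    private static final class Tracked {
        int reportedTasks;   // upstream tasks that have sent their [id, count]
        long expectedTuples; // sum of the counts they reported
        long receivedTuples; // data tuples actually received for this batch
    }

    private final int expectedTaskReports;
    private final Map<Object, Tracked> batches = new HashMap<>();

    BatchTracker(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    private Tracked get(Object batchId) {
        return batches.computeIfAbsent(batchId, k -> new Tracked());
    }

    // A regular data tuple (e.g. on s1) belonging to batchId arrived.
    void onDataTuple(Object batchId) {
        get(batchId).receivedTuples++;
    }

    // An [id, count] tuple on the coord stream arrived from one upstream task.
    void onCoordTuple(Object batchId, long count) {
        Tracked t = get(batchId);
        t.reportedTasks++;
        t.expectedTuples += count;
    }

    // Complete when all upstream tasks reported and every announced tuple was seen.
    boolean isComplete(Object batchId) {
        Tracked t = batches.get(batchId);
        return t != null
                && t.reportedTasks == expectedTaskReports
                && t.receivedTuples == t.expectedTuples;
    }
}
```

For example, with two upstream tasks reporting counts of 2 and 1, the batch completes only after both reports have arrived and three data tuples have been seen.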
