Talking about Storm’s direct grouping


Preface

This article looks at direct grouping in Storm.

Direct grouping

Direct grouping is a special kind of grouping in which the upstream producer itself decides which downstream task receives each tuple it emits, instead of leaving that choice to the framework. Using direct grouping involves the following steps:

1. In its prepare method, the upstream bolt saves the list of task IDs of the downstream bolt

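// Imports assume Storm 1.x or later (org.apache.storm packages) and SLF4J
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentenceDirectBolt extends BaseRichBolt {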

    private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);

    private OutputCollector collector;

    private List<Integer> taskIds;

    private int numCounterTasks;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        //NOTE 1 Get the task IDs of the downstream bolt here; emitDirect needs one of them as its target taskId
        this.taskIds = context.getComponentTasks("count-bolt");
        this.numCounterTasks = taskIds.size();
    }
    //......
}

This saves the task ID list of the downstream bolt (count-bolt), from which a target taskId is chosen later when calling emitDirect.
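The article does not show getWordCountTaskId, which execute() uses in step 3 to pick a task from the saved list. The sketch below is a hypothetical, plain-Java version of that helper (the class name WordTaskChooser is invented for illustration): it hashes the word and takes it modulo the number of saved task IDs, so the same word always lands on the same count-bolt task.

```java
import java.util.List;

// Hypothetical helper: the article does not show getWordCountTaskId,
// so this is only one plausible way to pick a count-bolt task for a word.
final class WordTaskChooser {

    // What prepare() stores from context.getComponentTasks("count-bolt")
    private final List<Integer> taskIds;
    private final int numCounterTasks;

    WordTaskChooser(List<Integer> taskIds) {
        this.taskIds = List.copyOf(taskIds);
        this.numCounterTasks = this.taskIds.size();
    }

    // Same word -> same index -> same task, so every occurrence of a word
    // is counted by one count-bolt task. floorMod keeps the index
    // non-negative even when hashCode() is negative.
    int getWordCountTaskId(String word) {
        int index = Math.floorMod(word.hashCode(), numCounterTasks);
        return taskIds.get(index);
    }
}
```

Hashing modulo the task count resembles what a fields grouping would do automatically; with direct grouping the bolt is free to swap in any other selection rule at this point.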

2. In declareOutputFields, the upstream bolt uses declareStream to declare the streamIds

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        //NOTE 2 Declare the direct streams via declareStream and give each one a streamId
        declarer.declareStream("directStreamDemo1",true,new Fields("word"));
        declarer.declareStream("directStreamDemo2",true,new Fields("word"));
    }
}

Two streams are declared here, directStreamDemo1 and directStreamDemo2. Passing true as the second argument of declareStream marks each of them as a direct stream.
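To make explicit what the three calls in declareOutputFields register, here is a small stand-in for OutputFieldsDeclarer that only records declarations. RecordingDeclarer and StreamDecl are hypothetical names, not Storm classes; the one Storm detail mirrored is that declare(fields) targets the implicit stream named "default" as a non-direct stream.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Storm's OutputFieldsDeclarer that just records declarations,
// to show what each call in declareOutputFields registers. Not Storm code.
final class RecordingDeclarer {

    static final class StreamDecl {
        final List<String> fields;
        // true marks a direct stream: fed by emitDirect, consumed via directGrouping
        final boolean direct;

        StreamDecl(List<String> fields, boolean direct) {
            this.fields = List.copyOf(fields);
            this.direct = direct;
        }
    }

    // Storm names the implicit stream "default" (Utils.DEFAULT_STREAM_ID).
    static final String DEFAULT_STREAM_ID = "default";

    final Map<String, StreamDecl> streams = new LinkedHashMap<>();

    // declare(fields) is modeled as a non-direct declaration of the default stream.
    void declare(List<String> fields) {
        declareStream(DEFAULT_STREAM_ID, false, fields);
    }

    void declareStream(String streamId, boolean direct, List<String> fields) {
        streams.put(streamId, new StreamDecl(fields, direct));
    }
}
```

Replaying the bolt's three calls against it leaves three streams: "default" (not direct) plus directStreamDemo1 and directStreamDemo2 (both direct), each carrying the single field "word".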

3. The upstream bolt calls emitDirect, specifying both the downstream taskId and the streamId

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            int targetTaskId = getWordCountTaskId(word);
            LOGGER.info("word:{} choose taskId:{}",word,targetTaskId);
            // NOTE 3 Specify which task of the downstream bolt receives the tuple, together with the streamId
            if(targetTaskId % 2 == 0){
                this.collector.emitDirect(targetTaskId,"directStreamDemo1",new Values(word));
            }else{
                this.collector.emitDirect(targetTaskId,"directStreamDemo2",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }
}

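Here the emitDirect(int taskId, String streamId, List<Object> tuple) method specifies the downstream taskId and the streamId to send to. Words whose target task ID is even go out on directStreamDemo1; words whose target task ID is odd go out on directStreamDemo2.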
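The routing in execute() can be exercised without a cluster. The plain-Java simulation below replaces OutputCollector with a method that records each emitDirect(taskId, streamId, tuple) call, and reuses the same even/odd stream choice. DirectEmitSimulation and Emission are hypothetical names, and getWordCountTaskId is an assumed hash-modulo implementation because the article does not show it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of SentenceDirectBolt.execute(): a fake collector
// records each emitDirect(taskId, streamId, tuple) call instead of sending it.
final class DirectEmitSimulation {

    static final class Emission {
        final int taskId;
        final String streamId;
        final List<Object> values;

        Emission(int taskId, String streamId, List<Object> values) {
            this.taskId = taskId;
            this.streamId = streamId;
            this.values = values;
        }
    }

    final List<Integer> taskIds;
    final List<Emission> emitted = new ArrayList<>();

    DirectEmitSimulation(List<Integer> taskIds) {
        this.taskIds = List.copyOf(taskIds);
    }

    // Same parameter list as emitDirect(int taskId, String streamId, List<Object> tuple).
    void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitted.add(new Emission(taskId, streamId, tuple));
    }

    // Hypothetical task choice (the article does not show getWordCountTaskId).
    int getWordCountTaskId(String word) {
        return taskIds.get(Math.floorMod(word.hashCode(), taskIds.size()));
    }

    // Same routing as execute(): even task IDs get directStreamDemo1,
    // odd task IDs get directStreamDemo2.
    void execute(String sentence) {
        for (String word : sentence.split(" ")) {
            int targetTaskId = getWordCountTaskId(word);
            String streamId = (targetTaskId % 2 == 0) ? "directStreamDemo1" : "directStreamDemo2";
            emitDirect(targetTaskId, streamId, List.<Object>of(word));
        }
    }
}
```

Running it on "the cow jumped over the moon" records six emissions; both occurrences of "the" target the same task and therefore the same stream.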

4. The downstream bolt uses directGrouping to subscribe to the upstream bolt on a given streamId

    @Test
    public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt("split-bolt", new SentenceDirectBolt()).shuffleGrouping("sentence-spout");
        // SplitSentenceBolt --> WordCountBolt
        //NOTE 4 Specify the upstream bolt and the streamId to consume
        builder.setBolt("count-bolt", new WordCountBolt(),5).directGrouping("split-bolt","directStreamDemo1");
        // WordCountBolt --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

        submitRemote(builder);
    }

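Here count-bolt is the downstream of split-bolt: it subscribes with directGrouping and receives only the stream directStreamDemo1. Since no component in this topology subscribes to directStreamDemo2, the words that split-bolt emits on that stream are not processed by count-bolt.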
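The effect of subscribing to only one of the two direct streams can be modeled in a few lines. This is a simplified, hypothetical model (SubscriptionFilter is not a Storm class, and Storm's real routing is more involved): given split-bolt's emissions as (streamId, word) pairs, it keeps only the words on streams that count-bolt subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the wiring in testDirectGrouping(): count-bolt subscribes
// to split-bolt only on directStreamDemo1. Not Storm's routing code.
final class SubscriptionFilter {

    // Streams count-bolt subscribes to, per
    // directGrouping("split-bolt", "directStreamDemo1").
    static final Set<String> COUNT_BOLT_STREAMS = Set.of("directStreamDemo1");

    // Each entry is (streamId, word) as emitted by split-bolt; returns the
    // words that would reach count-bolt, i.e. those on a subscribed stream.
    static List<String> delivered(List<Map.Entry<String, String>> emissions) {
        List<String> received = new ArrayList<>();
        for (Map.Entry<String, String> e : emissions) {
            if (COUNT_BOLT_STREAMS.contains(e.getKey())) {
                received.add(e.getValue());
            }
        }
        return received;
    }
}
```

If count-bolt should count words from both streams, one option (a sketch, not tested against a cluster) is to chain a second subscription on the same declarer: `builder.setBolt("count-bolt", new WordCountBolt(), 5).directGrouping("split-bolt", "directStreamDemo1").directGrouping("split-bolt", "directStreamDemo2");`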

Summary

  • Direct grouping is a special kind of grouping: the upstream producer itself decides which downstream task receives each tuple it emits.
  • To use it, the upstream bolt saves the downstream task ID list in prepare, declares its direct streams with declareStream in declareOutputFields, and in execute calls emitDirect(int taskId, String streamId, List<Object> tuple) to name the target task and stream. The downstream bolt uses directGrouping to subscribe to the upstream component on the streamId it consumes.
