Talk about splitting and merging streams in Storm


Preface

This article looks at how to split one stream into several streams in Storm, and how to merge them back together.

Example

    @Test
    public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SplitStreamBolt
        builder.setBolt("split-bolt", new SplitStreamBolt())
                .shuffleGrouping("sentence-spout");
        // SplitStreamBolt splits into two streams --> CountStreamBolt
        //NOTE specify both the upstream bolt and the streamId to consume
        builder.setBolt("long-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","longWordStream");
        builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","shortWordStream");
        // the two CountStreamBolt outputs merge --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt())
                .shuffleGrouping("long-word-count-bolt")
                .shuffleGrouping("short-word-count-bolt");

        submitRemote(builder);
    }
  • Here SplitStreamBolt splits the stream in two, and two CountStreamBolt components each process one of the resulting streams. Their outputs are then merged again: ReportBolt subscribes to the default stream of both count bolts and consumes the tuples from both.
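To make the split and the merge explicit, here is a minimal, Storm-free sketch (plain Java, no Storm dependency) that records the subscriptions declared in testStreamSplitJoin as {consumer, upstream, streamId} rows. The class TopologyWiring and its methods are hypothetical illustration names, not Storm API; the string "default" stands in for Utils.DEFAULT_STREAM_ID, which is what shuffleGrouping(componentId) subscribes to when no streamId is given.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Storm-free model of the subscriptions declared in testStreamSplitJoin.
// Each row is {consumer, upstream component, streamId}.
final class TopologyWiring {
    // Stand-in for Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM = "default";

    static final String[][] SUBSCRIPTIONS = {
            {"split-bolt", "sentence-spout", DEFAULT_STREAM},
            {"long-word-count-bolt", "split-bolt", "longWordStream"},
            {"short-word-count-bolt", "split-bolt", "shortWordStream"},
            {"report-bolt", "long-word-count-bolt", DEFAULT_STREAM},
            {"report-bolt", "short-word-count-bolt", DEFAULT_STREAM},
    };

    private TopologyWiring() {}

    // Components subscribed to one (upstream, streamId) pair, in declaration order.
    static List<String> consumersOf(String upstream, String streamId) {
        List<String> consumers = new ArrayList<>();
        for (String[] s : SUBSCRIPTIONS) {
            if (s[1].equals(upstream) && s[2].equals(streamId)) {
                consumers.add(s[0]);
            }
        }
        return consumers;
    }

    // Upstream components feeding a consumer; more than one entry means a merge.
    static List<String> producersOf(String consumer) {
        List<String> producers = new ArrayList<>();
        for (String[] s : SUBSCRIPTIONS) {
            if (s[0].equals(consumer)) {
                producers.add(s[1]);
            }
        }
        return producers;
    }
}
```

consumersOf("split-bolt", "longWordStream") yields only long-word-count-bolt (the split), while producersOf("report-bolt") yields both count bolts (the merge). Nothing subscribes to split-bolt's default stream, even though declareOutputFields also declares it.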

SplitStreamBolt

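import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitStreamBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);

    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    //NOTE with BaseRichBolt you have to ack the input tuple yourself
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            // NOTE emit to the chosen streamId
            if(word.length() > 4){
                this.collector.emit("longWordStream",new Values(word));
            }else{
                this.collector.emit("shortWordStream",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        //NOTE declare each extra stream with declareStream and its streamId; they are regular
        // (non-direct) streams because the consumers subscribe with shuffleGrouping
        declarer.declareStream("longWordStream",new Fields("word"));
        declarer.declareStream("shortWordStream",new Fields("word"));
    }
}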
  • Two additional streams are declared here: longWordStream and shortWordStream.
  • Words longer than 4 characters are sent to longWordStream; words of 4 characters or fewer are sent to shortWordStream.
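The routing rule in SplitStreamBolt.execute can be tried out without a cluster. This is a minimal sketch, assuming the same single-space tokenization as the bolt; WordStreamRouter, streamFor and route are hypothetical names, and nothing here touches Storm's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch of SplitStreamBolt's routing rule.
final class WordStreamRouter {
    static final String LONG_WORD_STREAM = "longWordStream";
    static final String SHORT_WORD_STREAM = "shortWordStream";

    private WordStreamRouter() {}

    // Same rule as SplitStreamBolt.execute: more than 4 characters goes to longWordStream.
    static String streamFor(String word) {
        return word.length() > 4 ? LONG_WORD_STREAM : SHORT_WORD_STREAM;
    }

    // Splits a sentence on single spaces and groups the words by target streamId.
    static Map<String, List<String>> route(String sentence) {
        Map<String, List<String>> byStream = new LinkedHashMap<>();
        byStream.put(LONG_WORD_STREAM, new ArrayList<>());
        byStream.put(SHORT_WORD_STREAM, new ArrayList<>());
        for (String word : sentence.split(" ")) {
            byStream.get(streamFor(word)).add(word);
        }
        return byStream;
    }
}
```

For example, route("the quick brown fox jumps") puts quick, brown and jumps on longWordStream and the, fox on shortWordStream; a 4-letter word such as "four" lands on shortWordStream.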

CountStreamBolt

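import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountStreamBolt extends BaseBasicBolt {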

    private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);

    Map<String, Integer> longWordCounts = new HashMap<String, Integer>();
    Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sourceStreamId = input.getSourceStreamId();

        String word = input.getString(0);

        if(sourceStreamId.equals("longWordStream")){
            Integer count = longWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            longWordCounts.put(word, count);
            LOGGER.info("long word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }

        if(sourceStreamId.equals("shortWordStream")){
            Integer count = shortWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            shortWordCounts.put(word, count);
            LOGGER.info("short word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  • To show how a bolt can tell streams apart by sourceStreamId, both streams are handled by the same bolt class; the topology registers it twice, as two components (long-word-count-bolt and short-word-count-bolt), each with a parallelism of 5.
  • You could equally use two different bolt classes, one per stream.
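CountStreamBolt keeps one hard-coded map per stream and branches on getSourceStreamId(). A hypothetical alternative, sketched here in plain Java without Storm, keys the state by sourceStreamId so that adding a stream needs no new branch. PerStreamWordCounter and its methods are illustration names only; inside a real bolt, count(...) would be called with input.getSourceStreamId() and input.getString(0).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of CountStreamBolt's state:
// one word -> count map per source streamId, created on first use.
final class PerStreamWordCounter {
    private final Map<String, Map<String, Integer>> countsByStream = new HashMap<>();

    // Increments the count of a word within its source stream and returns the new count,
    // the value CountStreamBolt emits together with the word.
    int count(String sourceStreamId, String word) {
        return countsByStream
                .computeIfAbsent(sourceStreamId, id -> new HashMap<>())
                .merge(word, 1, Integer::sum);
    }

    // Reads a count without modifying state; 0 if the stream or word has not been seen.
    int get(String sourceStreamId, String word) {
        return countsByStream
                .getOrDefault(sourceStreamId, Collections.emptyMap())
                .getOrDefault(word, 0);
    }
}
```

Unlike the original, this version accepts any streamId instead of ignoring unknown ones, so it relies on the topology only subscribing the component to the intended streams. In this topology each count component subscribes to a single stream, so each task only ever fills one inner map.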

Summary

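  • OutputFieldsDeclarer can declare multiple streams, each with its own streamId, through the declareStream method.
  • OutputCollector can send tuples to a chosen stream through the emit(String streamId, List<Object> tuple) method.
  • OutputCollector also has emit overloads without a streamId; internally they use Utils.DEFAULT_STREAM_ID ("default") as the streamId.
  • On the consuming side, shuffleGrouping(componentId, streamId) subscribes to a specific stream, while shuffleGrouping(componentId) subscribes to the default stream; subscribing one bolt to several upstream components merges their streams.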
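The default-stream behavior can be modeled with a toy collector. This is a hypothetical class, not Storm's OutputCollector: it only records the streamId of each emit, and its no-streamId overload delegates with "default", the value of Utils.DEFAULT_STREAM_ID.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy collector (not Storm's OutputCollector) that records
// which streamId each tuple was emitted to.
final class ToyCollector {
    // Same value as Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM_ID = "default";

    final List<String> emittedStreamIds = new ArrayList<>();

    // Analogue of emit(String streamId, List<Object> tuple); the tuple itself is not stored.
    void emit(String streamId, List<Object> tuple) {
        emittedStreamIds.add(streamId);
    }

    // Analogue of emit(List<Object> tuple): delegates with the default streamId.
    void emit(List<Object> tuple) {
        emit(DEFAULT_STREAM_ID, tuple);
    }
}
```

After emit("longWordStream", t) followed by emit(t), emittedStreamIds is [longWordStream, default].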

doc