Preface
This article looks at how to split one stream into several streams in Storm and how to merge them back together.
Example
@Test
public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentence-spout", new SentenceSpout());
    // SentenceSpout --> SplitStreamBolt
    builder.setBolt("split-bolt", new SplitStreamBolt())
            .shuffleGrouping("sentence-spout");
    // SplitStreamBolt splits into two streams --> one CountStreamBolt per stream
    // NOTE: specify both the upstream bolt and the streamId to consume
    builder.setBolt("long-word-count-bolt", new CountStreamBolt(), 5)
            .shuffleGrouping("split-bolt", "longWordStream");
    builder.setBolt("short-word-count-bolt", new CountStreamBolt(), 5)
            .shuffleGrouping("split-bolt", "shortWordStream");
    // The outputs of both CountStreamBolts are merged --> ReportBolt
    builder.setBolt("report-bolt", new ReportBolt())
            .shuffleGrouping("long-word-count-bolt")
            .shuffleGrouping("short-word-count-bolt");
    submitRemote(builder);
}
- Here SplitStreamBolt splits the stream in two, two CountStreamBolt components each process one of the streams, and their outputs are merged again because ReportBolt consumes tuples from both.
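To make the wiring concrete, here is a plain-Java model of the subscriptions declared in the test above. It is not Storm's API: `Wiring`, `Subscription` and `consumersOf` are hypothetical names, and it assumes SentenceSpout emits on the default stream, since that class is not shown. It reflects the fact that `shuffleGrouping(componentId)` without a streamId subscribes to the `"default"` stream, which is how ReportBolt ends up receiving the output of both count bolts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the topology wiring; NOT Storm's API.
class Wiring {
    // Same value as Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM = "default";

    // One entry per shuffleGrouping(sourceComponent, streamId) call.
    static final class Subscription {
        final String consumer, sourceComponent, streamId;

        Subscription(String consumer, String sourceComponent, String streamId) {
            this.consumer = consumer;
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
        }
    }

    static final List<Subscription> SUBSCRIPTIONS = Arrays.asList(
            // Assumption: SentenceSpout emits on the default stream.
            new Subscription("split-bolt", "sentence-spout", DEFAULT_STREAM),
            new Subscription("long-word-count-bolt", "split-bolt", "longWordStream"),
            new Subscription("short-word-count-bolt", "split-bolt", "shortWordStream"),
            // shuffleGrouping(componentId) with no streamId subscribes to the default stream.
            new Subscription("report-bolt", "long-word-count-bolt", DEFAULT_STREAM),
            new Subscription("report-bolt", "short-word-count-bolt", DEFAULT_STREAM));

    // Components that receive a tuple that `source` emits on `streamId`.
    static List<String> consumersOf(String source, String streamId) {
        List<String> result = new ArrayList<>();
        for (Subscription s : SUBSCRIPTIONS) {
            if (s.sourceComponent.equals(source) && s.streamId.equals(streamId)) {
                result.add(s.consumer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(consumersOf("split-bolt", "longWordStream"));
        System.out.println(consumersOf("split-bolt", DEFAULT_STREAM));
        System.out.println(consumersOf("short-word-count-bolt", DEFAULT_STREAM));
    }
}
```

Note that nothing subscribes to split-bolt's default stream, so only the two named streams carry its words downstream.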
SplitStreamBolt
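import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitStreamBolt extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // NOTE: BaseRichBolt does not ack automatically, so execute must ack the input itself
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // NOTE: emit to the specified streamId
            if (word.length() > 4) {
                this.collector.emit("longWordStream", new Values(word));
            } else {
                this.collector.emit("shortWordStream", new Values(word));
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        // NOTE: declareStream declares an additional stream under the given streamId.
        // Tuples are sent with a regular emit() and consumed via shuffleGrouping, so the
        // streams are declared non-direct (direct streams are meant for emitDirect/directGrouping).
        declarer.declareStream("longWordStream", new Fields("word"));
        declarer.declareStream("shortWordStream", new Fields("word"));
    }
}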
- Two additional streams are declared here: longWordStream and shortWordStream.
- Words longer than 4 characters are sent to longWordStream; words of 4 characters or fewer are sent to shortWordStream.
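The routing decision itself can be checked without a cluster. The sketch below pulls the rule out of `execute` into a hypothetical `WordRouter` class (plain Java, no Storm types) and groups one sentence's words by the stream they would be emitted on. A four-letter word such as "word" lands in shortWordStream, because the test is strictly greater than 4.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of SplitStreamBolt's routing rule; no Storm involved.
class WordRouter {
    static final String LONG = "longWordStream";
    static final String SHORT = "shortWordStream";

    // Same rule as execute(): length > 4 goes to longWordStream, otherwise shortWordStream.
    static String streamFor(String word) {
        return word.length() > 4 ? LONG : SHORT;
    }

    // Groups the words of one sentence by target stream, keeping their original order.
    static Map<String, List<String>> split(String sentence) {
        Map<String, List<String>> byStream = new LinkedHashMap<>();
        byStream.put(LONG, new ArrayList<>());
        byStream.put(SHORT, new ArrayList<>());
        for (String word : sentence.split(" ")) {
            byStream.get(streamFor(word)).add(word);
        }
        return byStream;
    }

    public static void main(String[] args) {
        // Prints {longWordStream=[letters], shortWordStream=[each, word, with, four]}
        System.out.println(split("each word with four letters"));
    }
}
```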
CountStreamBolt
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// BaseBasicBolt acks each input automatically once execute() returns
public class CountStreamBolt extends BaseBasicBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);
    private final Map<String, Integer> longWordCounts = new HashMap<>();
    private final Map<String, Integer> shortWordCounts = new HashMap<>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // The streamId this tuple arrived on, e.g. "longWordStream"
        String sourceStreamId = input.getSourceStreamId();
        String word = input.getString(0);
        if (sourceStreamId.equals("longWordStream")) {
            Integer count = longWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            longWordCounts.put(word, count);
            LOGGER.info("long word:{} -> {}", word, count);
            collector.emit(new Values(word, count));
            return;
        }
        if (sourceStreamId.equals("shortWordStream")) {
            Integer count = shortWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            shortWordCounts.put(word, count);
            LOGGER.info("short word:{} -> {}", word, count);
            collector.emit(new Values(word, count));
            return;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
- To demonstrate sourceStreamId, both streams are handled by the same bolt class: the topology registers CountStreamBolt twice (long-word-count-bolt and short-word-count-bolt), and each registration subscribes to only one of the two streams.
- Alternatively, each stream could be processed by its own bolt class.
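As a sketch of a more compact alternative to the two if-branches, the counting state can be keyed by `sourceStreamId`, so each stream keeps independent counts. `StreamCounter` is a hypothetical plain-Java class, not a Storm bolt; unlike CountStreamBolt, it accepts any stream id instead of silently ignoring unknown ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java sketch (no Storm): one count table per sourceStreamId.
class StreamCounter {
    private final Map<String, Map<String, Integer>> countsByStream = new HashMap<>();

    // Increments and returns the count of `word` within `sourceStreamId`.
    int count(String sourceStreamId, String word) {
        return countsByStream
                .computeIfAbsent(sourceStreamId, id -> new HashMap<>())
                .merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        StreamCounter counter = new StreamCounter();
        counter.count("longWordStream", "storm");
        System.out.println(counter.count("longWordStream", "storm")); // 2
        System.out.println(counter.count("shortWordStream", "the"));  // 1
    }
}
```

`Map.merge` returns the updated value, which replaces the get/null-check/put sequence in the original bolt.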
Summary
- OutputFieldsDeclarer can declare multiple streams, each under its own streamId, through the declareStream method.
- OutputCollector can send a tuple to a chosen stream through the emit(String streamId, List<Object> tuple) method.
- OutputCollector also has emit overloads without a streamId; these use Utils.DEFAULT_STREAM_ID ("default") as the streamId.
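The defaulting behavior in the last point can be illustrated with a stand-in for OutputCollector. `RecordingCollector` is hypothetical and not Storm's class; it only records what would be emitted, with the overload that lacks a streamId delegating to "default", the value of Utils.DEFAULT_STREAM_ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for OutputCollector, NOT Storm's class:
// it records "streamId values" strings so the defaulting can be inspected.
class RecordingCollector {
    // Same value as Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM_ID = "default";

    final List<String> emitted = new ArrayList<>();

    void emit(String streamId, List<Object> tuple) {
        emitted.add(streamId + " " + tuple);
    }

    // The overload without a streamId delegates using the default stream id.
    void emit(List<Object> tuple) {
        emit(DEFAULT_STREAM_ID, tuple);
    }

    public static void main(String[] args) {
        RecordingCollector c = new RecordingCollector();
        c.emit(Arrays.<Object>asList("storm", 1));
        c.emit("longWordStream", Arrays.<Object>asList("storm"));
        System.out.println(c.emitted); // [default [storm, 1], longWordStream [storm]]
    }
}
```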