Talk about Storm's JoinBolt

Preface

This article mainly studies Storm's JoinBolt.

Example

    @Test
    public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("uuid-spout", new RandomWordSpout(new String[]{"uuid", "timestamp"}), 1);
        builder.setSpout("word-spout", new RandomWordSpout(new String[]{"word", "timestamp"}), 1);

        JoinBolt joinBolt = new JoinBolt("uuid-spout", "timestamp")
                //from priorStream inner join newStream on newStream.field = priorStream.field1
                .join("word-spout", "timestamp", "uuid-spout")
                .select("uuid,word,timestamp")
                .withTumblingWindow(BaseWindowedBolt.Count.of(10));
        builder.setBolt("join", joinBolt,1)
                .fieldsGrouping("uuid-spout",new Fields("timestamp"))
                .fieldsGrouping("word-spout",new Fields("timestamp"));

        builder.setBolt("fileWriter",new FilePrinterBolt(),1).globalGrouping("join");
        SubmitHelper.submitRemote("windowTopology",builder.createTopology());
    }
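
RandomWordSpout and SubmitHelper are the author's own helper classes, not part of Storm. To make the example easier to follow, here is a minimal sketch of what such a spout might look like, assuming it emits one random value plus a coarse-grained timestamp per tuple so that tuples from both spouts can share a join key:

    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    // hypothetical stand-in for the author's RandomWordSpout
    public class RandomWordSpout extends BaseRichSpout {
        private final String[] fields;          // e.g. {"uuid", "timestamp"}
        private final Random random = new Random();
        private SpoutOutputCollector collector;

        public RandomWordSpout(String[] fields) {
            this.fields = fields;
        }

        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            Utils.sleep(100);
            // second-granularity timestamp, so tuples from both spouts can join on it
            long timestamp = System.currentTimeMillis() / 1000;
            String value = fields[0].equals("uuid")
                    ? UUID.randomUUID().toString()
                    : "word-" + random.nextInt(10);
            collector.emit(new Values(value, timestamp));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(fields));
        }
    }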

JoinBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

public class JoinBolt extends BaseWindowedBolt {

    protected final Selector selectorType;
    // Map[StreamName -> JoinInfo]
    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    protected FieldSelector[] outputFields;  // specified via bolt.select() ... used in declaring Output fields
    //    protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
    protected String outputStreamName;
    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams
    private OutputCollector collector;

    /**
     * Calls  JoinBolt(Selector.SOURCE, sourceId, fieldName)
     *
     * @param sourceId  Id of source component (spout/bolt) from which this bolt is receiving data
     * @param fieldName the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(String sourceId, String fieldName) {
        this(Selector.SOURCE, sourceId, fieldName);
    }


    /**
     * Introduces the first stream to start the join with. Equivalent SQL ... select .... from srcOrStreamId ...
     *
     * @param type          Specifies whether 'srcOrStreamId' refers to stream name/source component
     * @param srcOrStreamId name of stream OR source component
     * @param fieldName     the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
        selectorType = type;

        joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName)));
    }

    /**
     * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on 'default' stream.
     */
    public JoinBolt withOutputStream(String streamName) {
        this.outputStreamName = streamName;
        return this;
    }

    /**
     * Performs inner Join with the newStream. SQL    :   from priorStream inner join newStream on newStream.field = priorStream.field1 same
     * as:   new WindowedQueryBolt(priorStream,field1). join(newStream, field, priorStream);
     *
     * Note: priorStream must be previously joined. Valid ex:    new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex:
     * new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
     *
     * @param newStream Either stream name or name of upstream component
     * @param field     the field on which to perform the join
     */
    public JoinBolt join(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.INNER);
    }

    /**
     * Performs left Join with the newStream. SQL    :   from stream1  left join stream2  on stream2.field = stream1.field1 same as:   new
     * WindowedQueryBolt(stream1, field1). leftJoin(stream2, field, stream1);
     *
     * Note: priorStream must be previously joined Valid ex:    new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
     * Invalid ex:  new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
     *
     * @param newStream Either a name of a stream or an upstream component
     * @param field     the field on which to perform the join
     */
    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.LEFT);
    }

    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
        if (hashedInputs.containsKey(newStream)) {
            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
        }
        hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
        JoinInfo joinInfo = joinCriteria.get(priorStream);
        if (joinInfo == null) {
            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
        }

        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
        joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType));
        return this;
    }

    /**
     * Specify projection fields. i.e. Specifies the fields to include in the output. e.g: .select("field1, stream2:field2, field3") Nested
     * Key names are supported for nested types: e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)" Inner
     * types (non leaf) must be Map<> in order to support nested lookup using this dot notation. The selected fields implicitly declare the
     * output fieldNames for the bolt.
     *
     * @param commaSeparatedKeys
     * @return
     */
    public JoinBolt select(String commaSeparatedKeys) {
        String[] fieldNames = commaSeparatedKeys.split(",");

        outputFields = new FieldSelector[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            outputFields[i] = new FieldSelector(fieldNames[i]);
        }
        return this;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] outputFieldNames = new String[outputFields.length];
        for (int i = 0; i < outputFields.length; ++i) {
            outputFieldNames[i] = outputFields[i].getOutputName();
        }
        if (outputStreamName != null) {
            declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
        } else {
            declarer.declare(new Fields(outputFieldNames));
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // initialize the hashedInputs data structure
        int i = 0;
        for (String stream : joinCriteria.keySet()) {
            if (i > 0) {
                hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
            }
            ++i;
        }
        if (outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // 1) Perform Join
        List<Tuple> currentWindow = inputWindow.get();
        JoinAccumulator joinResult = hashJoin(currentWindow);

        // 2) Emit results
        for (ResultRecord resultRecord : joinResult.getRecords()) {
            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
            if (outputStreamName == null) {
                // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all
                // tuples in window
                collector.emit(resultRecord.tupleList, outputTuple);
            } else {
                // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples
                // in window
                collector.emit(outputStreamName, resultRecord.tupleList, outputTuple);
            }
        }
    }

    //......
}
  • JoinBolt extends BaseWindowedBolt and defines attributes such as Selector selectorType, LinkedHashMap<String, JoinInfo> joinCriteria, and FieldSelector[] outputFields, which record the join types and relationships.
  • The join and leftJoin methods set up a join relationship; both end up calling joinCommon, which wraps the relationship in a JoinInfo object and stores it in joinCriteria.
  • The select method chooses the columns of the result set (stream-qualified and nested keys are supported; see the sketch after this list); they are parsed into outputFields and later used by declareOutputFields.
  • execute contains the core join logic; it delegates to hashJoin.
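
Besides joining by the id of the source component, as in the example above, a JoinBolt can join by stream name via the Selector.STREAM constructor overload. A hedged sketch (the stream and field names here are made up for illustration):

    JoinBolt joinBolt = new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "userId")
            // from stream1 left join stream2 on stream2.userId = stream1.userId
            .leftJoin("stream2", "userId", "stream1")
            // 'stream2:city' is stream-qualified; nested keys such as 'profile.age' also work
            .select("userId, stream2:city, score")
            // optional; without it, results are emitted on the 'default' stream
            .withOutputStream("joined")
            .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10));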

JoinBolt.hashJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    protected JoinAccumulator hashJoin(List<Tuple> tuples) {
        clearHashedInputs();

        JoinAccumulator probe = new JoinAccumulator();

        // 1) Build phase - Segregate tuples in the Window into streams.
        //    First stream's tuples go into probe, rest into HashMaps in hashedInputs
        String firstStream = joinCriteria.keySet().iterator().next();
        for (Tuple tuple : tuples) {
            String streamId = getStreamSelector(tuple);
            if (!streamId.equals(firstStream)) {
                Object field = getJoinField(streamId, tuple);
                ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
                if (recs == null) {
                    recs = new ArrayList<Tuple>();
                    hashedInputs.get(streamId).put(field, recs);
                }
                recs.add(tuple);

            } else {
                ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
                probe.insert(probeRecord);  // first stream's data goes into the probe
            }
        }

        // 2) Join the streams in order of streamJoinOrder
        int i = 0;
        for (String streamName : joinCriteria.keySet()) {
            boolean finalJoin = (i == joinCriteria.size() - 1);
            if (i > 0) {
                probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
            }
            ++i;
        }


        return probe;
    }
  • The hashJoin method first traverses the tuples and splits them into two groups: tuples from the firstStream go into the JoinAccumulator probe, while the rest are hashed by their join field into HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs.
  • It then calls doJoin for each remaining streamId in turn, folding the results back into the JoinAccumulator probe (a standalone sketch of this build/probe pattern follows).
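
The build/probe pattern is easier to see without Storm's types. A standalone sketch of the same idea in plain Java (the String[] rows and their field layout are made up for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HashJoinSketch {
        public static void main(String[] args) {
            // "uuid-spout" side rows: {timestamp, uuid}
            List<String[]> probeSide = Arrays.asList(
                    new String[]{"t1", "uuid-1"}, new String[]{"t2", "uuid-2"});
            // "word-spout" side rows: {timestamp, word}
            List<String[]> buildSide = Arrays.asList(
                    new String[]{"t1", "apple"}, new String[]{"t1", "pear"});

            // 1) Build phase: hash the build side by its join key (index 0)
            Map<String, List<String[]>> hashed = new HashMap<>();
            for (String[] row : buildSide) {
                hashed.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
            }

            // 2) Probe phase: stream the probe side through the hash table
            for (String[] probe : probeSide) {
                List<String[]> matches = hashed.get(probe[0]);
                if (matches == null) {
                    continue; // inner-join semantics: unmatched probe rows are dropped
                }
                for (String[] match : matches) {
                    // merged row: uuid, word, timestamp
                    System.out.println(probe[1] + "," + match[1] + "," + probe[0]);
                }
            }
            // prints "uuid-1,apple,t1" and "uuid-1,pear,t1"; "t2" has no match
        }
    }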

JoinAccumulator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    protected class JoinAccumulator {
        ArrayList<ResultRecord> records = new ArrayList<>();

        public void insert(ResultRecord tuple) {
            records.add(tuple);
        }

        public Collection<ResultRecord> getRecords() {
            return records;
        }
    }
  • JoinAccumulator is just a thin wrapper around an ArrayList<ResultRecord>.

ResultRecord

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // Join helper to concat fields to the record
    protected class ResultRecord {

        ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
        ArrayList<Object> outFields = null; // refs to fields that will be part of output fields

        // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
            tupleList.add(tuple);
            if (generateOutputFields) {
                outFields = doProjection(tupleList, outputFields);
            }
        }

        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
            if (lhs != null) {
                tupleList.addAll(lhs.tupleList);
            }
            if (rhs != null) {
                tupleList.add(rhs);
            }
            if (generateOutputFields) {
                outFields = doProjection(tupleList, outputFields);
            }
        }

        public ArrayList<Object> getOutputFields() {
            return outFields;
        }


        // 'stream' cannot be null,
        public Object getField(FieldSelector fieldSelector) {
            for (Tuple tuple : tupleList) {
                Object result = lookupField(fieldSelector, tuple);
                if (result != null) {
                    return result;
                }
            }
            return null;
        }
    }

    // Performs projection on the tuples based on 'projectionFields'
    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
        ArrayList<Object> result = new ArrayList<>(projectionFields.length);
        // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
        for (int i = 0; i < projectionFields.length; i++) {
            boolean missingField = true;
            for (Tuple tuple : tuples) {
                Object field = lookupField(projectionFields[i], tuple);
                if (field != null) {
                    result.add(field);
                    missingField = false;
                    break;
                }
            }
            if (missingField) { // add a null for missing fields (usually in case of outer joins)
                result.add(null);
            }
        }
        return result;
    }

    // Extract the field from tuple. Field may be nested field (x.y.z)
    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {

        // verify stream name matches, if stream name was specified
        if (fieldSelector.streamName != null &&
            !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
            return null;
        }

        Object curr = null;
        for (int i = 0; i < fieldSelector.field.length; i++) {
            if (i == 0) {
                if (tuple.contains(fieldSelector.field[i])) {
                    curr = tuple.getValueByField(fieldSelector.field[i]);
                } else {
                    return null;
                }
            } else {
                curr = ((Map) curr).get(fieldSelector.field[i]);
                if (curr == null) {
                    return null;
                }
            }
        }
        return curr;
    }
  • ResultRecord stores a joined row, holding one Tuple per stream being joined.
  • When joinCriteria.size() == 1, or when finalJoin is true, generateOutputFields is true and doProjection is called to project the row onto the selected output fields.
  • finalJoin is true only on the last doJoin call of hashJoin's traversal over joinCriteria, so the projection is computed exactly once per output row. (A sketch of the nested lookup follows this list.)
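
The dot-notation walk in lookupField is what makes nested keys work: the first level is read via tuple.getValueByField, and every deeper level must be a Map. A standalone sketch of the same walk (nestedLookup is a made-up helper, not Storm API):

    import java.util.HashMap;
    import java.util.Map;

    public class NestedLookupSketch {
        // walk a dot-separated path like "outerKey1.innerKey1" through nested Maps
        static Object nestedLookup(Map<String, Object> root, String path) {
            Object curr = root;
            for (String part : path.split("\\.")) {
                if (!(curr instanceof Map)) {
                    return null; // non-leaf levels must be Maps
                }
                curr = ((Map<?, ?>) curr).get(part);
                if (curr == null) {
                    return null; // missing key; doProjection would emit null here
                }
            }
            return curr;
        }

        public static void main(String[] args) {
            Map<String, Object> inner = new HashMap<>();
            inner.put("innerKey1", 42);
            Map<String, Object> values = new HashMap<>();
            values.put("outerKey1", inner);
            System.out.println(nestedLookup(values, "outerKey1.innerKey1")); // 42
            System.out.println(nestedLookup(values, "outerKey1.missing"));   // null
        }
    }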

JoinBolt.doJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                     boolean finalJoin) {
        final JoinType joinType = joinInfo.getJoinType();
        switch (joinType) {
            case INNER:
                return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
            case LEFT:
                return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
            case RIGHT:
            case OUTER:
            default:
                throw new RuntimeException("Unsupported join type : " + joinType.name());
        }
    }
  • doJoin dispatches to the appropriate join implementation based on joinInfo.joinType. Only INNER and LEFT are currently implemented, handled by doInnerJoin and doLeftJoin respectively; RIGHT and OUTER throw a RuntimeException.

doInnerJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // inner join - core implementation
    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                          boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey != null) {
                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
                if (matchingBuildRecs != null) {
                    for (Tuple matchingRec : matchingBuildRecs) {
                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                        result.insert(mergedRecord);
                    }
                }
            }
        }
        return result;
    }
  • doInnerJoin traverses the records of the JoinAccumulator probe one by one, looks each record's probeKey up in buildInput, and merges the record with every matching tuple it finds; probe records without a match are dropped.

doLeftJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // left join - core implementation
    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                         boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey != null) {
                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if its return null
                if (matchingBuildRecs != null && !matchingBuildRecs.isEmpty()) {
                    for (Tuple matchingRec : matchingBuildRecs) {
                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                        result.insert(mergedRecord);
                    }
                } else {
                    ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
                    result.insert(mergedRecord);
                }

            }
        }
        return result;
    }
  • The difference from the inner join is that when no matching record is found, the record on the left (probe) side is still kept, merged with null, and doProjection later fills in null for the missing fields. Note that a probe record whose join key is itself null is still dropped, just as in the inner join. The fragment below extends the earlier plain-Java sketch with this behavior.
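
Continuing the plain-Java sketch from the hashJoin section, left-join semantics only changes the probe phase: an unmatched probe row is kept, with a null placeholder for the missing side (illustrative fragment, reusing probeSide and hashed from the earlier sketch):

    // 2) Probe phase, left-join flavor
    for (String[] probe : probeSide) {
        List<String[]> matches = hashed.get(probe[0]);
        if (matches == null || matches.isEmpty()) {
            // keep the unmatched probe row; the missing side becomes null
            System.out.println(probe[1] + ",null," + probe[0]);
        } else {
            for (String[] match : matches) {
                System.out.println(probe[1] + "," + match[1] + "," + probe[0]);
            }
        }
    }
    // now "uuid-2,null,t2" is emitted as well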

Summary

  • JoinBolt extends BaseWindowedBolt; it currently supports only inner join and left join, and the upstream connections must use fieldsGrouping on the join fields.
  • JoinBolt merges data from multiple streams step by step: the JoinAccumulator keeps accumulating the intermediate result set while doJoin is applied to each remaining stream in turn.
  • Since JoinBolt buffers and matches data in memory, it is CPU- and memory-intensive. Several points are worth noting (a configuration sketch follows this list):

    • The window should not be too large, otherwise too much data accumulates in memory and an OOM becomes likely; adjust the window length to the workload, or raise the worker's heap size via Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB.
    • A sliding window joins the same data repeatedly, so withTumblingWindow should be used instead.
    • If tuple tracking (acking) is enabled, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be greater than windowLength + slidingInterval + processing time, otherwise in-flight tuples are misjudged as timed out and replayed before processing finishes.
    • Since the windowed bolt automatically anchors emits to the tuples in the tupleWindow, heavy anchoring puts pressure on the whole topology; acking can be disabled by setting Config.TOPOLOGY_ACKER_EXECUTORS to 0.
    • Set Config.TOPOLOGY_MAX_SPOUT_PENDING somewhat larger, giving the window join and downstream operations enough headroom; the cap keeps the spout from emitting tuples faster than downstream bolts can consume them.
    • In production, set Config.TOPOLOGY_DEBUG to false to turn off debug logging, and Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to 0 to turn off the event logger.
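
A hedged sketch of these knobs in code; the constants are from org.apache.storm.Config, while the concrete values are placeholders to be tuned per workload:

    import org.apache.storm.Config;

    public class JoinTopologyConf {
        public static Config tuned() {
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 1024.0);  // cap worker heap
            conf.setMessageTimeoutSecs(60);  // > windowLength + slidingInterval + processing time
            conf.setNumAckers(0);            // TOPOLOGY_ACKER_EXECUTORS = 0: disable acking
            conf.setMaxSpoutPending(5000);   // throttles the spout (only effective when acking is on)
            conf.setDebug(false);            // TOPOLOGY_DEBUG off in production
            conf.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, 0);         // disable the event logger
            return conf;
        }
    }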
