[case43] Talk about Storm’s LinearDRPCTopologyBuilder

  storm

Preface

This article mainly studies Storm’s LinearDRPCTopologyBuilder.

Example

Manual DRPC

    @Test
    public void testManualDRPC() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        DRPCSpout spout = new DRPCSpout("exclamation"); //Fields("args", "return-info")
        //the spout is a DRPCSpout; its component id is "drpc"
        builder.setSpout("drpc", spout);
        builder.setBolt("exclaim", new ManualExclaimBolt(), 3).shuffleGrouping("drpc"); //Fields("result", "return-info")
        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
        SubmitHelper.submitRemote("manualDrpc",builder.createTopology());
    }
  • This shows how a DRPC topology is wired up manually, starting with DRPCSpout and ending with ReturnResults.
  • The output fields of DRPCSpout are Fields("args", "return-info"), and ReturnResults expects to receive Fields("result", "return-info").
  • The custom ManualExclaimBolt in between must therefore declare Fields("result", "return-info") as its output fields: return-info is taken from the input tuple and passed through, while result carries the processed value.
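The article does not show ManualExclaimBolt itself; a minimal sketch that satisfies the field contract above might look like this (the class body and the exclamation logic are assumptions, only the declared fields are dictated by the topology):

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt for the manual DRPC topology: it consumes ("args",
// "return-info") from DRPCSpout and must emit ("result", "return-info")
// so that ReturnResults can route the answer back to the DRPC server.
public class ManualExclaimBolt extends BaseBasicBolt {

    // The actual "processing" is arbitrary; kept as a static helper for testing.
    static String exclaim(String args) {
        return args + "!";
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String args = input.getString(0);       // field "args"
        String returnInfo = input.getString(1); // field "return-info", passed through untouched
        collector.emit(new Values(exclaim(args), returnInfo));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}
```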

Using LinearDRPCTopologyBuilder

    @Test
    public void testBasicDRPCTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);
        SubmitHelper.submitRemote("basicDrpc",builder.createRemoteTopology());
    }
  • LinearDRPCTopologyBuilder builds DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults for you, which makes it extremely concise to use.
  • Because of the components wired in before and after it, the user-defined bolt must accept Fields("request", "args") as its input fields and declare new Fields("id", "result") as its output fields. The former’s request is the requestId (a long), which is the same value as the latter’s id; args is the input argument and result is the processing result.
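ExclaimBolt is likewise not shown in the article; under the field contract just described, a sketch could be (class body assumed, only the fields are fixed by the builder):

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt for LinearDRPCTopologyBuilder: input is ("request", "args")
// from PrepareRequest's ARGS_STREAM; output must be ("id", "result"), where id
// echoes the request id so that CoordinatedBolt/JoinResult can track and join it.
public class ExclaimBolt extends BaseBasicBolt {

    static String exclaim(String args) {
        return args + "!";
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        long requestId = input.getLong(0); // "request": the id assigned by PrepareRequest
        String args = input.getString(1);  // "args": the DRPC call arguments
        collector.emit(new Values(requestId, exclaim(args)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}
```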

LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java

public class LinearDRPCTopologyBuilder {
    String function;
    List<Component> components = new ArrayList<>();


    public LinearDRPCTopologyBuilder(String function) {
        this.function = function;
    }

    private static String boltId(int index) {
        return "bolt" + index;
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
        return addBolt(new BatchBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
        return addBolt(bolt, 1);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
        if (parallelism == null) {
            parallelism = 1;
        }
        Component component = new Component(bolt, parallelism.intValue());
        components.add(component);
        return new InputDeclarerImpl(component);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
        return addBolt(bolt, null);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
        return addBolt(new BasicBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
        return addBolt(bolt, null);
    }

    public StormTopology createLocalTopology(ILocalDRPC drpc) {
        return createTopology(new DRPCSpout(function, drpc));
    }

    public StormTopology createRemoteTopology() {
        return createTopology(new DRPCSpout(function));
    }

    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
               .noneGrouping(SPOUT_ID);
        int i = 0;
        for (; i < components.size(); i++) {
            Component component = components.get(i);

            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i == 1) {
                source.put(boltId(i - 1), SourceArgs.single());
            } else if (i >= 2) {
                source.put(boltId(i - 1), SourceArgs.all());
            }
            IdStreamSpec idSpec = null;
            if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                boltId(i),
                new CoordinatedBolt(component.bolt, source, idSpec),
                component.parallelism);

            for (SharedMemory request : component.sharedMemory) {
                declarer.addSharedMemory(request);
            }

            if (!component.componentConf.isEmpty()) {
                declarer.addConfigurations(component.componentConf);
            }

            if (idSpec != null) {
                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if (i == 0 && component.declarations.isEmpty()) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if (i == 0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i - 1);
                }
                for (InputDeclaration declaration : component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if (i > 0) {
                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
            }
        }

        IRichBolt lastBolt = components.get(components.size() - 1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
               .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
               .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
               .noneGrouping(boltId(i - 1));
        return builder.createTopology();
    }

    //......
}
  • As can be seen from createTopology, the topology starts with a DRPCSpout (component id "spout"), followed by a PrepareRequest ("prepare-request").
  • The user-supplied bolts are then each wrapped in a CoordinatedBolt. If there are multiple bolts, the second and subsequent ones get directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID), on which the upstream bolt uses emitDirect to send Fields("id", "count").
  • After the user bolts, a JoinResult is added, with ReturnResults last.
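The naming and wiring scheme can be sketched in isolation with plain Java (the component ids and the single/all selection mirror the source above; the string summaries are our own illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of the id scheme and coordination-source selection
// performed by LinearDRPCTopologyBuilder.createTopology.
public class LinearWiringSketch {

    static String boltId(int index) {
        return "bolt" + index;
    }

    // Which upstream coordination source bolt i tracks (the i == 1 / i >= 2 branches).
    static String coordinationSource(int i) {
        if (i == 1) {
            return boltId(0) + " (SourceArgs.single)";
        }
        if (i >= 2) {
            return boltId(i - 1) + " (SourceArgs.all)";
        }
        return "none (first bolt, fed by prepare-request)";
    }

    // Full component order for a topology with the given number of user bolts:
    // "spout", "prepare-request", bolt0..bolt(n-1), then JoinResult and
    // ReturnResults, which reuse the boltId(index) scheme.
    static List<String> componentOrder(int userBolts) {
        List<String> order = new ArrayList<>();
        order.add("spout");
        order.add("prepare-request");
        for (int i = 0; i < userBolts + 2; i++) {
            order.add(boltId(i));
        }
        return order;
    }
}
```

For two user bolts, for example, JoinResult lands on "bolt2" and ReturnResults on "bolt3".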

DRPCSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java

public class DRPCSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = 2387848310969237877L;
    final String _function;
    final String _local_drpc_id;
    SpoutOutputCollector _collector;
    List<DRPCInvocationsClient> _clients = new ArrayList<>();
    transient LinkedList<Future<Void>> _futures = null;
    transient ExecutorService _backround = null;

    public DRPCSpout(String function) {
        _function = function;
        if (DRPCClient.isLocalOverride()) {
            _local_drpc_id = DRPCClient.getOverrideServiceId();
        } else {
            _local_drpc_id = null;
        }
    }

    //......

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        if (_local_drpc_id == null) {
            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                        60L, TimeUnit.SECONDS,
                                                        new SynchronousQueue<Runnable>());
            _futures = new LinkedList<>();

            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            int index = context.getThisTaskIndex();

            int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
            List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
            if (servers == null || servers.isEmpty()) {
                throw new RuntimeException("No DRPC servers configured for topology");
            }

            if (numTasks < servers.size()) {
                for (String s : servers) {
                    _futures.add(_backround.submit(new Adder(s, port, conf)));
                }
            } else {
                int i = index % servers.size();
                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
            }
        }

    }

    @Override
    public void close() {
        for (DRPCInvocationsClient client : _clients) {
            client.close();
        }
    }

    @Override
    public void nextTuple() {
        if (_local_drpc_id == null) {
            int size = 0;
            synchronized (_clients) {
                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
            }
            for (int i = 0; i < size; i++) {
                DRPCInvocationsClient client;
                synchronized (_clients) {
                    client = _clients.get(i);
                }
                if (!client.isConnected()) {
                    LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                    reconnectAsync(client);
                    continue;
                }
                try {
                    DRPCRequest req = client.fetchRequest(_function);
                    if (req.get_request_id().length() > 0) {
                        Map<String, Object> returnInfo = new HashMap<>();
                        returnInfo.put("id", req.get_request_id());
                        returnInfo.put("host", client.getHost());
                        returnInfo.put("port", client.getPort());
                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
                                        new DRPCMessageId(req.get_request_id(), i));
                        break;
                    }
                } catch (AuthorizationException aze) {
                    reconnectAsync(client);
                    LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
                } catch (TException e) {
                    reconnectAsync(client);
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                } catch (Exception e) {
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                }
            }
            checkFutures();
        } else {
            //......
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
        DRPCMessageId did = (DRPCMessageId) msgId;
        DistributedRPCInvocations.Iface client;

        if (_local_drpc_id == null) {
            client = _clients.get(did.index);
        } else {
            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
        }

        int retryCnt = 0;
        int maxRetries = 3;

        while (retryCnt < maxRetries) {
            retryCnt++;
            try {
                client.failRequest(did.id);
                break;
            } catch (AuthorizationException aze) {
                LOG.error("Not authorized to failRequest from DRPC server", aze);
                throw new RuntimeException(aze);
            } catch (TException tex) {
                if (retryCnt >= maxRetries) {
                    LOG.error("Failed to fail request", tex);
                    break;
                }
                reconnectSync((DRPCInvocationsClient) client);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("args", "return-info"));
    }
    //......
}
  • open() prepares the DRPCInvocationsClient instances.
  • nextTuple() obtains DRPCRequest information through DRPCInvocationsClient.fetchRequest(_function).
  • It then builds returnInfo and emits the tuple Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), using a DRPCMessageId as the msgId.
  • fail() is overridden to report the failure back to the DRPC server via client.failRequest(did.id), retrying the call on transport errors, 3 times by default.
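The server-assignment logic in open() can be illustrated standalone (stdlib-only sketch; the method name is ours, the branching mirrors the numTasks vs. servers.size() check above):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how DRPCSpout.open() assigns DRPC servers to spout tasks:
// with fewer tasks than servers, every task connects to every server;
// otherwise each task connects to exactly one server, chosen round-robin
// by its task index.
public class DrpcServerAssignment {
    static List<String> serversForTask(int taskIndex, int numTasks, List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new RuntimeException("No DRPC servers configured for topology");
        }
        if (numTasks < servers.size()) {
            return new ArrayList<>(servers);                     // poll every server
        }
        return List.of(servers.get(taskIndex % servers.size())); // one server per task
    }
}
```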

PrepareRequest

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java

public class PrepareRequest extends BaseBasicBolt {
    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
    public static final String RETURN_STREAM = "ret";
    public static final String ID_STREAM = "id";

    Random rand;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext context) {
        rand = new Random();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String args = tuple.getString(0);
        String returnInfo = tuple.getString(1);
        long requestId = rand.nextLong();
        collector.emit(ARGS_STREAM, new Values(requestId, args));
        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
        collector.emit(ID_STREAM, new Values(requestId));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
        declarer.declareStream(ID_STREAM, new Fields("request"));
    }
}
  • PrepareRequest takes args and returnInfo, generates a random requestId, and then emits to ARGS_STREAM, RETURN_STREAM and ID_STREAM.
  • JoinResult subscribes to PrepareRequest’s RETURN_STREAM, while the first CoordinatedBolt receives its ARGS_STREAM.
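The fan-out can be sketched with the stdlib alone (stream names match the constants in the source; the map-of-streams representation is our own):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of PrepareRequest's fan-out: one (args, returnInfo) input becomes
// three tuples on three streams, all keyed by the same request id.
public class PrepareRequestSketch {
    static Map<String, List<Object>> split(long requestId, String args, String returnInfo) {
        Map<String, List<Object>> streams = new LinkedHashMap<>();
        streams.put("default", List.of(requestId, args));   // ARGS_STREAM -> first user bolt
        streams.put("ret", List.of(requestId, returnInfo)); // RETURN_STREAM -> JoinResult
        streams.put("id", List.of(requestId));              // ID_STREAM -> last bolt, if FinishedCallback
        return streams;
    }
}
```

(ARGS_STREAM is Utils.DEFAULT_STREAM_ID, i.e. "default".)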

CoordinatedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java

/**
 * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
 */
public class CoordinatedBolt implements IRichBolt {
    
    private TimeCacheMap<Object, TrackingInfo> _tracked;

    //......

    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized (_tracked) {
            track = _tracked.get(id);
            if (track == null) {
                track = new TrackingInfo();
                if (_idStreamSpec == null) {
                    track.receivedId = true;
                }
                _tracked.put(id, track);
            }
        }

        if (type == TupleType.ID) {
            synchronized (_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if (type == TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized (_tracked) {
                track.reportCount++;
                track.expectedTupleCount += count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized (_tracked) {
                _delegate.execute(tuple);
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _delegate.declareOutputFields(declarer);
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
    }

    //......

    public static class TrackingInfo {
        int reportCount = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        boolean failed = false;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean receivedId = false;
        boolean finished = false;
        List<Tuple> ackTuples = new ArrayList<>();

        @Override
        public String toString() {
            return "reportCount: " + reportCount + "\n" +
                   "expectedTupleCount: " + expectedTupleCount + "\n" +
                   "receivedTuples: " + receivedTuples + "\n" +
                   "failed: " + failed + "\n" +
                   taskEmittedTuples.toString();
        }
    }
}
  • In declareOutputFields, CoordinatedBolt not only calls the delegate bolt’s declareOutputFields but also declares Constants.COORDINATED_STREAM_ID as a direct stream carrying Fields("id", "count").
  • The execute method first ensures that each requestId has a TrackingInfo, which records the expectedTupleCount and receivedTuples statistics as well as taskEmittedTuples. (The name taskEmittedTuples is somewhat ambiguous: it actually counts, per downstream task, how many tuples this bolt has emitted to it. In checkFinishId, once a request is finished, these counts are sent with emitDirect so that each downstream task learns how many tuples it should have received; the downstream bolt updates its expectedTupleCount on receiving them.)
  • execute distinguishes three tuple types: TupleType.ID (seen when _idStreamSpec is not null), which marks receivedId and runs checkFinishId; TupleType.COORD, which receives Fields("id", "count"), updates the counts, and runs checkFinishId to determine whether the request is complete; and TupleType.REGULAR, which is simply delegated to the wrapped bolt’s execute.
  • checkFinishId checks that track.reportCount == _numSourceReports and track.expectedTupleCount == track.receivedTuples; if the condition holds it marks track.finished = true and notifies the downstream bolt (if there is one) of how many tuples it should receive.
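The completion condition can be expressed in isolation (stdlib sketch; the predicate combines the per-request checks that checkFinishId performs, with our own method name):

```java
// Sketch of the condition CoordinatedBolt.checkFinishId evaluates for one
// request: the id tuple (if an id stream is configured) has arrived, every
// upstream task has sent its coordination report, and every tuple those
// reports announced has actually been received.
public class FinishCheckSketch {
    static boolean isFinished(boolean receivedId, int reportCount, int numSourceReports,
                              int expectedTupleCount, int receivedTuples) {
        return receivedId
            && reportCount == numSourceReports
            && expectedTupleCount == receivedTuples;
    }
}
```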

JoinResult

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java

public class JoinResult extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);

    String returnComponent;
    Map<Object, Tuple> returns = new HashMap<>();
    Map<Object, Tuple> results = new HashMap<>();
    OutputCollector _collector;

    public JoinResult(String returnComponent) {
        this.returnComponent = returnComponent;
    }

    public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        Object requestId = tuple.getValue(0);
        if (tuple.getSourceComponent().equals(returnComponent)) {
            returns.put(requestId, tuple);
        } else {
            results.put(requestId, tuple);
        }

        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            Tuple result = results.remove(requestId);
            Tuple returner = returns.remove(requestId);
            LOG.debug(result.getValue(1).toString());
            List<Tuple> anchors = new ArrayList<>();
            anchors.add(result);
            anchors.add(returner);
            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
            _collector.ack(result);
            _collector.ack(returner);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}
  • If a tuple comes from PrepareRequest (the returnComponent), it is put into the returns map; otherwise it is put into results.
  • Whenever both maps contain the same requestId, the two sides have matched, and the joined data is emitted downstream.
  • The first emitted field is the result and the second is the returnInfo.
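The pairing logic can be sketched without Storm (the two maps mirror the source; the method names and String payloads are our own simplification):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of JoinResult's join: buffer whichever side arrives first, and emit
// the joined (result, returnInfo) pair once both maps hold the same request id.
public class JoinSketch {
    private final Map<Object, String> returns = new HashMap<>();
    private final Map<Object, String> results = new HashMap<>();

    // Returns the joined [result, returnInfo] pair, or null while still waiting.
    List<String> onResult(Object requestId, String result) {
        results.put(requestId, result);
        return tryJoin(requestId);
    }

    List<String> onReturnInfo(Object requestId, String returnInfo) {
        returns.put(requestId, returnInfo);
        return tryJoin(requestId);
    }

    private List<String> tryJoin(Object requestId) {
        if (!returns.containsKey(requestId) || !results.containsKey(requestId)) {
            return null;
        }
        return List.of(results.remove(requestId), returns.remove(requestId));
    }
}
```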

ReturnResults

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java

public class ReturnResults extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = -774882142710631591L;
    OutputCollector _collector;
    boolean local;
    Map<String, Object> _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _conf = topoConf;
        _collector = collector;
        local = topoConf.get(Config.STORM_CLUSTER_MODE).equals("local");
    }

    @Override
    public void execute(Tuple input) {
        String result = (String) input.getValue(0);
        String returnInfo = (String) input.getValue(1);
        if (returnInfo != null) {
            Map<String, Object> retMap;
            try {
                retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
            } catch (ParseException e) {
                LOG.error("Parseing returnInfo failed", e);
                _collector.fail(input);
                return;
            }
            final String host = (String) retMap.get("host");
            final int port = ObjectReader.getInt(retMap.get("port"));
            String id = (String) retMap.get("id");
            DistributedRPCInvocations.Iface client;
            if (local) {
                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
            } else {
                List server = new ArrayList() {{
                    add(host);
                    add(port);
                }};

                if (!_clients.containsKey(server)) {
                    try {
                        _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                    } catch (TTransportException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                client = _clients.get(server);
            }


            int retryCnt = 0;
            int maxRetries = 3;
            while (retryCnt < maxRetries) {
                retryCnt++;
                try {
                    client.result(id, result);
                    _collector.ack(input);
                    break;
                } catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server", aze);
                    _collector.fail(input);
                    throw new RuntimeException(aze);
                } catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to return results to DRPC server", tex);
                        _collector.fail(input);
                    }
                    reconnectClient((DRPCInvocationsClient) client);
                }
            }
        }
    }

    private void reconnectClient(DRPCInvocationsClient client) {
        if (client instanceof DRPCInvocationsClient) {
            try {
                LOG.info("reconnecting... ");
                client.reconnectClient(); //Blocking call
            } catch (TException e2) {
                LOG.error("Failed to connect to DRPC server", e2);
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : _clients.values()) {
            c.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
  • ReturnResults mainly sends the result back to the DRPCInvocationsClient that issued the request.
  • returnInfo contains the destination host and port to which the result is to be sent, and the DRPCInvocationsClient is constructed (or looked up from a cache) according to that host and port.
  • It then calls DRPCInvocationsClient.result(id, result) to hand back the result, retrying 3 times by default; an AuthorizationException fails the tuple immediately, while success acks it.
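The bounded-retry pattern shared by DRPCSpout.fail() and ReturnResults.execute() can be distilled into a stdlib sketch (the helper name and the BooleanSupplier abstraction are ours; the loop shape mirrors the source):

```java
import java.util.function.BooleanSupplier;

// Sketch of the retry loop: attempt up to maxRetries times, reconnecting
// between transport failures, and give up after the final attempt.
public class RetrySketch {
    static boolean callWithRetries(int maxRetries, BooleanSupplier attempt, Runnable reconnect) {
        int retryCnt = 0;
        while (retryCnt < maxRetries) {
            retryCnt++;
            if (attempt.getAsBoolean()) {
                return true;      // success, e.g. client.result(id, result) returned
            }
            if (retryCnt >= maxRetries) {
                return false;     // retries exhausted, caller fails the tuple
            }
            reconnect.run();      // transport error: reconnect, then retry
        }
        return false;
    }
}
```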

Summary

  • LinearDRPCTopologyBuilder was marked @Deprecated in January 2012, on the assumption that Trident’s newDRPCStream would replace it; since that forced anyone who wanted DRPC to use Trident, the mark was removed in April 2018, and the class is no longer deprecated in versions 2.0.0, 1.1.3, 1.0.7 and 1.2.2.
  • LinearDRPCTopologyBuilder packages DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults together, exposing a simple api so that users do not need to wire these components up themselves.

    • DRPCSpout mainly constructs the args and returnInfo information;
    • PrepareRequest splits the data across ARGS_STREAM, RETURN_STREAM and ID_STREAM;
    • CoordinatedBolt mainly ensures that the tuples passed between the user bolts are fully transmitted and acked;
    • JoinResult matches on requestId, pairing each request with its response data, and then sends the pair downstream;
    • ReturnResults returns the data to the client side according to returnInfo.
  • When using LinearDRPCTopologyBuilder, the first bolt’s input is Fields("request", "args"); the last bolt must declare new Fields("id", "result") as its output; and every bolt except the last must make the first field of its output the id (i.e. the requestId), which allows CoordinatedBolt to keep its tracking statistics and confirm that a bolt has received all the tuples sent by its upstream bolt.
