Storm DRPC example

  storm

Preface

This article mainly demonstrates an example of storm drpc.

Configuration

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • Here drpc.servers, drpc.port, and drpc.invocations.port are configured on the supervisor so that workers can reach the drpc node through drpc.invocations.port.
  • For the drpc service, drpc.port is set to allow access by an external DRPCClient, and drpc.invocations.port to give workers access; a quick way to check the HTTP port is sketched below.
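
Before wiring up a thrift client, the exposed drpc.http.port can be smoke-tested over HTTP: the DRPC server also accepts GET requests of the form /drpc/<function>/<arguments>. The following is only a minimal sketch, assuming that API shape, the host and ports from the compose file above, and that the words function from the TridentTopology section below has already been deployed (the class name DrpcHttpCheck is made up for illustration):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class DrpcHttpCheck {
        public static void main(String[] args) throws Exception {
            // "words" is the function registered via newDRPCStream below;
            // host and port come from the compose file (drpc.http.port=3774)
            String arguments = "cat dog the man".replace(" ", "%20");
            URL url = new URL("http://192.168.99.100:3774/drpc/words/" + arguments);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    // prints the same result string the thrift client would return
                    System.out.println(line);
                }
            }
        }
    }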

TridentTopology

    @Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        //remote submission: build the jar first with mvn clean package -Dmaven.test.skip=true
        //by default Storm reads the jar path from System.getProperty("storm.jar"); if it is not set, the topology cannot be submitted
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //nimbus host address, e.g. 192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//nimbus thrift port, default 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //zookeeper host addresses; the list can hold multiple entries
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //zookeeper port, default 2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
  • Here newStream creates a TridentState, and then newDRPCStream creates a DRPCStream whose stateQuery is pointed at the TridentState created earlier (the Split function used by both streams is sketched below).
  • Since the TridentState stores its results in MemoryMapState, the DRPCStream here performs its stateQuery through drpc.
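
The Split function is not shown in the snippet above; it is assumed to be the usual whitespace-splitting Trident function from the storm-starter word-count example, roughly as follows (Storm 1.x package names assumed):

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // split the incoming sentence (or the DRPC "args" string) on spaces
            // and emit one word per tuple
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }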

DRPCClient

    @Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE Config.DRPC_THRIFT_TRANSPORT_PLUGIN must be set, otherwise the client throws a NullPointerException
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
  • Note that none of the configuration items here can be omitted, otherwise a NullPointerException is thrown.
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN here uses SimpleTransportPlugin.class.getName(); although this class is deprecated, it still works.
  • Since SimpleTransportPlugin.class is used, Config.DRPC_MAX_BUFFER_SIZE is also configured here.
  • DRPCClient is configured with the address and port of the drpc server.
  • client.execute passes in the function name specified by newDRPCStream. For local debugging without a remote cluster, see the LocalDRPC sketch below.
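
For local debugging, the same DRPC flow can be exercised entirely in-process with LocalDRPC and LocalCluster, without nimbus, a deployed jar, or the drpc ports. This is only a minimal sketch, assuming Storm 1.x APIs, the Split function sketched earlier, and a made-up test class name:

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.LocalDRPC;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Count;
    import org.apache.storm.tuple.Fields;
    import org.junit.Test;

    public class LocalDrpcTest {

        @Test
        public void testLocalDrpc() throws Exception {
            LocalDRPC drpc = new LocalDRPC();
            TridentTopology topology = new TridentTopology();
            // a deliberately small DRPC stream: split the request args into words and count them
            topology.newDRPCStream("words", drpc)
                    .each(new Fields("args"), new Split(), new Fields("word"))
                    .aggregate(new Fields("word"), new Count(), new Fields("count"));

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("DRPCStateQuery", new Config(), topology.build());
            Thread.sleep(5000); // give the local cluster a moment to start

            // execute goes through the in-process DRPC server, so no drpc.port is involved
            System.out.println(drpc.execute("words", "cat dog the man")); // e.g. [[4]]

            cluster.shutdown();
            drpc.shutdown();
        }
    }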

Summary

  • To use drpc, a drpc server node needs to be started via storm drpc, and two additional ports need to be exposed: drpc.port for calls from external DRPCClients and drpc.invocations.port for worker access. drpc.http.port is exposed for HTTP protocol calls (DRPCClient itself uses the thrift protocol).
  • The supervisor is configured with drpc.servers and drpc.invocations.port so that workers can access the drpc server.
  • DRPCClient accesses the server via the port specified by drpc.port; client.execute passes in the function name specified by newDRPCStream.
