About the nimbus.seeds parameter of the Storm client


Preface

This article looks at how the Storm client uses the nimbus.seeds parameter.

NIMBUS_SEEDS

storm-core-1.1.0-sources.jar!/org/apache/storm/Config.java

    /**
     * The host that the master server is running on, added only for backward compatibility,
     * the usage deprecated in favor of nimbus.seeds config.
     */
    @Deprecated
    @isString
    public static final String NIMBUS_HOST = "nimbus.host";

    /**
     * List of seed nimbus hosts to use for leader nimbus discovery.
     */
    @isStringList
    public static final String NIMBUS_SEEDS = "nimbus.seeds";
  • As the excerpt shows, nimbus.host is deprecated in favor of nimbus.seeds, which holds the list of seed hosts used to discover the nimbus leader.
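
As a small illustration of what the value looks like on the client side, the sketch below builds a configuration map with nimbus.seeds set to a list of host names. It uses a plain java.util.Map rather than Storm's Config class so that it runs without Storm on the classpath. The class name, the buildConf helper, and the example.com host names are placeholders, not part of Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SeedsConfSketch {
    // Same key string as Config.NIMBUS_SEEDS in the excerpt above.
    static final String NIMBUS_SEEDS = "nimbus.seeds";

    // Hypothetical helper (not a Storm API): builds a client conf
    // whose nimbus.seeds entry is a list of host names.
    static Map<String, Object> buildConf(String... seedHosts) {
        Map<String, Object> conf = new HashMap<>();
        List<String> seeds = new ArrayList<>(Arrays.asList(seedHosts));
        conf.put(NIMBUS_SEEDS, seeds);
        return conf;
    }

    public static void main(String[] args) {
        // Placeholder host names, for illustration only.
        Map<String, Object> conf = buildConf("nimbus1.example.com", "nimbus2.example.com");
        System.out.println(NIMBUS_SEEDS + " = " + conf.get(NIMBUS_SEEDS));
    }
}
```

The @isStringList annotation on the field suggests the value is expected to be a list, so a single host would presumably still be wrapped in a one-element list.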

StormSubmitter

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        stormConf.putAll(prepareZookeeperAuthentication(conf));

        validateConfs(conf, topology);

        Map<String,String> passedCreds = new HashMap<>();
        if (opts != null) {
            Credentials tmpCreds = opts.get_creds();
            if (tmpCreds != null) {
                passedCreds = tmpCreds.get_creds();
            }
        }
        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
        if (!fullCreds.isEmpty()) {
            if (opts == null) {
                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            opts.set_creds(new Credentials(fullCreds));
        }
        try {
            if (localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                if (opts!=null) {
                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    localNimbus.submitTopology(name, stormConf, topology);
                }
                LOG.info("Finished submitting topology: " +  name);
            } else {
                String serConf = JSONValue.toJSONString(stormConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                    if (topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }

                    // Dependency uploading only makes sense for distributed mode
                    List<String> jarsBlobKeys = Collections.emptyList();
                    List<String> artifactsBlobKeys;

                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();

                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);

                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                    } catch (Throwable e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }

                    try {
                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        // Note that we don't handle TException to delete jars blobs
                        // because it's safer to leave some blobs instead of topology not running
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    } finally {
                        uploader.shutdown();
                    }
                }
            }
        } catch(TException e) {
            throw new RuntimeException(e);
        }
        invokeSubmitterHook(name, asUser, conf, topology);

    }
  • StormSubmitter's submitTopologyAs creates the NimbusClient through NimbusClient.getConfiguredClientAs(conf, asUser).
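
Before the client is created, the code above merges several configuration sources, and that merge decides which nimbus.seeds value the client ends up with. The sketch below mirrors the merge order with plain maps. The three input maps stand in for Utils.readStormConfig(), the stormConf argument, and Utils.readCommandLineOpts(); the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ConfMergeSketch {
    // Mirrors the putAll order in submitTopologyAs: later puts override earlier ones.
    static Map<String, Object> merge(Map<String, Object> fileConf,
                                     Map<String, Object> topologyConf,
                                     Map<String, Object> commandLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(topologyConf);
        stormConf.putAll(commandLineOpts);   // command-line options override the passed-in conf
        Map<String, Object> conf = new HashMap<>(fileConf);
        conf.putAll(stormConf);              // both override the values read from the config files
        return conf;
    }

    public static void main(String[] args) {
        // Stand-in for the values read from the configuration files.
        Map<String, Object> fileConf = new HashMap<>();
        fileConf.put("nimbus.seeds", Arrays.asList("localhost"));

        // Stand-in for the conf map passed to the submitter (placeholder hosts).
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("nimbus.seeds", Arrays.asList("nimbus1.example.com", "nimbus2.example.com"));

        Map<String, Object> conf = merge(fileConf, topologyConf, new HashMap<>());
        System.out.println(conf.get("nimbus.seeds"));
    }
}
```

Read this way, a nimbus.seeds entry in the map passed to StormSubmitter takes precedence over the one from the configuration files, since the merged conf is what getConfiguredClientAs receives.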

NimbusClient

storm-core-1.1.0-sources.jar!/org/apache/storm/utils/NimbusClient.java

    public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
        if (conf.containsKey(Config.STORM_DO_AS_USER)) {
            if (asUser != null && !asUser.isEmpty()) {
                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
                        , asUser, conf.get(Config.STORM_DO_AS_USER));
            }
            asUser = (String) conf.get(Config.STORM_DO_AS_USER);
        }

        List<String> seeds;
        if(conf.containsKey(Config.NIMBUS_HOST)) {
            LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}",
                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
            seeds = Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString());
        } else {
            seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
        }

        for (String host : seeds) {
            int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
            NimbusSummary nimbusSummary;
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, host, port, null, asUser);
                nimbusSummary = client.getClient().getLeader();
                if (nimbusSummary != null) {
                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
                    LOG.info("Found leader nimbus : {}", leaderNimbus);
                    if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
                        NimbusClient ret = client;
                        client = null;
                        return ret;
                    }
                    try {
                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
                    } catch (TTransportException e) {
                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                                 + ". will retry with a different seed host.", e);
                continue;
            } finally {
                if (client != null) {
                    client.close();
                }
            }
            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
                                                            "again after some time.");
        }
        throw new NimbusLeaderNotFoundException(
                "Could not find leader nimbus from seed hosts " + seeds + ". " +
                        "Did you specify a valid list of nimbus hosts for config " +
                        Config.NIMBUS_SEEDS + "?");
    }
  • NIMBUS_HOST is still supported here for backward compatibility. If the configuration contains NIMBUS_HOST, it is used as the only seed and a deprecation warning is logged; otherwise the seeds are read from NIMBUS_SEEDS.
  • The method then iterates over the seeds. For each seed it creates a NimbusClient and calls client.getClient().getLeader() to obtain the leader information. If that succeeds, it checks whether the leader is the seed it is currently connected to: if so, that client is returned directly; if not, a new NimbusClient is created from the leader's host and port and returned.
  • If nimbusSummary is null, a NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.") is thrown immediately, without trying the remaining seeds.
  • If an exception occurs while connecting to a seed or to the leader, the loop moves on to the next seed. If every seed fails, the loop ends and a NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?") is thrown.
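
The control flow described in these points can be sketched without Storm on the classpath. This is a simplified model, not the actual implementation: the LeaderLookup interface is a hypothetical stand-in for creating a NimbusClient and calling getLeader(), the leader is reduced to a host string, and IllegalStateException is used in place of NimbusLeaderNotFoundException.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeaderDiscoverySketch {
    // Same key strings as Config.NIMBUS_HOST and Config.NIMBUS_SEEDS.
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_SEEDS = "nimbus.seeds";

    // Hypothetical stand-in for "connect to this seed and ask for the leader".
    // Returns null when the seed knows no leader; throws when the seed is unreachable.
    interface LeaderLookup {
        String leaderOf(String seedHost) throws Exception;
    }

    // The deprecated nimbus.host wins when present; otherwise nimbus.seeds is used.
    @SuppressWarnings("unchecked")
    static List<String> resolveSeeds(Map<String, Object> conf) {
        if (conf.containsKey(NIMBUS_HOST)) {
            return Collections.singletonList(conf.get(NIMBUS_HOST).toString());
        }
        return (List<String>) conf.get(NIMBUS_SEEDS);
    }

    static String findLeader(Map<String, Object> conf, LeaderLookup lookup) {
        List<String> seeds = resolveSeeds(conf);
        for (String seed : seeds) {
            String leader;
            try {
                leader = lookup.leaderOf(seed);
            } catch (Exception e) {
                // Unreachable seed: ignore and retry with the next one.
                continue;
            }
            if (leader == null) {
                // The seed answered but reported no leader: fail right away.
                throw new IllegalStateException(
                        "Could not find a nimbus leader, please try again after some time.");
            }
            return leader;
        }
        // Every seed failed with an exception.
        throw new IllegalStateException("Could not find leader nimbus from seed hosts " + seeds);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(NIMBUS_SEEDS, Arrays.asList("seed-down", "seed-up"));
        // Fake lookup: the first seed is unreachable, the second one reports a leader.
        String leader = findLeader(conf, seed -> {
            if (seed.equals("seed-down")) {
                throw new java.io.IOException("connection refused");
            }
            return "leader.example.com";
        });
        System.out.println("leader = " + leader);
    }
}
```

One consequence of this flow is that the order of the seed list matters: an unreachable seed only costs a retry, whereas a reachable seed that reports no leader ends the search.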

Summary

  • For the Storm client, the nimbus.seeds parameter is used to find the nimbus leader, while the nimbus.host parameter is deprecated and kept only for backward compatibility.
  • To find the leader, the client goes through the configured seed hosts one by one, connects, and requests the leader's information. If the request succeeds but nimbusSummary is null, a NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.") is thrown.
  • If an exception occurs, the next seed is tried. If all seeds fail, the loop ends and a NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?") is thrown.
