Talk about LeaderElector of storm nimbus.



This article mainly studies the LeaderElector of storm nimbus.



    public static void main(String[] args) throws Exception {
        launch(new StandaloneINimbus());

    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                               ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, fixupAcl);
        return launchServer(conf, inimbus);

    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
        }, 10);
        if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
        }"Starting nimbus server for storm version '{}'", STORM_VERSION);
        return nimbus;

    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {

        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
        this.leaderElector = leaderElector;


    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

  "Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));

            //add to nimbuses
                                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {

            if (isLeader()) {
                for (String topoId : state.activeStorms()) {
                    transition(topoId, TopologyActions.STARTUP, null);

        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
  • Nimbus calls Zookeeper.zkLeaderElector to create leaderElector in the constructor.
  • LaunchServer method called leader elector.addtoleaderlocuqueue () to participate in leader election


storm-core-1.1.0-sources.jar! /org/apache/storm/zookeeper/

    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, blobStore);

    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
            leaderLatchListenerAtomicReference, blobStore);
  • LeaderLatch was created here using the /leader-lock path, and then leaderLatchListenerImpl was used to create LeaderLatchListener.
  • Finally, use LeaderElectorImp to create ILeaderElector.


storm-core-1.1.0-sources.jar! /org/apache/storm/zookeeper/

    // Leader latch listener that will be invoked when we either gain or lose leadership
    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() {
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            public void isLeader() {
                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));

                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

                // this finds all active topologies blob keys from all local topology blob keys
                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
      "active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),

                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                    // this finds all dependency blob keys from active topologies from all local blob keys
                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
          "active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),

                    if (diffDependencies.isEmpty()) {
              "Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
              "Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                } else {
          "code for all active topologies not available locally, giving up leadership.");

            public void notLeader() {
      "{} lost leadership.", hostName);


            private void closeLatch() {
                try {
                } catch (IOException e) {
                    throw new RuntimeException(e);
  • LeaderLatchListenerImpl returns an implementation class for the LeaderLatchListener interface
  • Some checks have been made in the isLeader interface, that is, when zookeeper selects it as leader, if there are not all active topologies in the local area or all dependencies in the local area, then it is necessary to call leaderLatch.close () to give up the leadership.
  • NotLeader interface mainly prints log



public class LeaderElectorImp implements ILeaderElector {
    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
    private final Map<String, Object> conf;
    private final List<String> servers;
    private final CuratorFramework zk;
    private final String leaderlockPath;
    private final String id;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final List<ACL> acls;
    private final StormMetricsRegistry metricsRegistry;

    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                            AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                            StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.servers = servers;
        this.zk = zk;
        this.leaderlockPath = leaderlockPath; = id;
        this.leaderLatch = leaderLatch;
        this.leaderLatchListener = leaderLatchListener;
        this.blobStore = blobStore; = tc;
        this.clusterState = clusterState;
        this.acls = acls;
        this.metricsRegistry = metricsRegistry;

    public void prepare(Map<String, Object> conf) {
        // no-op for zookeeper implementation

    public void addToLeaderLockQueue() throws Exception {
        // if this latch is already closed, we need to create new instance.
        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
  "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
        // Only if the latch is not already started we invoke start
        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
  "Queued up for leader lock.");
        } else {
  "Node already in queue for leader lock.");

    // Only started latches can be closed.
    public void removeFromLeaderLockQueue() throws Exception {
        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
  "Removed from leader lock queue.");
        } else {
  "leader latch is not started so no removeFromLeaderLockQueue needed.");

    public boolean isLeader() throws Exception {
        return leaderLatch.get().hasLeadership();

    public NimbusInfo getLeader() {
        try {
            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);

    public List<NimbusInfo> getAllNimbuses() throws Exception {
        List<NimbusInfo> nimbusInfos = new ArrayList<>();
        Collection<Participant> participants = leaderLatch.get().getParticipants();
        for (Participant participant : participants) {
        return nimbusInfos;

    public void close() {
        //Do nothing now.
  • LeaderElectorImp Implements the ILeaderElector Interface
  • The addToLeaderLockQueue method detects that if latch has been closed, a new one is recreated, and then detects the state of latch. if there is no start, start is called to participate in the election.
  • There are two main reasons for creating a closed latch: one is that calling a method on a closed latch throws an exception; the other is that zk elects it as a leader, but some leader conditions that are not satisfied with storm will give up the leader, that is, close it off.


  • Stornimbus’s LeaderElector is mainly implemented based on the LeaderLatch of zookeeper recipies.
  • Storm nimbus has customized LeaderLatchListener. To verify nimbus after becoming a leader, it needs to own all active topologies and all dependencies locally, otherwise it will give up leadership.