A Look at LeaderLatch in Curator Recipes

  zookeeper

Preface

This article takes a close look at LeaderLatch, the leader election recipe in curator-recipes.

Example

    @Test
    public void testCuratorLeaderLatch() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String leaderLockPath = "/leader-lock2";

        List<LeaderLatch> latchList = IntStream.rangeClosed(1,10)
                .parallel()
                .mapToObj(i -> new LeaderLatch(client,leaderLockPath,"client"+i))
                .collect(Collectors.toList());

        latchList.parallelStream()
                .forEach(latch -> {
                    try {
                        latch.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });

        TimeUnit.SECONDS.sleep(5);

        Iterator<LeaderLatch> iterator = latchList.iterator();
        while (iterator.hasNext()){
            LeaderLatch latch = iterator.next();
            if(latch.hasLeadership()){
                System.out.println(latch.getId() + " hasLeadership");
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                iterator.remove();
            }
        }


        TimeUnit.SECONDS.sleep(5);

        latchList.stream()
                .filter(latch -> latch.hasLeadership())
                .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));

        Participant participant = latchList.get(0).getLeader();
        System.out.println(participant);


        // Keep the test alive for a while so the latch nodes can be inspected with zkCli.
        TimeUnit.MINUTES.sleep(15);

        latchList.stream()
                .forEach(latch -> {
                    try {
                        latch.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
        client.close();
    }
  • zkCli query:
[zk: localhost:2181(CONNECTED) 17] ls /
[leader-lock1, leader-lock2, zookeeper, leader-lock]
[zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]

LeaderLatch.start

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    /**
     * Add this instance to the leadership election and attempt to acquire leadership.
     *
     * @throws Exception errors
     */
    public void start() throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

    private synchronized void internalStart()
    {
        if ( state.get() == State.STARTED )
        {
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }

    @VisibleForTesting
    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
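  • start adds this instance to the election: once the connection is established, internalStart registers the ConnectionStateListener and calls reset, which creates this instance's child node through forPath.
  • ZKPaths.makePath(latchPath, LOCK_NAME) returns /latchPath/latch-
  • The node is created in the background; on success the callback records the node name with setNode and then calls getChildren.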
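
The "Cannot be started more than once" guard in start is a compare-and-set on an atomic state field, and close (shown later) guards the STARTED to CLOSED transition the same way. The following is a minimal sketch of that pattern using only the JDK. OneShotLifecycle is a hypothetical stand-in written for illustration, not Curator code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for LeaderLatch's lifecycle guard; not Curator code.
class OneShotLifecycle {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Same idea as checkState(state.compareAndSet(LATENT, STARTED), ...) in start().
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    // Same idea as checkState(state.compareAndSet(STARTED, CLOSED), ...) in close().
    void close() {
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        OneShotLifecycle latch = new OneShotLifecycle();
        latch.start();
        try {
            latch.start(); // the second call loses the compare-and-set
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
        latch.close();
        System.out.println("final state: " + latch.state());
    }
}
```

Because the transition is a single atomic step, two threads racing to start the same instance cannot both succeed, which is why a closed latch cannot be restarted and a new instance has to be created instead.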

CreateBuilderImpl.forPath

curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java

    @VisibleForTesting
    static final String PROTECTED_PREFIX = "_c_";

    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception
    {
        if ( compress )
        {
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if ( backgrounding.inBackground() )
        {
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }

    @VisibleForTesting
    String adjustPath(String path) throws Exception
    {
        if ( doProtected )
        {
            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
            path = ZKPaths.makePath(pathAndNode.getPath(), name);
        }
        return path;
    }

    private static String getProtectedPrefix(String protectedId)
    {
        return PROTECTED_PREFIX + protectedId + "-";
    }
  • If no namespace was specified when the CuratorFramework was created, client.fixForNamespace returns the path unchanged.
  • adjustPath prepends PROTECTED_PREFIX, the protectedId (a UUID), and a hyphen to the node name. For example, latch- becomes _c_a749fd26-b739-4510-9e1b-d2974f6d1d1d1-latch-
  • Because the node is created as EPHEMERAL_SEQUENTIAL, ZooKeeper then appends a sequence number, giving for example /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6d1d1d1-latch-0000000045. The node's data is the id passed to LeaderLatch.
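
The naming steps above can be reproduced with plain string handling. The sketch below is illustrative only: ProtectedNodeName and its methods are hypothetical helpers, and the sequence number is passed in by hand, whereas in reality ZooKeeper assigns it as a 10-digit zero-padded counter.

```java
import java.util.UUID;

// Hypothetical helper that rebuilds the node name format; not Curator code.
class ProtectedNodeName {
    static final String PROTECTED_PREFIX = "_c_";
    static final String LOCK_NAME = "latch-";

    // Same shape as adjustPath: prefix + protectedId + "-" + original node name.
    static String protect(String protectedId, String nodeName) {
        return PROTECTED_PREFIX + protectedId + "-" + nodeName;
    }

    // Stands in for the suffix ZooKeeper appends to sequential nodes.
    static String withSequence(String name, int sequence) {
        return name + String.format("%010d", sequence);
    }

    public static void main(String[] args) {
        String protectedId = UUID.randomUUID().toString();
        String name = withSequence(protect(protectedId, LOCK_NAME), 45);
        System.out.println("/leader-lock2/" + name);
    }
}
```

Feeding in one of the UUIDs and sequence numbers from the zkCli listing earlier reproduces the corresponding child name exactly.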

LeaderLatch.getChildren

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

    private void checkLeadership(List<String> children) throws Exception
    {
        final String localOurPath = ourPath.get();
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
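  • getChildren lists the children of latchPath and passes them to checkLeadership. That method sorts the children by sequence number: the instance at index 0 becomes leader, and every other instance sets a watcher on the node immediately before its own. When that node is deleted, the watcher calls getChildren again. If an instance cannot find its own node in the list, it calls reset.
  • The getData call also registers a background callback. If it returns NONODE, meaning the previous node was already gone when the watch was being set, reset is called.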
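
The three-way decision in checkLeadership can be tried out without a ZooKeeper server. The following is a simplified sketch under stated assumptions: LeadershipCheck is a hypothetical class, the child names are made up, and sorting by the text after latch- only approximates what LockInternals.getSortedChildren does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, server-free model of the checkLeadership decision.
class LeadershipCheck {
    static final String LOCK_NAME = "latch-";

    // The sequence number is whatever follows the last "latch-" in the node name.
    static String sequenceOf(String child) {
        int index = child.lastIndexOf(LOCK_NAME);
        return index >= 0 ? child.substring(index + LOCK_NAME.length()) : child;
    }

    // Sorts by sequence number, ignoring the random UUID prefix.
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LeadershipCheck::sequenceOf));
        return sorted;
    }

    // Returns "LEADER", "RESET", or "WATCH <predecessor>" for the given node.
    static String decide(List<String> children, String ourNode) {
        List<String> sorted = sortBySequence(children);
        int ourIndex = (ourNode != null) ? sorted.indexOf(ourNode) : -1;
        if (ourIndex < 0) {
            return "RESET";
        }
        if (ourIndex == 0) {
            return "LEADER";
        }
        return "WATCH " + sorted.get(ourIndex - 1);
    }

    public static void main(String[] args) {
        // Made-up child names; the prefixes deliberately sort differently from the sequences.
        List<String> children = List.of(
                "_c_b-latch-0000000041",
                "_c_c-latch-0000000040",
                "_c_a-latch-0000000042");
        for (String child : children) {
            System.out.println(child + " -> " + decide(children, child));
        }
    }
}
```

Sorting on the sequence number rather than the full name matters because the UUID prefix is random: a plain lexicographic sort of the full names would order participants by UUID instead of by arrival.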

LeaderLatch.close

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    /**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @throws IOException errors
     */
    @Override
    public void close() throws IOException
    {
        close(closeMode);
    }

    /**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
     * @throws IOException errors
     */
    public synchronized void close(CloseMode closeMode) throws IOException
    {
        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

        cancelStartTask();

        try
        {
            setNode(null);
            client.removeWatchers();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            throw new IOException(e);
        }
        finally
        {
            client.getConnectionStateListenable().removeListener(listener);

            switch ( closeMode )
            {
            case NOTIFY_LEADER:
            {
                setLeadership(false);
                listeners.clear();
                break;
            }

            default:
            {
                listeners.clear();
                setLeadership(false);
                break;
            }
            }
        }
    }

    private synchronized void setLeadership(boolean newValue)
    {
        boolean oldValue = hasLeadership.getAndSet(newValue);

        if ( oldValue && !newValue )
        { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener listener)
                    {
                        listener.notLeader();
                        return null;
                    }
                });
        }
        else if ( !oldValue && newValue )
        { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener input)
                    {
                        input.isLeader();
                        return null;
                    }
                });
        }

        notifyAll();
    }
  • close withdraws the LeaderLatch from the election. If the latch is the leader, closing it releases leadership.
  • close first cancels the start task, clears the node with setNode(null), and removes the watchers. In the finally block it removes the ConnectionStateListener and sets leadership to false, which notifies the listeners if they are still registered.
  • Note: if closeMode is NOTIFY_LEADER, leadership is set to false first, so the listeners are notified before they are cleared. Otherwise the listeners are cleared first and nothing is notified.
  • setLeadership compares the old and new values: it calls notLeader() on each LeaderLatchListener when leadership is lost and isLeader() when it is gained.
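
The only difference between the two close modes is the order of two statements, which is easy to overlook. The sketch below models just that ordering with JDK types. CloseModeDemo is a hypothetical class, listeners are reduced to plain Runnables standing in for notLeader(), and the non-notifying mode is named SILENT here on the assumption that this matches Curator's CloseMode enum.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the listener ordering in close(); not Curator code.
class CloseModeDemo {
    enum CloseMode { SILENT, NOTIFY_LEADER }

    // The modelled latch starts out as leader so that closing it loses leadership.
    private final AtomicBoolean hasLeadership = new AtomicBoolean(true);
    private final List<Runnable> notLeaderListeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable onNotLeader) {
        notLeaderListeners.add(onNotLeader);
    }

    // Fires the listeners only on a true -> false transition.
    private void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            notLeaderListeners.forEach(Runnable::run);
        }
    }

    void close(CloseMode mode) {
        if (mode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false);       // listeners still registered: they are notified
            notLeaderListeners.clear();
        } else {
            notLeaderListeners.clear(); // listeners removed first: nobody is notified
            setLeadership(false);
        }
    }

    // Closes a latch that currently leads and counts the notifications delivered.
    static int notificationsOnClose(CloseMode mode) {
        CloseModeDemo latch = new CloseModeDemo();
        int[] calls = {0};
        latch.addListener(() -> calls[0]++);
        latch.close(mode);
        return calls[0];
    }

    public static void main(String[] args) {
        for (CloseMode mode : CloseMode.values()) {
            System.out.println(mode + " -> notifications: " + notificationsOnClose(mode));
        }
    }
}
```

In practice this means an application that relies on notLeader() for cleanup when it shuts down its own latch would need to close it with NOTIFY_LEADER.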

ConnectionStateListener

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            handleStateChange(newState);
        }
    };

    private void handleStateChange(ConnectionState newState)
    {
        switch ( newState )
        {
            default:
            {
                // NOP
                break;
            }

            case RECONNECTED:
            {
                try
                {
                    if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }

            case SUSPENDED:
            {
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
                {
                    setLeadership(false);
                }
                break;
            }

            case LOST:
            {
                setLeadership(false);
                break;
            }
        }
    }
  • LeaderLatch registers its own ConnectionStateListener, which handles the RECONNECTED, SUSPENDED, and LOST states.
  • On LOST, and on SUSPENDED when the error policy treats SUSPENDED as an error state, it calls setLeadership(false). If the latch was the leader, the listeners' notLeader() callback fires.
  • On RECONNECTED, reset is called if the error policy treats SUSPENDED as an error state or if the latch is not the leader. reset goes through the same node registration as start.
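
The branches of handleStateChange can be condensed into a small decision function. This is a sketch only: StateChangeDecision, its Action enum, and the suspendedIsError flag (standing in for getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)) are hypothetical names introduced for illustration.

```java
// Hypothetical decision table for handleStateChange; not Curator code.
class StateChangeDecision {
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    enum Action { NONE, RESET, DROP_LEADERSHIP }

    static Action onStateChange(ConnectionState newState, boolean suspendedIsError, boolean hasLeadership) {
        switch (newState) {
            case RECONNECTED:
                // Re-register unless we kept leadership through a tolerated suspension.
                return (suspendedIsError || !hasLeadership) ? Action.RESET : Action.NONE;
            case SUSPENDED:
                return suspendedIsError ? Action.DROP_LEADERSHIP : Action.NONE;
            case LOST:
                return Action.DROP_LEADERSHIP;
            default:
                return Action.NONE;
        }
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean suspendedIsError : flags) {
            for (boolean leader : flags) {
                for (ConnectionState s : ConnectionState.values()) {
                    System.out.println(s + " suspendedIsError=" + suspendedIsError
                            + " leader=" + leader + " -> " + onStateChange(s, suspendedIsError, leader));
                }
            }
        }
    }
}
```

The one combination that results in no action on RECONNECTED is a leader under a policy that tolerates SUSPENDED: it kept leadership during the suspension, so there is nothing to re-register.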

Summary

  • The LeaderLatch recipe in curator-recipes provides a convenient way to run a leader election, with LeaderLatchListener as the hook for custom handling.
  • LeaderLatch uses ZooKeeper EPHEMERAL_SEQUENTIAL nodes, so node names are numbered automatically. The default LOCK_NAME is latch-, and because the node is created with protection, PROTECTED_PREFIX (_c_) and the protectedId (a UUID) are prepended. The final node name therefore has the form PROTECTED_PREFIX + UUID + "-" + LOCK_NAME + sequence number, for example _c_a749fd26-b739-4510-9e1b-d2974f6d1d1d1-latch-0000000045
  • LeaderLatch uses a ConnectionStateListener to react to connection changes, and treats the node at index 0 among the sorted children as the leader. Each non-leader sets a watcher on the node just before its own. When that node is deleted, it runs getChildren and checkLeadership again: if its own node is now first it becomes leader and the listeners fire, and if not it watches its new predecessor. Chaining the watches link by link like this is an elegant design.
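
The link-by-link chain summarized above can be modelled in a few lines to show who gets woken up when a node disappears. This is a toy model under stated assumptions: WatchChain is a hypothetical class, a TreeMap stands in for the sorted children of the latch path, and the client ids are made up.

```java
import java.util.TreeMap;

// Hypothetical toy model of the predecessor-watch chain; not Curator code.
class WatchChain {
    // sequence number -> participant id, kept sorted like the latch children.
    private final TreeMap<Integer, String> nodes = new TreeMap<>();

    void join(int sequence, String id) {
        nodes.put(sequence, id);
    }

    // The participant holding the lowest sequence number leads.
    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }

    // The participant watching the given node: the next higher sequence, if any.
    String watcherOf(int sequence) {
        Integer next = nodes.higherKey(sequence);
        return next == null ? null : nodes.get(next);
    }

    // Deletes a node and returns the single participant whose watcher fires.
    String delete(int sequence) {
        String notified = watcherOf(sequence);
        nodes.remove(sequence);
        return notified;
    }

    public static void main(String[] args) {
        WatchChain chain = new WatchChain();
        chain.join(40, "client1");
        chain.join(41, "client2");
        chain.join(42, "client3");
        System.out.println("leader: " + chain.leader());
        System.out.println("deleting 41 notifies: " + chain.delete(41));
        System.out.println("leader: " + chain.leader());
        System.out.println("deleting 40 notifies: " + chain.delete(40));
        System.out.println("leader: " + chain.leader());
    }
}
```

Each deletion wakes exactly one participant, the one directly behind the deleted node, so a leader change does not cause every participant to query ZooKeeper at once.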
