A Look at MasterFaultDetection in Elasticsearch

Preface

This article examines how MasterFaultDetection works in Elasticsearch.

FaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java

/**
 * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
 * making sure both use the same setting.
 */
public abstract class FaultDetection implements Closeable {

    private static final Logger logger = LogManager.getLogger(FaultDetection.class);

    public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
        Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_INTERVAL_SETTING =
        Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), Property.NodeScope, Property.Deprecated);
    public static final Setting<Integer> PING_RETRIES_SETTING =
        Setting.intSetting("discovery.zen.fd.ping_retries", 3, Property.NodeScope, Property.Deprecated);
    public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING =
        Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, Property.NodeScope, Property.Deprecated);

    protected final ThreadPool threadPool;
    protected final ClusterName clusterName;
    protected final TransportService transportService;

    // used mainly for testing, should always be true
    protected final boolean registerConnectionListener;
    protected final FDConnectionListener connectionListener;
    protected final boolean connectOnNetworkDisconnect;

    protected final TimeValue pingInterval;
    protected final TimeValue pingRetryTimeout;
    protected final int pingRetryCount;

    public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = clusterName;

        this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
        this.pingInterval = PING_INTERVAL_SETTING.get(settings);
        this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
        this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
        this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transportService.removeConnectionListener(connectionListener);
    }

    /**
     * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
     */
    abstract void handleTransportDisconnect(DiscoveryNode node);

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            AbstractRunnable runnable = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    logger.warn("failed to handle transport disconnect for node: {}", node);
                }

                @Override
                protected void doRun() {
                    handleTransportDisconnect(node);
                }
            };
            threadPool.generic().execute(runnable);
        }
    }

}
  • FaultDetection implements the Closeable interface and defines the inner class FDConnectionListener. When registerConnectionListener is true, its constructor adds the FDConnectionListener to transportService; the close method removes it again. When a node disconnects, FDConnectionListener submits a task to the generic thread pool that calls the abstract method handleTransportDisconnect, which subclasses must implement.
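The register-on-construct, remove-on-close pattern described above can be sketched in isolation. This is a minimal illustration, not the Elasticsearch code: FakeTransport and SketchFaultDetection are hypothetical names, nodes are plain strings, and the callback runs synchronously rather than on a thread pool.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for TransportService: it only tracks disconnect listeners.
class FakeTransport {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }

    void removeListener(Consumer<String> listener) { listeners.remove(listener); }

    // Simulates the transport layer reporting that a node disconnected.
    void fireDisconnect(String node) { listeners.forEach(l -> l.accept(node)); }
}

// Simplified analogue of FaultDetection (assumption: synchronous dispatch).
abstract class SketchFaultDetection implements AutoCloseable {
    private final FakeTransport transport;
    // The same listener instance is kept so that close() can remove it again.
    private final Consumer<String> connectionListener = this::handleTransportDisconnect;

    SketchFaultDetection(FakeTransport transport, boolean registerConnectionListener) {
        this.transport = transport;
        if (registerConnectionListener) {
            transport.addListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transport.removeListener(connectionListener);
    }

    // Subclasses decide what a disconnect means for them.
    abstract void handleTransportDisconnect(String node);
}
```

A subclass only needs to implement handleTransportDisconnect; after close() is called, later disconnect events no longer reach it.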

MasterFaultDetection

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {

    private static final Logger logger = LogManager.getLogger(MasterFaultDetection.class);

    public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

    public interface Listener {

        /** called when pinging the master failed, like a timeout, transport disconnects etc */
        void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason);

    }

    private final MasterService masterService;
    private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

    private volatile MasterPinger masterPinger;

    private final Object masterNodeMutex = new Object();

    private volatile DiscoveryNode masterNode;

    private volatile int retryCount;

    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

    public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
                                java.util.function.Supplier<ClusterState> clusterStateSupplier, MasterService masterService,
                                ClusterName clusterName) {
        super(settings, threadPool, transportService, clusterName);
        this.clusterStateSupplier = clusterStateSupplier;
        this.masterService = masterService;

        logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
            pingRetryCount);

        transportService.registerRequestHandler(
            MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
    }

    @Override
    public void close() {
        super.close();
        stop("closing");
        this.listeners.clear();
    }

    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {
        synchronized (masterNodeMutex) {
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
                } catch (Exception e) {
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
                }
            } else {
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, null, "transport disconnected");
            }
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
                threadPool.generic().execute(() -> {
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {
                logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
        }
    }

    //......
}
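  • MasterFaultDetection extends FaultDetection; its constructor registers a MasterPingRequestHandler with transportService under MASTER_PING_ACTION_NAME.
  • Its handleTransportDisconnect method ignores any node that is not the current master. If connectOnNetworkDisconnect is true, it tries to reconnect to the node: on success it replaces the MasterPinger and schedules the new one with zero delay, and on failure it calls notifyMasterFailure. If connectOnNetworkDisconnect is false, it calls notifyMasterFailure directly.
  • The notifyMasterFailure method uses notifiedMasterFailure.compareAndSet so that it fires at most once per start. It invokes onMasterFailure on every registered MasterFaultDetection.Listener from the generic thread pool and then stops fault detection.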
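The at-most-once behaviour of notifyMasterFailure comes from AtomicBoolean.compareAndSet. The following is a minimal sketch of that guard only; OnceOnlyNotifier and its method names are hypothetical, and the listener callback is replaced by recording the reason in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the notifiedMasterFailure guard.
class OnceOnlyNotifier {
    private final AtomicBoolean notified = new AtomicBoolean();
    private final List<String> delivered = new ArrayList<>();

    // Returns true only for the first caller that flips the flag from false to true.
    boolean notifyFailure(String reason) {
        if (notified.compareAndSet(false, true)) {
            delivered.add(reason); // stands in for calling the listeners
            return true;
        }
        return false;
    }

    // Mirrors innerStart, which resets the flag when monitoring a master (re)starts.
    void reset() {
        notified.set(false);
    }

    List<String> delivered() {
        return delivered;
    }
}
```

If a transport disconnect and a ping timeout race to report the same failure, only one of them gets through until the flag is reset.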

MasterPingRequestHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

        @Override
        public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
            final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
            // check if we are really the same master as the one we seemed to be think we are
            // this can happen if the master got "kill -9" and then another node started using the same port
            if (!request.masterNode.equals(nodes.getLocalNode())) {
                throw new ThisIsNotTheMasterYouAreLookingForException();
            }

            // ping from nodes of version < 1.4.0 will have the clustername set to null
            if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
                logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                    request.clusterName, clusterName);
                throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                    + request.clusterName + "] cluster then us [" + clusterName + "]");
            }

            // when we are elected as master or when a node joins, we use a cluster state update thread
            // to incorporate that information in the cluster state. That cluster state is published
            // before we make it available locally. This means that a master ping can come from a node
            // that has already processed the new CS but it is not known locally.
            // Therefore, if we fail we have to check again under a cluster state thread to make sure
            // all processing is finished.
            //

            if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
                logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
                masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        // if we are no longer master, fail...
                        DiscoveryNodes nodes = currentState.nodes();
                        if (!nodes.nodeExists(request.sourceNode)) {
                            throw new NodeDoesNotExistOnMasterException();
                        }
                        return currentState;
                    }

                    @Override
                    public void onNoLongerMaster(String source) {
                        onFailure(source, new NotMasterException("local node is not master"));
                    }

                    @Override
                    public void onFailure(String source, @Nullable Exception e) {
                        if (e == null) {
                            e = new ElasticsearchException("unknown error while processing ping");
                        }
                        try {
                            channel.sendResponse(e);
                        } catch (IOException inner) {
                            inner.addSuppressed(e);
                            logger.warn("error while sending ping response", inner);
                        }
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        try {
                            channel.sendResponse(new MasterPingResponseResponse());
                        } catch (IOException e) {
                            logger.warn("error while sending ping response", e);
                        }
                    }
                });
            } else {
                // send a response, and note if we are connected to the master or not
                channel.sendResponse(new MasterPingResponseResponse());
            }
        }
    }

    public static class MasterPingRequest extends TransportRequest {

        public DiscoveryNode sourceNode;

        private DiscoveryNode masterNode;
        private ClusterName clusterName;

        public MasterPingRequest() {
        }

        public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
            this.sourceNode = sourceNode;
            this.masterNode = masterNode;
            this.clusterName = clusterName;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            sourceNode = new DiscoveryNode(in);
            masterNode = new DiscoveryNode(in);
            clusterName = new ClusterName(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            sourceNode.writeTo(out);
            masterNode.writeTo(out);
            clusterName.writeTo(out);
        }
    }

    public static class MasterPingResponseResponse extends TransportResponse {

        public MasterPingResponseResponse() {
        }

        public MasterPingResponseResponse(StreamInput in) throws IOException {
            super(in);
        }
    }
  • MasterPingRequestHandler answers MasterPingRequest messages. It first throws ThisIsNotTheMasterYouAreLookingForException if the request targets a different node or a different cluster. If the local node is not the elected master, or the sourceNode is not in the local cluster state, it submits a ClusterStateUpdateTask to recheck on the cluster state thread; otherwise it returns a MasterPingResponseResponse directly.
  • The execute method of the ClusterStateUpdateTask checks whether the request's sourceNode exists in the current state and throws NodeDoesNotExistOnMasterException if it does not.
  • The onNoLongerMaster method calls onFailure with a NotMasterException. The onFailure method substitutes an ElasticsearchException if the exception is null and sends the exception back as the response. The clusterStateProcessed method returns a MasterPingResponseResponse.
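The order of checks in messageReceived can be condensed into a small pure function. This is a sketch under simplifying assumptions: PingDecision, Outcome, and the boolean parameters are hypothetical names, and each boolean stands for a comparison that the real handler performs on the request and the cluster state.

```java
// Hypothetical condensation of the checks made by MasterPingRequestHandler.
class PingDecision {
    enum Outcome { NOT_THE_MASTER, RECHECK_ON_CLUSTER_STATE_THREAD, RESPOND_OK }

    static Outcome decide(boolean requestTargetsLocalNode,
                          boolean sameCluster,
                          boolean localNodeIsElectedMaster,
                          boolean sourceNodeKnown) {
        // Wrong target node or wrong cluster: rejected immediately with an exception.
        if (!requestTargetsLocalNode || !sameCluster) {
            return Outcome.NOT_THE_MASTER;
        }
        // The local view may lag behind a published state, so recheck on the cluster state thread.
        if (!localNodeIsElectedMaster || !sourceNodeKnown) {
            return Outcome.RECHECK_ON_CLUSTER_STATE_THREAD;
        }
        // Common case: we are master and we know the sender.
        return Outcome.RESPOND_OK;
    }
}
```

The recheck branch exists because a pinging node may already have applied a cluster state that the master has published but not yet made available locally.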

ZenDiscovery.processNextCommittedClusterState

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    //......

    // return true if state has been sent to applier
    boolean processNextCommittedClusterState(String reason) {
        assert Thread.holdsLock(stateMutex);

        final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
        final ClusterState currentState = committedState.get();
        // all pending states have been processed
        if (newClusterState == null) {
            return false;
        }

        assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
        assert !newClusterState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock()) :
            "received a cluster state with a master block";

        if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {
            handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
                "via a new cluster state");
            return false;
        }

        try {
            if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
                String message = String.format(
                    Locale.ROOT,
                    "rejecting cluster state version [%d] uuid [%s] received from [%s]",
                    newClusterState.version(),
                    newClusterState.stateUUID(),
                    newClusterState.nodes().getMasterNodeId()
                );
                throw new IllegalStateException(message);
            }
        } catch (Exception e) {
            try {
                pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
            return false;
        }

        if (currentState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock())) {
            // its a fresh update from the master as we transition from a start of not having a master to having one
            logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
        }

        if (currentState == newClusterState) {
            return false;
        }

        committedState.set(newClusterState);

        // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
        // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            // update the set of nodes to ping
            nodesFD.updateNodesAndPing(newClusterState);
        } else {
            // check to see that we monitor the correct master of the cluster
            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
                masterFD.restart(newClusterState.nodes().getMasterNode(),
                    "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
            }
        }

        //......

        return true;
    }

    //......
}
  • ZenDiscovery's processNextCommittedClusterState method calls masterFD.restart when the local node is not the elected master and masterFD.masterNode() is either null or not equal to newClusterState.nodes().getMasterNode().

MasterFaultDetection.restart

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

public class MasterFaultDetection extends FaultDetection {

    //......

    public void restart(DiscoveryNode masterNode, String reason) {
        synchronized (masterNodeMutex) {
            if (logger.isDebugEnabled()) {
                logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
            }
            innerStop();
            innerStart(masterNode);
        }
    }

    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();

        // we start pinging slightly later to allow the chosen master to complete it's own master election
        threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
    }

    private void innerStop() {
        // also will stop the next ping schedule
        this.retryCount = 0;
        if (masterPinger != null) {
            masterPinger.stop();
            masterPinger = null;
        }
        this.masterNode = null;
    }

    //......
}
  • The restart method of MasterFaultDetection calls innerStop and then innerStart while holding masterNodeMutex. innerStop resets retryCount, calls masterPinger.stop(), and sets masterPinger and masterNode to null. innerStart records the new masterNode, resets retryCount and notifiedMasterFailure, creates a new MasterPinger, and schedules it to run after a delay of pingInterval.
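One consequence of this stop-then-start sequence is that a pinger task which is already scheduled is not cancelled; it simply does nothing when it runs, because its running flag is false. A minimal sketch of that behaviour follows. RestartSketch is a hypothetical class, the thread pool is replaced by a queue that runs tasks when drain() is called, and a ping is reduced to incrementing a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of restart(): a stopped pinger stays queued but becomes a no-op.
class RestartSketch {
    // Stand-in for threadPool.schedule: tasks run only when drain() is called.
    private final Deque<Runnable> scheduled = new ArrayDeque<>();
    private Pinger pinger;
    private String masterNode;
    int pingsSent = 0;

    class Pinger implements Runnable {
        volatile boolean running = true;

        @Override
        public void run() {
            if (!running) {
                return; // stopped: neither ping nor reschedule
            }
            pingsSent++;
        }
    }

    void restart(String newMaster) {
        // innerStop: flag the old pinger and forget the old master
        if (pinger != null) {
            pinger.running = false;
            pinger = null;
        }
        masterNode = null;
        // innerStart: track the new master with a fresh pinger
        masterNode = newMaster;
        pinger = new Pinger();
        scheduled.add(pinger);
    }

    String masterNode() {
        return masterNode;
    }

    void drain() {
        while (!scheduled.isEmpty()) {
            scheduled.poll().run();
        }
    }
}
```

Calling restart twice before the queue is drained leaves two tasks queued, but only the pinger created by the second call sends a ping.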

MasterPinger

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

    private class MasterPinger implements Runnable {

        private volatile boolean running = true;

        public void stop() {
            this.running = false;
        }

        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
            if (masterToPing == null) {
                // master is null, should not happen, but we are still running, so reschedule
                threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                return;
            }

            final MasterPingRequest request = new MasterPingRequest(
                clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
            final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
                .withTimeout(pingRetryTimeout).build();
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
                new TransportResponseHandler<MasterPingResponseResponse>() {
                        @Override
                        public MasterPingResponseResponse read(StreamInput in) throws IOException {
                            return new MasterPingResponseResponse(in);
                        }

                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {
                            if (!running) {
                                return;
                            }
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // we don't stop on disconnection from master, we keep pinging it
                                threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            if (!running) {
                                return;
                            }
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                        handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {
                                        logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, exp, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
                                        logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, exp,"not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                        logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                            , masterNode);
                                        notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                                        return;
                                    }

                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace(() -> new ParameterizedMessage(
                                            "[master] failed to ping [{}], retry [{}] out of [{}]",
                                            masterNode, retryCount, pingRetryCount), exp);
                                    if (retryCount >= pingRetryCount) {
                                        logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                            masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                            + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                        // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    }
            );
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
                threadPool.generic().execute(() -> {
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {
                logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
        }
    }
  • The run method of MasterPinger returns immediately if the pinger has been stopped. Otherwise it checks whether masterToPing is null: if it is, the pinger reschedules itself after pingInterval; if not, it sends a MasterPingRequest to masterToPing with a timeout of pingRetryTimeout.
  • The handleResponse method of the TransportResponseHandler resets MasterFaultDetection.this.retryCount to 0 and then checks whether the masterNode has changed. If it has not, the MasterPinger is rescheduled after pingInterval.
  • The handleException method of the TransportResponseHandler only acts if the masterNode has not changed. If the exception or its cause is a ConnectTransportException, handleTransportDisconnect is called. If the cause is a NotMasterException, ThisIsNotTheMasterYouAreLookingForException, or NodeDoesNotExistOnMasterException, notifyMasterFailure is called. Any other exception leads to a retry.
  • On a retry, MasterFaultDetection.this.retryCount is incremented first. If the count is greater than or equal to pingRetryCount, notifyMasterFailure is called; otherwise the MasterPingRequest is resent immediately, without waiting for pingInterval.
  • The notifyMasterFailure method calls back the onMasterFailure method of the MasterFaultDetection.Listener
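The retry accounting described above can be sketched on its own, together with the arithmetic it implies. RetryBudget and worstCaseTimeoutWindow are hypothetical names. The window calculation assumes every attempt fails by timing out and ignores scheduling overhead, so it is an upper bound on time spent waiting for ping responses rather than an exact detection time.

```java
import java.time.Duration;

// Hypothetical model of the retryCount / pingRetryCount bookkeeping in MasterPinger.
class RetryBudget {
    private final int pingRetryCount;
    private int retryCount;

    RetryBudget(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    // Called for a failure that is not one of the special-cased exceptions.
    // Returns true once the master should be reported as failed.
    boolean onFailure() {
        return ++retryCount >= pingRetryCount;
    }

    // A successful ping resets the counter, so only consecutive failures count.
    void onSuccess() {
        retryCount = 0;
    }

    // Retries are resent immediately, so consecutive timeouts add up to retries * timeout.
    static Duration worstCaseTimeoutWindow(int retries, Duration pingTimeout) {
        return pingTimeout.multipliedBy(retries);
    }
}
```

With the defaults shown in FaultDetection (ping_retries of 3 and ping_timeout of 30 seconds), worstCaseTimeoutWindow gives 90 seconds of consecutive timeouts before notifyMasterFailure is called on this path.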

ZenDiscovery.MasterNodeFailureListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

    private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

        @Override
        public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
            handleMasterGone(masterNode, cause, reason);
        }
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason = " + reason + ")");
            }
        }
    }

    protected void rejoin(String reason) {
        assert Thread.holdsLock(stateMutex);
        ClusterState clusterState = committedState.get();

        logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
        nodesFD.stop();
        masterFD.stop(reason);

        // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
        // before a decision is made.
        joinThreadControl.startNewThreadIfNotRunning();

        if (clusterState.nodes().getMasterNodeId() != null) {
            // remove block if it already exists before adding new one
            assert clusterState.blocks().hasGlobalBlockWithId(noMasterBlockService.getNoMasterBlock().id()) == false :
                "NO_MASTER_BLOCK should only be added by ZenDiscovery";
            ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
                .addGlobalBlock(noMasterBlockService.getNoMasterBlock())
                .build();

            DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
            clusterState = ClusterState.builder(clusterState)
                .blocks(clusterBlocks)
                .nodes(discoveryNodes)
                .build();

            committedState.set(clusterState);
            clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
        }
    }

    private class JoinThreadControl {

        private final AtomicBoolean running = new AtomicBoolean(false);
        private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

        /** returns true if join thread control is started and there is currently an active join thread */
        public boolean joinThreadActive() {
            Thread currentThread = currentJoinThread.get();
            return running.get() && currentThread != null && currentThread.isAlive();
        }

        /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
        public boolean joinThreadActive(Thread joinThread) {
            return running.get() && joinThread.equals(currentJoinThread.get());
        }

        /** cleans any running joining thread and calls {@link #rejoin} */
        public void stopRunningThreadAndRejoin(String reason) {
            assert Thread.holdsLock(stateMutex);
            currentJoinThread.set(null);
            rejoin(reason);
        }

        /** starts a new joining thread if there is no currently active one and join thread controlling is started */
        public void startNewThreadIfNotRunning() {
            assert Thread.holdsLock(stateMutex);
            if (joinThreadActive()) {
                return;
            }
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    Thread currentThread = Thread.currentThread();
                    if (!currentJoinThread.compareAndSet(null, currentThread)) {
                        return;
                    }
                    while (running.get() && joinThreadActive(currentThread)) {
                        try {
                            innerJoinCluster();
                            return;
                        } catch (Exception e) {
                            logger.error("unexpected error while joining cluster, trying again", e);
                            // Because we catch any exception here, we want to know in
                            // tests if an uncaught exception got to this point and the test infra uncaught exception
                            // leak detection can catch this. In practise no uncaught exception should leak
                            assert ExceptionsHelper.reThrowIfNotNull(e);
                        }
                    }
                    // cleaning the current thread from currentJoinThread is done by explicit calls.
                }
            });
        }

        /**
         * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
         * If the given thread is not the currently running join thread, the command is ignored.
         */
        public void markThreadAsDoneAndStartNew(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            if (!markThreadAsDone(joinThread)) {
                return;
            }
            startNewThreadIfNotRunning();
        }

        /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
        public boolean markThreadAsDone(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            return currentJoinThread.compareAndSet(joinThread, null);
        }

        public void stop() {
            running.set(false);
            Thread joinThread = currentJoinThread.getAndSet(null);
            if (joinThread != null) {
                joinThread.interrupt();
            }
        }

        public void start() {
            running.set(true);
        }

    }
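  • ZenDiscovery’s MasterNodeFailureListener implements the MasterFaultDetection.Listener interface, and its onMasterFailure method calls handleMasterGone. handleMasterGone mainly calls pendingStatesQueue.failAllStatesAndClear and then rejoin
  • The rejoin method first calls nodesFD.stop() and masterFD.stop(reason), then triggers joinThreadControl.startNewThreadIfNotRunning(). Finally, if the current clusterState still has a master node id, it builds a new clusterState with the no-master block added and the master node id set to null, and passes it to clusterApplier.onNewClusterState
  • joinThreadControl.startNewThreadIfNotRunning() returns immediately if a join thread is already active; otherwise it submits a task to the generic thread pool that claims currentJoinThread with compareAndSet and then calls innerJoinCluster, retrying on exceptions while the control is running and the thread is still the current join thread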
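
The single-join-thread guarantee in JoinThreadControl comes from compareAndSet on an AtomicReference<Thread>. The following is only a minimal sketch of that pattern, not the Elasticsearch code: the class JoinSlotSketch and its method names are hypothetical, and the stateMutex assertions, the thread pool and innerJoinCluster are left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for ZenDiscovery.JoinThreadControl.
final class JoinSlotSketch {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    void start() {
        running.set(true);
    }

    void stop() {
        running.set(false);
        currentJoinThread.set(null);
    }

    /** Claims the join slot for the given thread; succeeds only if the slot is empty. */
    boolean tryClaim(Thread candidate) {
        return currentJoinThread.compareAndSet(null, candidate);
    }

    /** True while the control is started and the given thread still owns the slot. */
    boolean isActive(Thread candidate) {
        return running.get() && candidate.equals(currentJoinThread.get());
    }

    /** Releases the slot; returns false if the given thread is not the current owner. */
    boolean markDone(Thread candidate) {
        return currentJoinThread.compareAndSet(candidate, null);
    }
}
```

In this sketch a second thread that loses the compareAndSet simply gives up, which mirrors the early return in the Runnable above, and a stale thread cannot release a slot it no longer owns.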

Summary

  • FaultDetection implements the Closeable interface and defines FDConnectionListener. Its constructor adds FDConnectionListener to transportService when registerConnectionListener is true, while the close method removes FDConnectionListener from transportService. FaultDetection also defines the abstract method handleTransportDisconnect
  • MasterFaultDetection extends FaultDetection, and its constructor registers MasterPingRequestHandler with transportService. Its handleTransportDisconnect method tries to reconnect to the node if connectOnNetworkDisconnect is true, re-schedules the delayed MasterPinger task if the reconnect succeeds, and calls notifyMasterFailure if the reconnect fails or connectOnNetworkDisconnect is false. The notifyMasterFailure method calls back the onMasterFailure method of MasterFaultDetection.Listener
  • ZenDiscovery’s MasterNodeFailureListener implements the MasterFaultDetection.Listener interface, and its onMasterFailure method calls handleMasterGone. handleMasterGone mainly calls pendingStatesQueue.failAllStatesAndClear and then rejoin
  • ZenDiscovery’s processNextCommittedClusterState method calls masterFD.restart when the current node is not the master and either masterFD.masterNode() is null or masterFD.masterNode() differs from newClusterState.nodes().getMasterNode()
  • Inside the restart method of MasterFaultDetection, innerStop is executed first and then innerStart. innerStop mainly calls masterPinger.stop() and sets masterPinger and masterNode to null. innerStart creates a MasterPinger and schedules it as a delayed task that runs after pingInterval
  • The run method of MasterPinger first checks whether masterToPing is null. If it is null, it re-schedules the delayed MasterPinger task; otherwise it sends a MasterPingRequest to masterToPing. If the request succeeds, it resets MasterFaultDetection.this.retryCount to 0 and then checks whether masterNode has changed; if it has not, it schedules the next delayed MasterPinger task. If the request fails, the handling depends on the exception: it calls handleTransportDisconnect, calls notifyMasterFailure, or retries the ping
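
The retry bookkeeping in the last point can be sketched as a small counter. This is an assumption-laden illustration rather than the MasterPinger code: the class PingRetrySketch and its methods are hypothetical, and it assumes the master is reported as failed once the number of consecutive failed pings reaches pingRetryCount (discovery.zen.fd.ping_retries, default 3).

```java
// Hypothetical sketch of the retryCount handling; not the Elasticsearch implementation.
final class PingRetrySketch {

    private final int pingRetryCount; // assumed to come from discovery.zen.fd.ping_retries
    private int retryCount = 0;

    PingRetrySketch(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    /** A successful ping clears the counter, so only consecutive failures add up. */
    void onPingSuccess() {
        retryCount = 0;
    }

    /**
     * Records one failed ping.
     * Returns true when the caller should report a master failure,
     * false when it should send the ping again.
     */
    boolean onPingFailure() {
        retryCount++;
        return retryCount >= pingRetryCount;
    }
}
```

With pingRetryCount set to 3, two failures followed by a success start the count over, and only three failures in a row make onPingFailure return true.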

doc