A look at Elasticsearch's MembershipAction

Preface

This article walks through the MembershipAction class of Elasticsearch 6.7.1 and the classes it works with.

MembershipAction

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

public class MembershipAction {

    private static final Logger logger = LogManager.getLogger(MembershipAction.class);

    public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
    public static final String DISCOVERY_JOIN_VALIDATE_ACTION_NAME = "internal:discovery/zen/join/validate";
    public static final String DISCOVERY_LEAVE_ACTION_NAME = "internal:discovery/zen/leave";

    //......

    private final TransportService transportService;

    private final MembershipListener listener;

    public MembershipAction(TransportService transportService, MembershipListener listener,
                            Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
        this.transportService = transportService;
        this.listener = listener;


        transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
            ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
        transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
            () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
            new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
        transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
            ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
    }

    public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
        transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
            EmptyTransportResponseHandler.INSTANCE_SAME);
    }

    public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
        transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Validates the join request, throwing a failure if it failed.
     */
    public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
        transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
            EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
    }
    //......
}
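  • MembershipAction defines three request types, LeaveRequest, JoinRequest and ValidateJoinRequest, together with a TransportRequestHandler for each: LeaveRequestRequestHandler, JoinRequestRequestHandler and ValidateJoinRequestRequestHandler. Its constructor registers these handlers with the TransportService under the three action names, and the send...Blocking methods submit the corresponding request and wait up to the given timeout for the response.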
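
The registration step can be pictured as a map from action name to handler. The following is only a minimal sketch under simplifying assumptions: ActionRegistrySketch, dispatch and membershipRegistry are hypothetical names, requests and responses are reduced to strings, and rejecting a duplicate registration is a choice made for this sketch rather than something the excerpt above shows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the handler registry inside TransportService.
final class ActionRegistrySketch {
    static final String JOIN = "internal:discovery/zen/join";
    static final String JOIN_VALIDATE = "internal:discovery/zen/join/validate";
    static final String LEAVE = "internal:discovery/zen/leave";

    // Action name -> handler; requests and responses are plain strings in this sketch.
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerRequestHandler(String action, Function<String, String> handler) {
        // Assumption of this sketch: registering the same action twice is an error.
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for [" + action + "]");
        }
    }

    String dispatch(String action, String payload) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler registered for [" + action + "]");
        }
        return handler.apply(payload);
    }

    // Mirrors the three registrations made in the MembershipAction constructor.
    static ActionRegistrySketch membershipRegistry() {
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.registerRequestHandler(JOIN, node -> "joined:" + node);
        registry.registerRequestHandler(JOIN_VALIDATE, state -> "validated:" + state);
        registry.registerRequestHandler(LEAVE, node -> "left:" + node);
        return registry;
    }
}
```

An incoming message is then routed purely by its action name, which is why each request type needs its own constant.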

TransportRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequest.java

public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
    public static class Empty extends TransportRequest {
        public static final Empty INSTANCE = new Empty();
    }

    /**
     * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
     */
    private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;

    public TransportRequest() {
    }

    public TransportRequest(StreamInput in) throws IOException {
        parentTaskId = TaskId.readFromStream(in);
    }

    /**
     * Set a reference to task that created this request.
     */
    @Override
    public void setParentTask(TaskId taskId) {
        this.parentTaskId = taskId;
    }

    /**
     * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
     */
    @Override
    public TaskId getParentTask() {
        return parentTaskId;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        parentTaskId = TaskId.readFromStream(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        parentTaskId.writeTo(out);
    }
}
  • TransportRequest extends TransportMessage and implements the TaskAwareRequest interface. It carries a parentTaskId (TaskId.EMPTY_TASK_ID by default), which it reads in readFrom and writes in writeTo.

LeaveRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public static class LeaveRequest extends TransportRequest {

        private DiscoveryNode node;

        public LeaveRequest() {
        }

        private LeaveRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
  • LeaveRequest extends TransportRequest and overrides the readFrom and writeTo methods. After calling the parent implementation, each method reads or writes the DiscoveryNode.
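
The order matters here: the subclass must read its fields in exactly the order it wrote them, after the parent's fields. A minimal sketch of that pattern follows, using java.io data streams in place of StreamInput and StreamOutput. WireSketch, BaseRequest and LeaveLikeRequest are hypothetical names, and the node is reduced to a string id.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class WireSketch {
    // Stand-in for TransportRequest: serializes only the parent task id.
    static class BaseRequest {
        String parentTaskId = "unset";

        void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(parentTaskId);
        }

        void readFrom(DataInputStream in) throws IOException {
            parentTaskId = in.readUTF();
        }
    }

    // Stand-in for LeaveRequest: appends the node after the parent's fields.
    static class LeaveLikeRequest extends BaseRequest {
        String nodeId;

        LeaveLikeRequest() {
        }

        LeaveLikeRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        void writeTo(DataOutputStream out) throws IOException {
            super.writeTo(out);   // parent fields first
            out.writeUTF(nodeId); // then this class's own field
        }

        @Override
        void readFrom(DataInputStream in) throws IOException {
            super.readFrom(in);   // same order as writeTo
            nodeId = in.readUTF();
        }
    }

    // Serializes the request to bytes and reads it back into a fresh instance.
    static LeaveLikeRequest roundTrip(LeaveLikeRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out);
            }
            LeaveLikeRequest copy = new LeaveLikeRequest();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The no-argument constructor plays the same role as the public LeaveRequest() constructor: the receiving side creates an empty instance first and fills it through readFrom.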

JoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public static class JoinRequest extends TransportRequest {

        private DiscoveryNode node;

        public DiscoveryNode getNode() {
            return node;
        }

        public JoinRequest() {
        }

        private JoinRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
  • JoinRequest extends TransportRequest and overrides the readFrom and writeTo methods. After calling the parent implementation, each method reads or writes the DiscoveryNode; unlike LeaveRequest, it also exposes the node through getNode.

ValidateJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    static class ValidateJoinRequest extends TransportRequest {
        private ClusterState state;

        ValidateJoinRequest() {}

        ValidateJoinRequest(ClusterState state) {
            this.state = state;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.state = ClusterState.readFrom(in, null);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.state.writeTo(out);
        }
    }
  • ValidateJoinRequest extends TransportRequest and overrides the readFrom and writeTo methods. After calling the parent implementation, each method reads or writes the ClusterState.

TransportRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java

public interface TransportRequestHandler<T extends TransportRequest> {

    /**
     * Override this method if access to the Task parameter is needed
     */
    default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
        messageReceived(request, channel);
    }

    void messageReceived(T request, TransportChannel channel) throws Exception;
}
  • The TransportRequestHandler interface declares a two-argument messageReceived method and provides a default three-argument overload that takes a Task and delegates to the two-argument one.
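
Because only the two-argument method is abstract, a handler can be written as a lambda, while a handler that needs the task overrides the default overload. The sketch below illustrates this with a hypothetical Handler interface; unlike the real interface, its methods return a string so that the effect is observable, and the channel and task are plain strings.

```java
final class HandlerSketch {
    // Hypothetical, simplified counterpart of TransportRequestHandler.
    interface Handler<T> {
        // Default overload: ignores the task and delegates to the two-argument method.
        default String messageReceived(T request, String channel, String task) {
            return messageReceived(request, channel);
        }

        String messageReceived(T request, String channel);
    }

    // Only the abstract two-argument method is implemented, so a lambda is enough.
    static final Handler<String> TWO_ARG =
        (request, channel) -> "handled " + request + " on " + channel;

    // A task-aware handler overrides the three-argument overload instead.
    static final Handler<String> TASK_AWARE = new Handler<String>() {
        @Override
        public String messageReceived(String request, String channel, String task) {
            return "handled " + request + " on " + channel + " for task " + task;
        }

        @Override
        public String messageReceived(String request, String channel) {
            throw new UnsupportedOperationException("this handler needs the task");
        }
    };
}
```

The three handlers in MembershipAction follow the first style: none of them needs the Task, so each implements only the two-argument method.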

LeaveRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {

        @Override
        public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
            listener.onLeave(request.node);
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
  • LeaveRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived method calls the onLeave method of the MembershipListener and then sends back an empty response.

JoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

        @Override
        public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
            listener.onJoin(request.getNode(), new JoinCallback() {
                @Override
                public void onSuccess() {
                    try {
                        channel.sendResponse(TransportResponse.Empty.INSTANCE);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.warn("failed to send back failure on join request", inner);
                    }
                }
            });
        }
    }
  • JoinRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived method calls the onJoin method of the MembershipListener, passing a JoinCallback that sends an empty response on success and the exception on failure.
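
One detail of the callback is easy to miss: if sending the success response itself throws, onSuccess falls back to onFailure, and only a failure to send the failure is logged. A minimal sketch of that control flow follows, assuming a hypothetical Channel interface and a RecordingChannel test double; the log is just a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinCallbackSketch {
    // Hypothetical, simplified counterpart of TransportChannel.
    interface Channel {
        void sendResponse(String response) throws Exception;

        void sendResponse(Exception failure) throws Exception;
    }

    interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }

    // Same shape as the anonymous JoinCallback in JoinRequestRequestHandler.
    static JoinCallback callbackFor(Channel channel, List<String> log) {
        return new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse("empty");
                } catch (Exception e) {
                    onFailure(e); // could not report success, report the failure instead
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    log.add("failed to send back failure on join request: " + inner.getMessage());
                }
            }
        };
    }

    // Records what was sent; can be told to reject the success response.
    static final class RecordingChannel implements Channel {
        final List<String> sent = new ArrayList<>();
        private final boolean failOnSuccess;

        RecordingChannel(boolean failOnSuccess) {
            this.failOnSuccess = failOnSuccess;
        }

        @Override
        public void sendResponse(String response) throws Exception {
            if (failOnSuccess) {
                throw new IllegalStateException("channel closed");
            }
            sent.add("ok:" + response);
        }

        @Override
        public void sendResponse(Exception failure) {
            sent.add("error:" + failure.getMessage());
        }
    }
}
```

With new RecordingChannel(true), calling onSuccess ends up sending the "channel closed" exception back to the caller rather than leaving the request unanswered.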

ValidateJoinRequestRequestHandler

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
        private final Supplier<DiscoveryNode> localNodeSupplier;
        private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

        ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
                                          Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
            this.localNodeSupplier = localNodeSupplier;
            this.joinValidators = joinValidators;
        }

        @Override
        public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
            DiscoveryNode node = localNodeSupplier.get();
            assert node != null : "local node is null";
            joinValidators.stream().forEach(action -> action.accept(node, request.state));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
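  • ValidateJoinRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived method passes the local node and the received ClusterState to each of the joinValidators in turn and sends back an empty response if none of them throws.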
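
Validation here works by exception: each validator is a BiConsumer that either returns quietly or throws, and the first exception stops the loop before any response is sent. A minimal sketch under simplifying assumptions follows; the node is a string, the cluster state is reduced to a version number, and both sample validators are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiConsumer;

final class JoinValidatorSketch {
    // Runs every validator; the first one that throws aborts the run,
    // so the "response" is only produced when all validators pass.
    static String handleValidateJoin(String localNode, long stateVersion,
                                     Collection<BiConsumer<String, Long>> joinValidators) {
        joinValidators.forEach(validator -> validator.accept(localNode, stateVersion));
        return "empty-response";
    }

    // Two hypothetical validators, used only for illustration.
    static Collection<BiConsumer<String, Long>> sampleValidators() {
        BiConsumer<String, Long> nameIsSet = (node, version) -> {
            if (node.isEmpty()) {
                throw new IllegalStateException("node name is empty");
            }
        };
        BiConsumer<String, Long> versionIsValid = (node, version) -> {
            if (version < 0) {
                throw new IllegalStateException("invalid state version [" + version + "]");
            }
        };
        return Arrays.asList(nameIsSet, versionIsValid);
    }
}
```

In the real handler the exception propagates out of messageReceived; how the transport layer reports it to the sender is not shown in the excerpt above.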

MembershipAction.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

    public interface MembershipListener {
        void onJoin(DiscoveryNode node, JoinCallback callback);

        void onLeave(DiscoveryNode node);
    }

    public interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }
  • The MembershipListener interface defines the onJoin and onLeave methods; onJoin takes a JoinCallback. ZenDiscovery contains a private inner class of the same name that implements it.

ZenDiscovery.MembershipListener

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    //......

    private class MembershipListener implements MembershipAction.MembershipListener {
        @Override
        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
            handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
        }

        @Override
        public void onLeave(DiscoveryNode node) {
            handleLeaveRequest(node);
        }
    }

    void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
        if (nodeJoinController == null) {
            throw new IllegalStateException("discovery module is not yet started");
        } else {
            // we do this in a couple of places including the cluster update thread. This one here is really just best effort
            // to ensure we fail as fast as possible.
            onJoinValidators.stream().forEach(a -> a.accept(node, state));
            if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
            }
            // try and connect to the node, if it fails, we can raise an exception back to the client...
            transportService.connectToNode(node);

            // validate the join request, will throw a failure if it fails, which will get back to the
            // node calling the join request
            try {
                membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
                    e);
                callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                return;
            }
            nodeJoinController.handleJoinRequest(node, callback);
        }
    }

    private void handleLeaveRequest(final DiscoveryNode node) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (localNodeMaster()) {
            removeNode(node, "zen-disco-node-left", "left");
        } else if (node.equals(clusterState().nodes().getMasterNode())) {
            handleMasterGone(node, null, "shut_down");
        }
    }

    private void removeNode(final DiscoveryNode node, final String source, final String reason) {
        masterService.submitStateUpdateTask(
                source + "(" + node + "), reason(" + reason + ")",
                new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                nodeRemovalExecutor,
                nodeRemovalExecutor);
    }

    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
        }
        if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
        }

        logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

        synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                // flush any pending cluster states from old master, so it will not be set as master again
                pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                rejoin("master left (reason = " + reason + ")");
            }
        }
    }

    //......
}
  • The onJoin method of ZenDiscovery.MembershipListener calls handleJoinRequest, which runs the join validators, connects to the node, sends it a blocking validate-join request and finally calls nodeJoinController.handleJoinRequest(node, callback). The onLeave method calls handleLeaveRequest: if the local node is the master it calls removeNode; otherwise, if the leaving node is the current master, it calls handleMasterGone.
  • The removeNode method calls masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.
  • The handleMasterGone method, provided the departed node is still the master in the committed state, calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoin.
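
The branching in handleLeaveRequest can be condensed into a small decision function. This is only a sketch: LeaveDecisionSketch and its Action enum are hypothetical, and nodes are represented by string ids.

```java
final class LeaveDecisionSketch {
    enum Action { IGNORE, REMOVE_NODE, MASTER_GONE }

    // Same branch order as ZenDiscovery.handleLeaveRequest.
    static Action onLeave(boolean started, boolean localNodeIsMaster,
                          String leavingNode, String currentMaster) {
        if (!started) {
            return Action.IGNORE;       // discovery not started, nothing to do
        }
        if (localNodeIsMaster) {
            return Action.REMOVE_NODE;  // the master removes the leaving node
        }
        if (leavingNode.equals(currentMaster)) {
            return Action.MASTER_GONE;  // a non-master node learns that its master left
        }
        return Action.IGNORE;           // some other node left; only the master acts on that
    }
}
```

The last case is the implicit one in the original code: a non-master node that hears about another non-master node leaving does nothing.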

NodeJoinController.handleJoinRequest

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

public class NodeJoinController {

    //......

    /**
     * processes or queues an incoming join request.
     * <p>
     * Note: doesn't do any validation. This should have been done before.
     */
    public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
        if (electionContext != null) {
            electionContext.addIncomingJoin(node, callback);
            checkPendingJoinsAndElectIfNeeded();
        } else {
            masterService.submitStateUpdateTask("zen-disco-node-join",
                node, ClusterStateTaskConfig.build(Priority.URGENT),
                joinTaskExecutor, new JoinTaskListener(callback, logger));
        }
    }

    //......
}
  • NodeJoinController's handleJoinRequest method calls electionContext.addIncomingJoin followed by checkPendingJoinsAndElectIfNeeded when electionContext is not null; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.

MasterService.submitStateUpdateTask

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

public class MasterService extends AbstractLifecycleComponent {
    //......

    public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }

    public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        final ThreadContext threadContext = threadPool.getThreadContext();
        final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            threadContext.markAsSystemContext();

            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

    class Batcher extends TaskBatcher {

        Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
            super(logger, threadExecutor);
        }

        @Override
        protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
            threadPool.generic().execute(
                () -> tasks.forEach(
                    task -> ((UpdateTask) task).listener.onFailure(task.source,
                        new ProcessClusterEventTimeoutException(timeout, task.source))));
        }

        @Override
        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
            List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
            runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
        }

        class UpdateTask extends BatchedTask {
            final ClusterStateTaskListener listener;

            UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }

            @Override
            public String describeTasks(List<? extends BatchedTask> tasks) {
                return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                    tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
            }
        }
    }

    protected class TaskInputs {
        public final String summary;
        public final List<Batcher.UpdateTask> updateTasks;
        public final ClusterStateTaskExecutor<Object> executor;

        TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
            this.summary = summary;
            this.executor = executor;
            this.updateTasks = updateTasks;
        }

        public boolean runOnlyWhenMaster() {
            return executor.runOnlyOnMaster();
        }

        public void onNoLongerMaster() {
            updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
        }
    }

    //......
}
  • The submitStateUpdateTasks method wraps each task in a Batcher.UpdateTask and submits the list through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method calls runTasks with a TaskInputs instance. The runOnlyWhenMaster method of TaskInputs calls executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()) for every task.
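
In Batcher.run the batchingKey is cast back to the executor, which suggests that tasks sharing an executor are handed to it together. The sketch below shows that grouping idea only; it assumes batching is by key equality, and BatcherSketch, UpdateTask and batch are hypothetical names that leave out priorities, timeouts and listeners.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatcherSketch {
    static final class UpdateTask {
        final String source;
        final Object batchingKey; // the executor in MasterService
        final String task;

        UpdateTask(String source, Object batchingKey, String task) {
            this.source = source;
            this.batchingKey = batchingKey;
            this.task = task;
        }
    }

    // Groups submitted tasks by batching key, preserving submission order
    // both across keys and within each batch.
    static Map<Object, List<String>> batch(List<UpdateTask> submitted) {
        Map<Object, List<String>> batches = new LinkedHashMap<>();
        for (UpdateTask update : submitted) {
            batches.computeIfAbsent(update.batchingKey, key -> new ArrayList<>()).add(update.task);
        }
        return batches;
    }
}
```

Under this assumption, two join requests submitted with the same joinTaskExecutor would reach its execute method as one list, which matches the List parameter of ClusterStateTaskExecutor.execute.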

ClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

public interface ClusterStateTaskExecutor<T> {

    ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

    default boolean runOnlyOnMaster() {
        return true;
    }

    default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    }

    default String describeTasks(List<T> tasks) {
        return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
    }

    //......
}
  • ClusterStateTaskExecutor declares the execute method and provides default implementations of runOnlyOnMaster, clusterStatePublished and describeTasks. It has many implementations, such as JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.
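
The default describeTasks is compact enough to deserve a closer look: it maps each task to its toString, drops empty strings, and passes the stream's iterator method reference to String.join as an Iterable. The sketch below copies that expression into a standalone static method; DescribeTasksSketch is a hypothetical wrapper class.

```java
import java.util.List;

final class DescribeTasksSketch {
    // Same expression as the default ClusterStateTaskExecutor.describeTasks.
    static <T> String describeTasks(List<T> tasks) {
        return String.join(", ",
            tasks.stream()
                .map(t -> (CharSequence) t.toString())
                .filter(t -> t.length() > 0)::iterator);
    }
}
```

For example, describeTasks(Arrays.asList("node-1 left", "", "node-2 left")) yields "node-1 left, node-2 left": the empty description is filtered out before joining.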

JoinTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

    // visible for testing
    public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

        private final AllocationService allocationService;

        private final ElectMasterService electMasterService;

        private final Logger logger;

        private final int minimumMasterNodesOnLocalNode;

        public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
                                Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.logger = logger;
            minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
        }

        @Override
        public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
            final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();

            final DiscoveryNodes currentNodes = currentState.nodes();
            boolean nodesChanged = false;
            ClusterState.Builder newState;

            if (joiningNodes.size() == 1  && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
                return results.successes(joiningNodes).build(currentState);
            } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
                assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
                // use these joins to try and become the master.
                // Note that we don't have to do any validation of the amount of joining nodes - the commit
                // during the cluster state publishing guarantees that we have enough
                newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
                nodesChanged = true;
            } else if (currentNodes.isLocalNodeElectedMaster() == false) {
                logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
                throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
            } else {
                newState = ClusterState.builder(currentState);
            }

            DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

            assert nodesBuilder.isLocalNodeElectedMaster();

            Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
            Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
            // we only enforce major version transitions on a fully formed clusters
            final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
            // processing any joins
            for (final DiscoveryNode node : joiningNodes) {
                if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
                    // noop
                } else if (currentNodes.nodeExists(node)) {
                    logger.debug("received a join request for an existing node [{}]", node);
                } else {
                    try {
                        if (enforceMajorVersion) {
                            MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
                        }
                        MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
                        // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                        // we have to reject nodes that don't support all indices we have in this cluster
                        MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
                        nodesBuilder.add(node);
                        nodesChanged = true;
                        minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                        maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
                    } catch (IllegalArgumentException | IllegalStateException e) {
                        results.failure(node, e);
                        continue;
                    }
                }
                results.success(node);
            }
            if (nodesChanged) {
                newState.nodes(nodesBuilder);
                return results.build(allocationService.reroute(newState.build(), "node_join"));
            } else {
                // we must return a new cluster state instance to force publishing. This is important
                // for the joining node to finalize its join and set us as a master
                return results.build(newState.build());
            }
        }

        //......

        @Override
        public boolean runOnlyOnMaster() {
            // we validate that we are allowed to change the cluster state during cluster state processing
            return false;
        }

        @Override
        public void clusterStatePublished(ClusterChangedEvent event) {
            electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
        }
    }
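  • JoinTaskExecutor's execute method builds a new ClusterState from joiningNodes, recording a success or failure for each node. If the set of nodes changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shards; otherwise it still returns a new ClusterState instance to force publishing.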
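
The per-node loop is the core of execute: an already known node counts as a success without changing anything, a compatible new node is added, and an incompatible one is recorded as a failure while the rest of the batch continues. A minimal sketch of that loop follows. Nodes are reduced to an id and a major version, and ensureMajorNotLower is a hypothetical, simplified stand-in for the compatibility checks in MembershipAction; the election-related branches are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JoinExecutorSketch {
    static final class Result {
        final Map<String, Integer> nodes;    // node id -> major version after the batch
        final Map<String, String> outcomes;  // node id -> "success" or a failure message
        final boolean nodesChanged;

        Result(Map<String, Integer> nodes, Map<String, String> outcomes, boolean nodesChanged) {
            this.nodes = nodes;
            this.outcomes = outcomes;
            this.nodesChanged = nodesChanged;
        }
    }

    // Hypothetical stand-in for the version checks performed in the real executor.
    static void ensureMajorNotLower(int joiningMajor, int minClusterMajor) {
        if (joiningMajor < minClusterMajor) {
            throw new IllegalStateException(
                "major version [" + joiningMajor + "] is lower than [" + minClusterMajor + "]");
        }
    }

    static Result execute(Map<String, Integer> currentNodes, Map<String, Integer> joiningNodes) {
        Map<String, Integer> nodes = new LinkedHashMap<>(currentNodes);
        Map<String, String> outcomes = new LinkedHashMap<>();
        boolean nodesChanged = false;
        int minMajor = currentNodes.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        for (Map.Entry<String, Integer> join : joiningNodes.entrySet()) {
            String id = join.getKey();
            int major = join.getValue();
            if (!currentNodes.containsKey(id)) {
                try {
                    ensureMajorNotLower(major, minMajor);
                    nodes.put(id, major);
                    nodesChanged = true;
                    minMajor = Math.min(minMajor, major);
                } catch (IllegalStateException e) {
                    outcomes.put(id, "failure: " + e.getMessage());
                    continue; // one rejected node does not fail the whole batch
                }
            }
            outcomes.put(id, "success"); // existing nodes also count as success
        }
        return new Result(nodes, outcomes, nodesChanged);
    }
}
```

The nodesChanged flag corresponds to the decision at the end of the real method: only a changed node set triggers a reroute.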

NodeRemovalClusterStateTaskExecutor

elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

    public static class NodeRemovalClusterStateTaskExecutor
        implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {

        private final AllocationService allocationService;
        private final ElectMasterService electMasterService;
        private final Consumer<String> rejoin;
        private final Logger logger;

        public static class Task {

            private final DiscoveryNode node;
            private final String reason;

            public Task(final DiscoveryNode node, final String reason) {
                this.node = node;
                this.reason = reason;
            }

            public DiscoveryNode node() {
                return node;
            }

            public String reason() {
                return reason;
            }

            @Override
            public String toString() {
                return node + " " + reason;
            }
        }

        public NodeRemovalClusterStateTaskExecutor(
                final AllocationService allocationService,
                final ElectMasterService electMasterService,
                final Consumer<String> rejoin,
                final Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.rejoin = rejoin;
            this.logger = logger;
        }

        @Override
        public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
            final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
            boolean removed = false;
            for (final Task task : tasks) {
                if (currentState.nodes().nodeExists(task.node())) {
                    remainingNodesBuilder.remove(task.node());
                    removed = true;
                } else {
                    logger.debug("node [{}] does not exist in cluster state, ignoring", task);
                }
            }

            if (!removed) {
                // no nodes to remove, keep the current cluster state
                return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
            }

            final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

            final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
            if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
                final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
                rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
                return resultBuilder.build(currentState);
            } else {
                ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
                return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
            }
        }

        // visible for testing
        // hook is used in testing to ensure that correct cluster state is used to test whether a
        // rejoin or reroute is needed
        ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
            return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
        }

        @Override
        public void onFailure(final String source, final Exception e) {
            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
        }

        @Override
        public void onNoLongerMaster(String source) {
            logger.debug("no longer master while processing node removal [{}]", source);
        }

    }
  • NodeRemovalClusterStateTaskExecutor implements both the ClusterStateTaskExecutor and ClusterStateTaskListener interfaces. Its execute method removes the nodes named by the tasks from currentState and builds remainingNodesClusterState. If hasEnoughMasterNodes returns true for the remaining nodes, allocationService.disassociateDeadNodes is executed; otherwise the Consumer named rejoin is invoked and the current state is returned unchanged. If none of the nodes exists in the cluster state, the current state is kept as well.
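
The decision made inside execute can be illustrated with a small, self-contained sketch. It is not the Elasticsearch implementation: the cluster state is reduced to a set of node ids, every node is assumed to be master-eligible, and the quorum check is simplified to a size comparison. The class name NodeRemovalSketch and its execute signature are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Simplified model of the remove-then-check-quorum flow (hypothetical names, not the ES API).
final class NodeRemovalSketch {

    /**
     * Returns the node set that would become the new cluster state.
     * Assumption: every node is master-eligible, so quorum is a plain size check.
     */
    static Set<String> execute(Set<String> currentNodes, List<String> nodesToRemove,
                               int minimumMasterNodes, Consumer<String> rejoin) {
        Set<String> remaining = new LinkedHashSet<>(currentNodes);
        boolean removed = false;
        for (String node : nodesToRemove) {
            if (currentNodes.contains(node)) {
                remaining.remove(node);
                removed = true;
            }
            // unknown nodes are ignored, mirroring the debug log in the original
        }
        if (!removed) {
            return currentNodes; // nothing to remove, keep the current state
        }
        if (remaining.size() < minimumMasterNodes) {
            // quorum lost: ask for a rejoin and leave the state untouched
            rejoin.accept("not enough master nodes (has [" + remaining.size()
                + "], but needed [" + minimumMasterNodes + "])");
            return currentNodes;
        }
        return remaining; // the real code additionally disassociates dead nodes here
    }

    public static void main(String[] args) {
        Set<String> nodes = new LinkedHashSet<>(Arrays.asList("a", "b", "c"));
        System.out.println(execute(nodes, Arrays.asList("c"), 2, System.out::println));
        System.out.println(execute(nodes, Arrays.asList("b", "c"), 2, System.out::println));
    }
}
```

In the sketch, as in the original, the rejoin branch returns the unchanged state: the removal is not published when the remaining nodes cannot form a quorum.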

Summary

  • MembershipAction defines three types of requests, namely LeaveRequest, JoinRequest and ValidateJoinRequest. It also defines the TransportRequestHandler for each of them, namely LeaveRequestRequestHandler, JoinRequestRequestHandler and ValidateJoinRequestRequestHandler.
  • LeaveRequestRequestHandler implements the TransportRequestHandler interface; its messageReceived calls the onLeave method of the MembershipListener. JoinRequestRequestHandler implements the same interface; its messageReceived calls the onJoin method of the MembershipListener. ValidateJoinRequestRequestHandler also implements TransportRequestHandler; its messageReceived runs the joinValidators one by one.
  • The MembershipListener interface defines the onJoin and onLeave methods, where onJoin receives a JoinCallback. The ZenDiscovery class contains an implementation class with the same name. The onJoin method of ZenDiscovery.MembershipListener calls handleJoinRequest, which mainly calls nodeJoinController.handleJoinRequest(node, callback). The onLeave method calls handleLeaveRequest, which executes removeNode when the local node is the master and otherwise executes handleMasterGone. The removeNode method mainly executes masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener. The handleMasterGone method mainly executes pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)).
  • NodeJoinController’s handleJoinRequest method executes electionContext.addIncomingJoin when electionContext is not null. Otherwise it executes masterService.submitStateUpdateTask, passing joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
  • MasterService’s submitStateUpdateTasks method creates Batcher.UpdateTask instances and submits them through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method calls runTasks, passing a TaskInputs as the parameter. The runOnlyWhenMaster method of TaskInputs calls executor.runOnlyOnMaster(), and its onNoLongerMaster method calls task.listener.onNoLongerMaster(task.source()) for each task.
  • ClusterStateTaskExecutor defines the execute method and provides several default methods such as runOnlyOnMaster, clusterStatePublished and describeTasks. It has many implementation classes, such as JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor. The execute method of JoinTaskExecutor builds the ClusterState from the joining nodes; if the nodes have changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute. NodeRemovalClusterStateTaskExecutor implements both the ClusterStateTaskExecutor and ClusterStateTaskListener interfaces. Its execute method removes the corresponding nodes from currentState and builds remainingNodesClusterState; if hasEnoughMasterNodes returns true, allocationService.disassociateDeadNodes is executed, otherwise the Consumer named rejoin is invoked.
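
The handler-to-listener delegation summarized above can be sketched in a few lines. This is only an illustration under stated assumptions: the map-based registry is a hypothetical stand-in for TransportService.registerRequestHandler, nodes are plain strings, and the Listener interface is a simplified MembershipListener (the real onJoin also receives a JoinCallback). Only the two action names are taken from MembershipAction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Minimal model of "register a handler per action name, delegate to the listener".
final class MembershipDispatchSketch {

    // Action names as defined in MembershipAction.
    static final String JOIN = "internal:discovery/zen/join";
    static final String LEAVE = "internal:discovery/zen/leave";

    /** Simplified listener; hypothetical signatures for illustration only. */
    interface Listener {
        void onJoin(String node);
        void onLeave(String node);
    }

    // Hypothetical stand-in for the transport layer's handler registry.
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    MembershipDispatchSketch(Listener listener) {
        // Each handler does nothing but forward to the listener,
        // like JoinRequestRequestHandler and LeaveRequestRequestHandler.
        handlers.put(JOIN, listener::onJoin);
        handlers.put(LEAVE, listener::onLeave);
    }

    /** Looks up the handler registered for the action and invokes it. */
    void messageReceived(String action, String node) {
        Consumer<String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for action [" + action + "]");
        }
        handler.accept(node);
    }

    public static void main(String[] args) {
        MembershipDispatchSketch sketch = new MembershipDispatchSketch(new Listener() {
            @Override public void onJoin(String node) { System.out.println("join: " + node); }
            @Override public void onLeave(String node) { System.out.println("leave: " + node); }
        });
        sketch.messageReceived(JOIN, "node-1");
        sketch.messageReceived(LEAVE, "node-1");
    }
}
```

Keeping the handlers this thin is what lets ZenDiscovery hold all the membership logic behind the listener, while MembershipAction deals only with transport concerns.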

doc