Talk about NodesFaultDetection of elasticsearch.

  elasticsearch

Order

This article mainly studies NodesFaultDetection of elasticsearch.

NodesFaultDetection

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {

    public static interface Listener {

        void onNodeFailure(DiscoveryNode node, String reason);
    }

    private final ThreadPool threadPool;

    private final TransportService transportService;


    private final boolean connectOnNetworkDisconnect;

    private final TimeValue pingInterval;

    private final TimeValue pingRetryTimeout;

    private final int pingRetryCount;

    // used mainly for testing, should always be true
    private final boolean registerConnectionListener;


    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();

    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();

    private final FDConnectionListener connectionListener;

    private volatile DiscoveryNodes latestNodes = EMPTY_NODES;

    private volatile boolean running = false;

    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;

        this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
        this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
        this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
        this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
        this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);

        logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

        transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    public NodesFaultDetection start() {
        if (running) {
            return this;
        }
        running = true;
        return this;
    }

    public NodesFaultDetection stop() {
        if (!running) {
            return this;
        }
        running = false;
        return this;
    }

    public void close() {
        stop();
        transportService.removeHandler(PingRequestHandler.ACTION);
        transportService.removeConnectionListener(connectionListener);
    }

    //......
}
  • NodesFaultDetection inherits AbstractComponent, which defines a listeners of CopyOnWriteArrayList type, nodesFD, connectionListener, latestNodes, running and other attributes of a ConcurrentMap.
  • Its constructor reads connect_on_network_disconnect (Default true)、ping_interval(Default 1s)、ping_timeout(Default 30s)、ping_retries(The default is 3)、register_connection_listener(Default true) configuration, and then to the transportService registered PingRequestHandler.ACTION’s PingRequestHandler, added FDConnectionListener
  • The start method is used to set running to true; ; Stop is used to set running to false; ; The close method executes stop first, then removes the handler for PingRequestHandler.ACTION from the transportService and removes the connectionListener.

PingRequestHandler

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {

        public static final String ACTION = "discovery/zen/fd/ping";

        @Override
        public PingRequest newInstance() {
            return new PingRequest();
        }

        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            // if we are not the node we are supposed to be pinged, send an exception
            // this can happen when a kill -9 is sent, and another node is started using the same port
            if (!latestNodes.localNodeId().equals(request.nodeId)) {
                throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
            }
            channel.sendResponse(new PingResponse());
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }

    static class PingRequest extends TransportRequest {

        // the (assumed) node id we are pinging
        private String nodeId;

        PingRequest() {
        }

        PingRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
        }
    }

    private static class PingResponse extends TransportResponse {

        private PingResponse() {
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
        }
    }
  • PingRequestHandler’s newInstance method is used to create a PingRequest, which defines the nodeId attribute to identify the target nodeid it wants to request; The messageReceived method is used to respond to the PingRequest request. It will first determine whether the target nodeId is consistent with the loca lNodeId, and return the PingResponse if it is consistent.

FDConnectionListener

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

    private void handleTransportDisconnect(DiscoveryNode node) {
        if (!latestNodes.nodeExists(node.id())) {
            return;
        }
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (!running) {
            return;
        }
        nodeFD.running = false;
        if (connectOnNetworkDisconnect) {
            try {
                transportService.connectToNode(node);
                nodesFD.put(node, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
            } catch (Exception e) {
                logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node  ] [{}] transport disconnected", node);
            notifyNodeFailure(node, "transport disconnected");
        }
    }

    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    }
  • FDConnectionListener executes handleTransportDisconnect; when onNodeDisconnected; This method will remove the node from nodesFD and mark the running of nodeFD as false.
  • If connectOnNetworkDisconnect is true, connect the node; if successful, place the node in nodesFD, register the delay task of SendPingRequest for the node, and delay pingInterval for execution; If connect exception or connectOnNetworkDisconnect is false, no notifyNodeFailure method is executed
  • The notifyNodeFailure method will trigger the nodesfault detection.listener.ononNodeFailure callback, which calls back the nodefailure method of NodeFailureListener of ZenDiscovery.

ZenDiscovery

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
    //......

    @Inject
    public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                        TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
        super(settings);
        this.clusterName = clusterName;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.discoveryNodeService = discoveryNodeService;
        this.pingService = pingService;

        // also support direct discovery.zen settings, for cases when it gets extended
        this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
        this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

        this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
        this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);

        logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

        this.electMaster = new ElectMasterService(settings);
        nodeSettingsService.addListener(new ApplySettings());

        this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
        this.masterFD.addListener(new MasterNodeFailureListener());

        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
        this.nodesFD.addListener(new NodeFailureListener());

        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
        this.pingService.setNodesProvider(this);
        this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

        transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
    }

    protected void doStart() throws ElasticSearchException {
        Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
        // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
        String nodeId = UUID.randomBase64UUID();
        localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
        latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
        nodesFD.updateNodes(latestDiscoNodes);
        pingService.start();

        // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
        asyncJoinCluster();
    }

    public void publish(ClusterState clusterState) {
        if (!master) {
            throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
        }
        latestDiscoNodes = clusterState.nodes();
        nodesFD.updateNodes(clusterState.nodes());
        publishClusterState.publish(clusterState);
    }

    private class NodeFailureListener implements NodesFaultDetection.Listener {

        @Override
        public void onNodeFailure(DiscoveryNode node, String reason) {
            handleNodeFailure(node, reason);
        }
    }

    private void handleNodeFailure(final DiscoveryNode node, String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (!master) {
            // nothing to do here...
            return;
        }
        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
                        .putAll(currentState.nodes())
                        .remove(node.id());
                latestDiscoNodes = builder.build();
                currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
                // check if we have enough master nodes, if not, we need to move into joining the cluster again
                if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
                    return rejoin(currentState, "not enough master nodes");
                }
                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
                return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
            }

            @Override
            public void clusterStateProcessed(ClusterState clusterState) {
                sendInitialStateEventIfNeeded();
            }
        });
    }

    //......
}
  • ZenDiscovery’s constructor created NodesFaultDetection and added NodeFailureListener; to it; The listener implements the nodesFaultDetection.Listener interface, and its onNodeFailure callback executes the handleNodeFailure method, which executes ProcessDclusterStateUtateTask to remove the Node from currentState.nodes () . Then judge whether the number of masterNode meets the requirements of minimumMasterNodes, if not, execute the rejoin method, if not, execute allocationService.reroute
  • Its doStart method will create localNode according to the node configuration in the configuration file, then add it to latestnodes, then execute the nodesfd.updatenodes (latestnodes) method, and then pingService.start () and asyncJoinCluster ()
  • Its publish method updates the local latestnodes according to clusterState nodes, then executes nodesfd. updatenodes (latestnodes) method, and finally executes publishclusterstate. publish (clusterstate)

NodesFaultDetection.updateNodes

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {
    //......

    public void updateNodes(DiscoveryNodes nodes) {
        DiscoveryNodes prevNodes = latestNodes;
        this.latestNodes = nodes;
        if (!running) {
            return;
        }
        DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
        for (DiscoveryNode newNode : delta.addedNodes()) {
            if (newNode.id().equals(nodes.localNodeId())) {
                // no need to monitor the local node
                continue;
            }
            if (!nodesFD.containsKey(newNode)) {
                nodesFD.put(newNode, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
            }
        }
        for (DiscoveryNode removedNode : delta.removedNodes()) {
            nodesFD.remove(removedNode);
        }
    }

    //......
}
  • NodesFaultDetection provides the updatenodes method to update its latestNodes. The method calls nodes.delta(prevNodes) to calculate DiscoveryNodes.Delta. Its addedNodes method returns the newly added Node, while the emovedNodes () method returns the deleted Node
  • For newNode, it first determines whether it is in nodesFD. if it is not, it will be added to nodesFD, and a delay task of SendPingRequest will be registered and pingInterval will be delayed for execution.
  • For removedNode, remove it from nodesFD; The handleTransportDisconnect method also removes a disconnect node from ndoesFD, and if the retry succeeds, it will be placed in nodesFD again.

SendPingRequest

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    private class SendPingRequest implements Runnable {

        private final DiscoveryNode node;

        private SendPingRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void run() {
            if (!running) {
                return;
            }
            transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
                    new BaseTransportResponseHandler<PingResponse>() {
                        @Override
                        public PingResponse newInstance() {
                            return new PingResponse();
                        }

                        @Override
                        public void handleResponse(PingResponse response) {
                            if (!running) {
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                nodeFD.retryCount = 0;
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            // check if the master node did not get switched on us...
                            if (!running) {
                                return;
                            }
                            if (exp instanceof ConnectTransportException) {
                                // ignore this one, we already handle it by registering a connection listener
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                int retryCount = ++nodeFD.retryCount;
                                logger.trace("[node  ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    if (nodesFD.remove(node) != null) {
                                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                                    }
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
                                            options().withHighType().withTimeout(pingRetryTimeout), this);
                                }
                            }
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }
    }
  • The SendPingRequest method sends a PingRequest to the target node with a timeout of pingRetryTimeout; ; Its handleResponse method will determine whether the node is in nodesFD. if it has been removed, it will be ignored. if the running of nodeFD is changed to false, it will also be ignored. otherwise, it will reset its retryCount, re-register the delay task of SendPingRequest, and delay pingInterval execution.
  • If the request has a TransportException, it is judged whether it is a ConnectTransportException, and if it is, it is ignored because the exception has been handled by onNodeDisconnected of FDConnectionListener registered with transportService.
  • If there are other exceptions, nodeFD.retryCount will be increased. when retryCount is greater than or equal to configured pingRetryCount, node will be removed from nodesFD and the notifyNodeFailure method will be recalled, specifically, the handleNodeFailure method of ZenDiscovery will be recalled. If the configured pingRetryCount is not exceeded, it will retry and resend the PingRequest request.

Summary

  • NodesFaultDetection registered PingRequestHandler.ACTION’s PingRequestHandler with transportService, adding FDConnectionListener; ; PingRequestHandler is used to respond to a PingRequest request and return a PingResponse; ; FDConnectionListener is used to handle ConnectTransportException exceptions
  • The onnodeDisconnected method of FDConnectionListener will remove the Node from nodesFD, marking the running of the nodeFD as false;; If connectOnNetworkDisconnect is true, it will be retried (Connect the node, and if successful, put it into nodesFD, register the delay task of SendPingRequest for the node, and delay pingInterval for execution.); If connect exception or connectOnNetworkDisconnect is false, no notifyNodeFailure method is executed; The notifyNodeFailure method will trigger the nodesfault detection.listener.ononNodeFailure callback, which calls back the nodefailure method of NodeFailureListener of ZenDiscovery.
  • ZenDiscovery’s doStart method and publish method both execute NodesFaultDetection’s updateNodes method to update latestNodes, and register the delayed task SendPingRequest for new node.
  • SendPingRequest will reset the retryCount and continue to register SendPingRequest’s delayed tasks when it is successfully executed. If it is not a Tra nsportException, it will retry, and if the number of retries exceeds the limit, it will trigger notifyNodeFailure. Callback to nodesfault detection.listener.onNodeFailure method, here callback to NodeFailureListener’s onnodefailere method of ZenDiscovery.
  • ZenDiscovery’s NodeFailureListener implements the NodesFaultDetection.Listener interface, and its onNodeFailure callback executes the handleNodeFailure method, which executes processdclusterstateupdatetask. Remove the node from currentState.nodes (), and then judge whether the number of masterNode meets the requirements of minimumMasterNodes. if not, the rejoin method will be executed, and if not, the allocationService.reroute will be executed.

doc