A look at the TransportProxyClient of elasticsearch


Preface

This article mainly studies the TransportProxyClient of elasticsearch.

TransportProxyClient

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportProxyClient.java

final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                              final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        assert proxy != null : "no proxy found for action: " + action;
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }
}
  • The constructor of TransportProxyClient takes four parameters: Settings, TransportService, TransportClientNodesService and List<GenericAction>.
  • For every entry in actions that is an Action, the constructor creates a TransportActionNodeProxy and puts it into a map called proxies.
  • TransportProxyClient mainly provides the execute method. It looks up the TransportActionNodeProxy for the given action in proxies and then invokes proxy.execute through the execute method of TransportClientNodesService.
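
The dispatch described above can be sketched in plain Java without any elasticsearch dependency. This is only an illustration under simplifying assumptions: ProxyDispatchSketch, NodeProxy and NodesService are hypothetical stand-ins, actions and nodes are plain strings, and the lookup throws where the original uses an assert.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for TransportProxyClient: one proxy per action name.
final class ProxyDispatchSketch {

    // Hypothetical stand-in for TransportActionNodeProxy.execute(node, request, listener).
    interface NodeProxy {
        void execute(String node, String request, Consumer<String> listener);
    }

    // Hypothetical stand-in for TransportClientNodesService: it only chooses the node.
    interface NodesService {
        void execute(BiConsumer<String, Consumer<String>> callback, Consumer<String> listener);
    }

    private final Map<String, NodeProxy> proxies;
    private final NodesService nodesService;

    ProxyDispatchSketch(List<String> actions, NodesService nodesService) {
        Map<String, NodeProxy> map = new HashMap<>();
        for (String action : actions) {
            // Each proxy remembers its action; "sending" just echoes a string to the listener.
            map.put(action, (node, request, listener) ->
                    listener.accept(action + "@" + node + ":" + request));
        }
        this.proxies = Collections.unmodifiableMap(map);
        this.nodesService = nodesService;
    }

    void execute(String action, String request, Consumer<String> listener) {
        NodeProxy proxy = proxies.get(action);
        if (proxy == null) {
            // The original uses an assert here; the sketch throws instead.
            throw new IllegalArgumentException("no proxy found for action: " + action);
        }
        // Same shape as nodesService.execute((n, l) -> proxy.execute(n, request, l), listener).
        nodesService.execute((node, l) -> proxy.execute(node, request, l), listener);
    }
}
```

The point of the indirection is that the proxy knows how to send one kind of action, while the nodes service decides which node receives it.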

TransportActionNodeProxy

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/action/TransportActionNodeProxy.java

public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {

    private final TransportService transportService;
    private final GenericAction<Request, Response> action;
    private final TransportRequestOptions transportOptions;

    public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
        super(settings);
        this.action = action;
        this.transportService = transportService;
        this.transportOptions = action.transportOptions(settings);
    }

    public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transportService.sendRequest(node, action.name(), request, transportOptions,
            new ActionListenerResponseHandler<>(listener, action::newResponse));
    }
}
  • The constructor of TransportActionNodeProxy takes three parameters: Settings, GenericAction and TransportService.
  • TransportActionNodeProxy provides the execute method, which takes a DiscoveryNode, a Request and an ActionListener. It first validates the request and reports a validation failure to the listener right away; otherwise it wraps the ActionListener in an ActionListenerResponseHandler and calls transportService.sendRequest.
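
The control flow of this execute method (fail fast on an invalid request, otherwise hand it to the transport) can be illustrated with a small dependency-free sketch. All types here (Listener, Transport, EchoRequest) are hypothetical and much simpler than the elasticsearch classes they imitate.

```java
final class ValidateThenSendSketch {

    // Hypothetical listener with the same two callbacks as ActionListener.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Hypothetical transport: delivers the request and answers through the listener.
    interface Transport {
        void send(String node, EchoRequest request, Listener<String> listener);
    }

    // Hypothetical request: validate() returns null when the request is valid.
    static final class EchoRequest {
        final String text;

        EchoRequest(String text) {
            this.text = text;
        }

        Exception validate() {
            if (text == null || text.isEmpty()) {
                return new IllegalArgumentException("text is missing");
            }
            return null;
        }
    }

    // Invalid requests never reach the transport; the listener is failed immediately.
    static void execute(String node, EchoRequest request, Listener<String> listener, Transport transport) {
        Exception validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transport.send(node, request, listener);
    }
}
```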

Nodes

TransportClientNodesService Nodes

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

final class TransportClientNodesService extends AbstractComponent implements Closeable {

    private final TimeValue nodesSamplerInterval;

    private final long pingTimeout;

    private final ClusterName clusterName;

    private final TransportService transportService;

    private final ThreadPool threadPool;

    private final Version minCompatibilityVersion;

    // nodes that are added to be discovered
    private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();

    private final Object mutex = new Object();

    private volatile List<DiscoveryNode> nodes = Collections.emptyList();
    // Filtered nodes are nodes whose cluster name does not match the configured cluster name
    private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

    private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();

    private final NodeSampler nodesSampler;

    private volatile ScheduledFuture nodesSamplerFuture;

    private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

    private final boolean ignoreClusterName;

    private volatile boolean closed;

    private final TransportClient.HostFailureListener hostFailureListener;

    //......

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't add an address");
            }
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    if (otherNode.getAddress().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>(listedNodes);
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),
                        transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            listedNodes = Collections.unmodifiableList(builder);
            nodesSampler.sample();
        }
        return this;
    }

    public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) {
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't remove an address");
            }
            List<DiscoveryNode> listNodesBuilder = new ArrayList<>();
            for (DiscoveryNode otherNode : listedNodes) {
                if (!otherNode.getAddress().equals(transportAddress)) {
                    listNodesBuilder.add(otherNode);
                } else {
                    logger.debug("removing address [{}] from listed nodes", otherNode);
                }
            }
            listedNodes = Collections.unmodifiableList(listNodesBuilder);
            List<DiscoveryNode> nodesBuilder = new ArrayList<>();
            for (DiscoveryNode otherNode : nodes) {
                if (!otherNode.getAddress().equals(transportAddress)) {
                    nodesBuilder.add(otherNode);
                } else {
                    logger.debug("disconnecting from node with address [{}]", otherNode);
                    transportService.disconnectFromNode(otherNode);
                }
            }
            nodes = Collections.unmodifiableList(nodesBuilder);
            nodesSampler.sample();
        }
        return this;
    }

    //......

}
  • TransportClientNodesService defines three List<DiscoveryNode> fields: listedNodes, nodes and filteredNodes.
  • The addTransportAddresses method updates listedNodes and then calls nodesSampler.sample() to refresh nodes and filteredNodes. The removeTransportAddress method updates listedNodes and nodes (disconnecting from the removed node), and then also calls nodesSampler.sample() to refresh nodes and filteredNodes.
  • listedNodes holds the nodes added through the addTransportAddresses method (the cluster nodes are generally specified through configuration). The nodesSampler.sample() method then checks the listedNodes further: for example, nodes whose cluster name differs from the configured clusterName are put into filteredNodes, while connections are established to the rest and the nodes that connect successfully are put into nodes.
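
Both methods follow the same pattern: build a new list under the mutex and publish it as an unmodifiable snapshot through a volatile field, so readers never need the lock. A minimal sketch of that pattern, assuming plain strings instead of DiscoveryNode and a hypothetical class name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ListedAddressesSketch {
    private final Object mutex = new Object();

    // Readers see an immutable snapshot; writers replace the whole list under the lock.
    private volatile List<String> listed = Collections.emptyList();

    ListedAddressesSketch add(String... addresses) {
        synchronized (mutex) {
            List<String> builder = new ArrayList<>(listed);
            for (String address : addresses) {
                // Like the original, only compare against the addresses already listed.
                if (!listed.contains(address)) {
                    builder.add(address);
                }
            }
            listed = Collections.unmodifiableList(builder);
        }
        return this;
    }

    ListedAddressesSketch remove(String address) {
        synchronized (mutex) {
            List<String> builder = new ArrayList<>(listed);
            builder.remove(address);
            listed = Collections.unmodifiableList(builder);
        }
        return this;
    }

    List<String> listed() {
        return listed;
    }
}
```

A list obtained from listed() before a later add or remove is unaffected by it, which is what makes lock-free reads safe here.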

TransportClient Nodes

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClient.java

public abstract class TransportClient extends AbstractClient {

    private final TransportClientNodesService nodesService;

    private final TransportProxyClient proxy;

    //......

    /**
     * Returns the current connected transport nodes that this client will use.
     * <p>
     * The nodes include all the nodes that are currently alive based on the transport
     * addresses provided.
     */
    public List<DiscoveryNode> connectedNodes() {
        return nodesService.connectedNodes();
    }

    /**
     * The list of filtered nodes that were not connected to, for example, due to
     * mismatch in cluster name.
     */
    public List<DiscoveryNode> filteredNodes() {
        return nodesService.filteredNodes();
    }

    /**
     * Returns the listed nodes in the transport client (ones added to it).
     */
    public List<DiscoveryNode> listedNodes() {
        return nodesService.listedNodes();
    }

    /**
     * Adds a transport address that will be used to connect to.
     * <p>
     * The Node this transport address represents will be used if its possible to connect to it.
     * If it is unavailable, it will be automatically connected to once it is up.
     * <p>
     * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
     */
    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

    /**
     * Adds a list of transport addresses that will be used to connect to.
     * <p>
     * The Node this transport address represents will be used if its possible to connect to it.
     * If it is unavailable, it will be automatically connected to once it is up.
     * <p>
     * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
     */
    public TransportClient addTransportAddresses(TransportAddress... transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

    /**
     * Removes a transport address from the list of transport addresses that are used to connect to.
     */
    public TransportClient removeTransportAddress(TransportAddress transportAddress) {
        nodesService.removeTransportAddress(transportAddress);
        return this;
    }

    //......
}
  • TransportClient provides the connectedNodes, filteredNodes and listedNodes methods, each of which simply delegates to the method of the same name on TransportClientNodesService. As the comments explain, connectedNodes returns the nodes with an established connection that the client will use; filteredNodes returns the nodes that were filtered out, for example because of a clusterName mismatch, and that the client will not use; listedNodes returns the nodes added through addTransportAddresses.

NodeSampler

ScheduledNodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    TransportClientNodesService(Settings settings, TransportService transportService,
                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
        super(settings);
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.transportService = transportService;
        this.threadPool = threadPool;
        this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

        this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
        this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
        this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);

        if (logger.isDebugEnabled()) {
            logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
        }

        if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
            this.nodesSampler = new SniffNodesSampler();
        } else {
            this.nodesSampler = new SimpleNodeSampler();
        }
        this.hostFailureListener = hostFailureListener;
        this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
    }

    //......

    class ScheduledNodeSampler implements Runnable {
        @Override
        public void run() {
            try {
                nodesSampler.sample();
                if (!closed) {
                    nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
                }
            } catch (Exception e) {
                logger.warn("failed to sample", e);
            }
        }
    }

    //......
  • The constructor of TransportClientNodesService reads client.transport.sniff from settings (the default is false) to decide whether to create a SniffNodesSampler or a SimpleNodeSampler. It then schedules a ScheduledNodeSampler through threadPool to run after nodesSamplerInterval.
  • ScheduledNodeSampler implements the Runnable interface. Its run method mainly calls nodesSampler.sample(). Afterwards, as long as TransportClientNodesService is not closed, it schedules itself again and updates nodesSamplerFuture.
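
The self-rescheduling task can be imitated with the JDK's ScheduledExecutorService. The sketch below is an assumption-laden illustration: SelfReschedulingSketch is a hypothetical name, a counter stands in for nodesSampler.sample(), and a JDK scheduler replaces the elasticsearch ThreadPool.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SelfReschedulingSketch {
    // Daemon thread so that a forgotten close() does not keep the JVM alive.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "sampler-sketch");
        t.setDaemon(true);
        return t;
    });
    private final AtomicInteger samples = new AtomicInteger();
    private final long intervalMillis;
    private volatile boolean closed;

    SelfReschedulingSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        // One-shot schedule; the task re-arms itself after every run.
        scheduler.schedule(this::runOnce, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void runOnce() {
        samples.incrementAndGet(); // stands in for nodesSampler.sample()
        if (!closed) {
            try {
                scheduler.schedule(this::runOnce, intervalMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // close() shut the scheduler down in the meantime; nothing left to do
            }
        }
    }

    // Polls until at least `expected` samples ran or the timeout elapsed.
    boolean awaitSamples(int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (samples.get() < expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    void close() {
        closed = true;
        scheduler.shutdownNow();
    }
}
```

Compared with a fixed-rate schedule, re-arming after each run means the next sample starts only after the previous one has finished plus the interval.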

NodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    abstract class NodeSampler {
        public void sample() {
            synchronized (mutex) {
                if (closed) {
                    return;
                }
                doSample();
            }
        }

        protected abstract void doSample();

        /**
         * Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
         * node returned in the handshake response is different than the discovery node.
         */
        List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
            for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
                DiscoveryNode node = it.next();
                if (!transportService.nodeConnected(node)) {
                    try {
                        logger.trace("connecting to node [{}]", node);
                        transportService.connectToNode(node);
                    } catch (Exception e) {
                        it.remove();
                        logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e);
                    }
                }
            }

            return Collections.unmodifiableList(new ArrayList<>(nodes));
        }
    }
  • NodeSampler is an abstract class. Its sample method takes the mutex, returns early if the service is closed, and otherwise calls the abstract doSample method.
  • NodeSampler also provides the establishNodeConnections method. It checks whether each node is connected through transportService.nodeConnected(node); if not, it tries to connect through transportService.connectToNode(node), and removes the node from the set if that throws an exception. It finally returns the nodes that passed this connection check.
  • NodeSampler has two subclasses, SimpleNodeSampler and SniffNodesSampler.
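
NodeSampler is an instance of the template method pattern combined with an iterator-based filter. The following sketch reproduces both ideas with hypothetical names; nodes are strings, and a Predicate that returns false stands in for connectToNode throwing an exception.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

abstract class SamplerSketch {
    private final Object mutex = new Object();
    private volatile boolean closed;

    // Template method: locking and the closed check live here, the work in doSample().
    final void sample() {
        synchronized (mutex) {
            if (closed) {
                return;
            }
            doSample();
        }
    }

    protected abstract void doSample();

    void close() {
        synchronized (mutex) {
            closed = true;
        }
    }

    // Removes every candidate that cannot be connected and returns the survivors.
    // The candidate set is modified in place, as in the original method.
    static List<String> establishConnections(Set<String> candidates, Predicate<String> connect) {
        for (Iterator<String> it = candidates.iterator(); it.hasNext(); ) {
            String node = it.next();
            if (!connect.test(node)) {
                it.remove(); // Iterator.remove is the safe way to delete while iterating
            }
        }
        return Collections.unmodifiableList(new ArrayList<>(candidates));
    }
}
```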

SimpleNodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    class SimpleNodeSampler extends NodeSampler {

        @Override
        protected void doSample() {
            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
            for (DiscoveryNode listedNode : listedNodes) {
                try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
                    final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
                        new FutureTransportResponseHandler<LivenessResponse>() {
                            @Override
                            public LivenessResponse read(StreamInput in) throws IOException {
                                LivenessResponse response = new LivenessResponse();
                                response.readFrom(in);
                                return response;
                            }
                        });
                    transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
                        TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                        handler);
                    final LivenessResponse livenessResponse = handler.txGet();
                    if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
                        logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
                        newFilteredNodes.add(listedNode);
                    } else {
                        // use discovered information but do keep the original transport address,
                        // so people can control which address is exactly used.
                        DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
                        newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
                            nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
                            nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
                    }
                } catch (ConnectTransportException e) {
                    logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
                    hostFailureListener.onNodeDisconnected(listedNode, e);
                } catch (Exception e) {
                    logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
                }
            }

            nodes = establishNodeConnections(newNodes);
            filteredNodes = Collections.unmodifiableList(newFilteredNodes);
        }
    }
  • The doSample method of SimpleNodeSampler performs a liveness check on each of the listedNodes by sending a LivenessRequest. If a LivenessResponse comes back, it compares the cluster name (unless ignoreClusterName is set): nodes with a mismatching clusterName are added to newFilteredNodes, which is finally assigned to filteredNodes, and nodes with a matching clusterName are added to newNodes. Finally, establishNodeConnections connects to the nodes in newNodes and removes those that fail to connect (they are tried again in a later sampling round), and the result is assigned to nodes.
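
The partitioning step can be shown on its own. In this sketch, which uses a hypothetical class and plain strings, the map holds the cluster name each listed node reported, with null standing in for a node whose liveness check failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ClusterNameFilterSketch {
    final List<String> accepted = new ArrayList<>();
    final List<String> filtered = new ArrayList<>();

    // reported maps a listed node to the cluster name from its liveness response,
    // or to null when no response was received.
    ClusterNameFilterSketch(Map<String, String> reported, String expectedCluster, boolean ignoreClusterName) {
        for (Map.Entry<String, String> entry : reported.entrySet()) {
            String clusterName = entry.getValue();
            if (clusterName == null) {
                continue; // unreachable node: neither accepted nor filtered
            }
            if (!ignoreClusterName && !expectedCluster.equals(clusterName)) {
                filtered.add(entry.getKey());
            } else {
                accepted.add(entry.getKey());
            }
        }
    }
}
```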

SniffNodesSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    class SniffNodesSampler extends NodeSampler {

        @Override
        protected void doSample() {
            // the nodes we are going to ping include the core listed nodes that were added
            // and the last round of discovered nodes
            Set<DiscoveryNode> nodesToPing = new HashSet<>();
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }

            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            try {
                for (final DiscoveryNode nodeToPing : nodesToPing) {
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

                        /**
                         * we try to reuse existing connections but if needed we will open a temporary connection
                         * that will be closed at the end of the execution.
                         */
                        Transport.Connection connectionToClose = null;

                        void onDone() {
                            try {
                                IOUtils.closeWhileHandlingException(connectionToClose);
                            } finally {
                                latch.countDown();
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            onDone();
                            if (e instanceof ConnectTransportException) {
                                logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
                                hostFailureListener.onNodeDisconnected(nodeToPing, e);
                            } else {
                                logger.info(() -> new ParameterizedMessage(
                                        "failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
                            }
                        }

                        @Override
                        protected void doRun() throws Exception {
                            Transport.Connection pingConnection = null;
                            if (nodes.contains(nodeToPing)) {
                                try {
                                    pingConnection = transportService.getConnection(nodeToPing);
                                } catch (NodeNotConnectedException e) {
                                    // will use a temp connection
                                }
                            }
                            if (pingConnection == null) {
                                logger.trace("connecting to cluster node [{}]", nodeToPing);
                                connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                pingConnection = connectionToClose;
                            }
                            transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                Requests.clusterStateRequest().clear().nodes(true).local(true),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                    .withTimeout(pingTimeout).build(),
                                new TransportResponseHandler<ClusterStateResponse>() {

                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }

                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }

                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(nodeToPing, response);
                                        onDone();
                                    }

                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info(() -> new ParameterizedMessage(
                                                "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                        try {
                                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                        } finally {
                                            onDone();
                                        }
                                    }
                                });
                        }
                    });
                }
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...",
                            entry.getValue().getState().nodes().getLocalNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }

            nodes = establishNodeConnections(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        }
    }
  • The doSample method of SniffNodesSampler first merges listedNodes and nodes into a Set called nodesToPing, and then submits the nodes in nodesToPing one by one to the thread pool to be checked asynchronously. A CountDownLatch is used to wait until the asynchronous check of every node has finished.
  • Each asynchronous check sends a Requests.clusterStateRequest().clear().nodes(true).local(true) request to the node. If it succeeds, the returned ClusterStateResponse is added to the clusterStateResponses ConcurrentMap.
  • Afterwards doSample iterates over clusterStateResponses. Nodes with a mismatching clusterName are added to newFilteredNodes, which is finally assigned to filteredNodes. For a matching clusterName, it iterates over the response's getState().nodes().getDataNodes().values() and adds these nodes to newNodes. Finally, establishNodeConnections connects to them and removes the nodes that fail to connect (they can be picked up again in a later sampling round), and the result is assigned to nodes.
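
The fan-out and wait step is a common CountDownLatch idiom. A minimal sketch, assuming a hypothetical ParallelPingSketch class, string nodes, a small fixed JDK thread pool, and a Function standing in for the cluster state request (it either returns a response or throws):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ParallelPingSketch {

    static Map<String, String> pingAll(Set<String> nodesToPing, Function<String, String> ping) {
        ConcurrentMap<String, String> responses = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String node : nodesToPing) {
                pool.execute(() -> {
                    try {
                        responses.put(node, ping.apply(node));
                    } catch (RuntimeException e) {
                        // a failed ping simply leaves no entry for this node
                    } finally {
                        latch.countDown(); // always count down, on success and on failure
                    }
                });
            }
            latch.await(); // blocks until every node has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag, as the original does
        } finally {
            pool.shutdown();
        }
        return responses;
    }
}
```

Counting down in a finally block matters: if one path forgot to do so, latch.await() would never return.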

TransportClientNodesService.execute

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

final class TransportClientNodesService extends AbstractComponent implements Closeable {

    private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

    //......

    public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
        // we first read nodes before checking the closed state; this
        // is because otherwise we could be subject to a race where we
        // read the state as not being closed, and then the client is
        // closed and the nodes list is cleared, and then a
        // NoNodeAvailableException is thrown
        // it is important that the order of first setting the state of
        // closed and then clearing the list of nodes is maintained in
        // the close method
        final List<DiscoveryNode> nodes = this.nodes;
        if (closed) {
            throw new IllegalStateException("transport client is closed");
        }
        ensureNodesAreAvailable(nodes);
        int index = getNodeNumber();
        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
        DiscoveryNode node = retryListener.getNode(0);
        try {
            callback.doWithNode(node, retryListener);
        } catch (Exception e) {
            try {
                //this exception can't come from the TransportService as it doesn't throw exception at all
                listener.onFailure(e);
            } finally {
                retryListener.maybeNodeFailed(node, e);
            }
        }
    }

    private void ensureNodesAreAvailable(List<DiscoveryNode> nodes) {
        if (nodes.isEmpty()) {
            String message = String.format(Locale.ROOT, "None of the configured nodes are available: %s", this.listedNodes);
            throw new NoNodeAvailableException(message);
        }
    }

    private int getNodeNumber() {
        int index = randomNodeGenerator.incrementAndGet();
        if (index < 0) {
            index = 0;
            randomNodeGenerator.set(0);
        }
        return index;
    }

    public static class RetryListener<Response> implements ActionListener<Response> {
        private final NodeListenerCallback<Response> callback;
        private final ActionListener<Response> listener;
        private final List<DiscoveryNode> nodes;
        private final int index;
        private final TransportClient.HostFailureListener hostFailureListener;

        private volatile int i;

        RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
                      List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
            this.callback = callback;
            this.listener = listener;
            this.nodes = nodes;
            this.index = index;
            this.hostFailureListener = hostFailureListener;
        }

        @Override
        public void onResponse(Response response) {
            listener.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            Throwable throwable = ExceptionsHelper.unwrapCause(e);
            if (throwable instanceof ConnectTransportException) {
                maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable);
                int i = ++this.i;
                if (i >= nodes.size()) {
                    listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
                } else {
                    try {
                        callback.doWithNode(getNode(i), this);
                    } catch(final Exception inner) {
                        inner.addSuppressed(e);
                        // this exception can't come from the TransportService as it doesn't throw exceptions at all
                        listener.onFailure(inner);
                    }
                }
            } else {
                listener.onFailure(e);
            }
        }

        final DiscoveryNode getNode(int i) {
            return nodes.get((index + i) % nodes.size());
        }

        final void maybeNodeFailed(DiscoveryNode node, Exception ex) {
            if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) {
                hostFailureListener.onNodeDisconnected(node, ex);
            }
        }
    }

    //......
}
  • The execute method provided by TransportClientNodesService mainly does two things: it balances the client's requests across the entries of the nodes list, and it adds a retry mechanism to each request through a RetryListener
  • The ensureNodesAreAvailable method first checks that the nodes list is not empty and throws a NoNodeAvailableException if it is. The index value is then determined by the getNodeNumber method, which increments randomNodeGenerator to obtain index: if index is greater than or equal to 0 it is returned as is, and if index is less than 0 (a negative initial value or an overflow) randomNodeGenerator is reset to 0 and 0 is returned. Here randomNodeGenerator is of type AtomicInteger, and its initial value is Randomness.get().nextInt()
  • The constructor of RetryListener receives the index value calculated in the previous step, and the listener keeps a variable i with an initial value of 0. In onFailure, only a ConnectTransportException triggers a retry. On a retry, i is incremented first; if i >= nodes.size() the retrying stops and a NoNodeAvailableException is passed to listener.onFailure, otherwise callback.doWithNode is called again with the node returned by getNode and the current RetryListener as the listener. The getNode method uses (index + i) % nodes.size() to pick the node, which produces a Round Robin effect. Because i is incremented on every internal retry and the execute method obtains an incremented index for every request, the nodes are visited in Round Robin order whether a request succeeds or fails.
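
The index selection and retry order described in the bullets above can be reproduced in a small stand-alone sketch. This is not Elasticsearch code: the class RoundRobinRetrySketch, its methods nextIndex, nodeAt and attempt, and the isReachable predicate are all hypothetical names, the retry runs synchronously instead of through listeners, and Math.floorMod is used in place of the plain % operator as a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical stand-alone sketch; none of these names are Elasticsearch classes.
class RoundRobinRetrySketch {

    // Plays the role of randomNodeGenerator: a shared counter that picks the starting node.
    private final AtomicInteger counter;

    RoundRobinRetrySketch(int seed) {
        this.counter = new AtomicInteger(seed);
    }

    // Same idea as getNodeNumber: increment, and reset to 0 when the value is negative.
    int nextIndex() {
        int index = counter.incrementAndGet();
        if (index < 0) {
            index = 0;
            counter.set(0);
        }
        return index;
    }

    // Same idea as RetryListener.getNode: offset i from the starting index, wrapped around.
    // floorMod (an assumption of this sketch) keeps the result non-negative even if index + i overflows.
    static <T> T nodeAt(List<T> nodes, int index, int i) {
        return nodes.get(Math.floorMod(index + i, nodes.size()));
    }

    // Tries each node at most once, starting at the round-robin index.
    // isReachable stands in for "the request did not fail with a connection error".
    // Returns the nodes in the order they were attempted.
    <T> List<T> attempt(List<T> nodes, Predicate<T> isReachable) {
        int index = nextIndex();
        List<T> tried = new ArrayList<>();
        for (int i = 0; i < nodes.size(); i++) {
            T node = nodeAt(nodes, index, i);
            tried.add(node);
            if (isReachable.test(node)) {
                break; // success: no further retry
            }
        }
        return tried;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node-a", "node-b", "node-c");
        RoundRobinRetrySketch sketch = new RoundRobinRetrySketch(-1); // first index will be 0
        System.out.println(sketch.attempt(nodes, n -> true));
        System.out.println(sketch.attempt(nodes, n -> !n.equals("node-b")));
        System.out.println(sketch.attempt(nodes, n -> false));
    }
}
```

With the seed -1 the three calls start at node-a, node-b and node-c respectively, so consecutive requests rotate through the list, and a request that hits an unreachable node moves on to the next one until every node has been tried once.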

Summary

  • TransportProxyClient mainly provides the execute method, which looks up the corresponding TransportActionNodeProxy in proxies and then runs proxy.execute through the execute method of TransportClientNodesService. TransportActionNodeProxy in turn provides an execute method whose parameters are a DiscoveryNode, a Request and an ActionListener. This method mainly wraps the ActionListener in an ActionListenerResponseHandler and then calls transportService.sendRequest
  • TransportClientNodesService defines three List attributes of DiscoveryNode, namely listedNodes, nodes and filteredNodes. listedNodes holds the nodes added through addTransportAddresses; nodes is the list of nodes that are currently connected and used by the client; filteredNodes holds the nodes that have been filtered out because their clusterName does not match, and these nodes are not used by the client.
  • The constructor of TransportClientNodesService decides, based on the client.transport.sniff setting (false by default), whether to create a SniffNodesSampler or a SimpleNodeSampler. It also registers a scheduled task through threadPool that executes ScheduledNodeSampler every nodesSamplerInterval. ScheduledNodeSampler implements the Runnable interface, and its run method mainly calls nodesSampler.sample(). After that, as long as TransportClientNodesService is not closed, it registers the scheduled task again and updates nodesSamplerFuture.
  • NodeSampler is an abstract class that defines the sample method, which calls the abstract method doSample internally. NodeSampler also provides the establishNodeConnections method, which checks whether a node is connected through transportService.nodeConnected(node). If it is not, it tries to connect through transportService.connectToNode(node); if that throws an exception, the node is removed. Finally it returns the list of nodes that passed this connection check. NodeSampler has two subclasses, SimpleNodeSampler and SniffNodesSampler
  • The doSample method of SimpleNodeSampler performs a further liveness check on the nodes, mainly by sending a LivenessRequest. If a LivenessResponse is returned successfully, it checks whether the clusterName matches: nodes with a mismatching clusterName are added to newFilteredNodes, which is finally assigned to filteredNodes, and nodes with a matching clusterName are added to newNodes. Finally, connections are established through the establishNodeConnections method, nodes that still fail to connect after one more attempt are removed, and the result is assigned to nodes
  • The doSample method of SniffNodesSampler first merges listedNodes and nodes into a Set named nodesToPing, and then submits the nodes in nodesToPing one by one to the thread pool for asynchronous checking. A CountDownLatch is used to wait until the asynchronous checks of all nodes have finished. Each asynchronous check sends a Requests.clusterStateRequest().clear().nodes(true).local(true) request to the node; if it succeeds, the returned ClusterStateResponse is added to the clusterStateResponses ConcurrentMap. The method then traverses clusterStateResponses: nodes with a mismatching clusterName are added to newFilteredNodes, which is finally assigned to filteredNodes. If the clusterName matches, it traverses clusterStateResponse.getState().nodes().getDataNodes().values() and adds these nodes to newNodes. Finally, connections are established through the establishNodeConnections method, nodes that still fail to connect after one more attempt are removed, and the result is assigned to nodes
  • The execute method provided by TransportClientNodesService mainly does two things: it balances the client's requests across the entries of the nodes list, and it adds a retry mechanism to each request through a RetryListener. Its load balancing strategy is Round Robin, while RetryListener only retries on ConnectTransportException, with a maximum of nodes.size()-1 retries
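
The split between nodes and filteredNodes that both samplers perform can be illustrated with a minimal sketch. It is only an approximation under stated assumptions: ClusterNameFilterSketch, SampleResult and sample are hypothetical names, nodes are represented by plain address strings, and a null map value stands in for a node that did not answer the check (this sketch puts such a node in neither list).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names are Elasticsearch classes.
class ClusterNameFilterSketch {

    // Result of one sampling round: usable nodes and nodes rejected for a clusterName mismatch.
    static final class SampleResult {
        final List<String> nodes = new ArrayList<>();
        final List<String> filteredNodes = new ArrayList<>();
    }

    // reportedClusterNames maps a node address to the cluster name it answered with,
    // or to null when the node gave no answer (an assumption of this sketch).
    static SampleResult sample(String expectedClusterName, Map<String, String> reportedClusterNames) {
        SampleResult result = new SampleResult();
        for (Map.Entry<String, String> entry : reportedClusterNames.entrySet()) {
            String reported = entry.getValue();
            if (reported == null) {
                continue; // no answer: neither usable nor filtered in this round
            }
            if (expectedClusterName.equals(reported)) {
                result.nodes.add(entry.getKey());         // matching clusterName
            } else {
                result.filteredNodes.add(entry.getKey()); // mismatching clusterName
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> answers = new LinkedHashMap<>();
        answers.put("10.0.0.1:9300", "prod");
        answers.put("10.0.0.2:9300", "staging");
        answers.put("10.0.0.3:9300", null);
        SampleResult result = sample("prod", answers);
        System.out.println("nodes = " + result.nodes);
        System.out.println("filteredNodes = " + result.filteredNodes);
    }
}
```

Because each sampling round builds both lists from scratch, a node that was filtered out earlier can become usable again in a later round once it reports the expected cluster name.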

doc