Talk about jest’s NodeChecker

  jest

Preface

This article mainly studies jest’s NodeChecker.

NodeChecker

jest-common-6.3.1-sources.jar! /io/searchbox/client/config/discovery/NodeChecker.java

/**
 * Scheduled service that periodically executes a {@code NodesInfo} action through the given
 * {@link JestClient} and updates the client's server list with the HTTP addresses found in the
 * response.
 *
 * <p>When the request cannot connect, the unreachable host is removed from the discovered list.
 * On any other failure, or when the response is not successful, the client is reset to the
 * bootstrap server list taken from the {@link ClientConfig}.
 */
public class NodeChecker extends AbstractScheduledService {

    private static final Logger log = LoggerFactory.getLogger(NodeChecker.class);
    private static final String PUBLISH_ADDRESS_KEY = "http_address";
    private static final String PUBLISH_ADDRESS_KEY_V5 = "publish_address"; // The one that is under the "http" node
    private static final Pattern INETSOCKETADDRESS_PATTERN = Pattern.compile("(?:inet\\[)?(?:(?:[^:]+)?\\/)?([^:]+):(\\d+)\\]?");

    private final NodesInfo action;

    protected JestClient client;
    protected Scheduler scheduler;
    protected String defaultScheme;
    protected Set<String> bootstrapServerList;
    protected Set<String> discoveredServerList;

    /**
     * @param jestClient   client used to run the discovery request and whose server list is updated
     * @param clientConfig source of the discovery filter, frequency, default scheme and bootstrap servers
     */
    public NodeChecker(JestClient jestClient, ClientConfig clientConfig) {
        action = new NodesInfo.Builder()
                .withHttp()
                .addNode(clientConfig.getDiscoveryFilter())
                .build();
        this.client = jestClient;
        this.defaultScheme = clientConfig.getDefaultSchemeForDiscoveredNodes();
        this.scheduler = Scheduler.newFixedDelaySchedule(
                0L,
                clientConfig.getDiscoveryFrequency(),
                clientConfig.getDiscoveryFrequencyTimeUnit()
        );
        this.bootstrapServerList = ImmutableSet.copyOf(clientConfig.getServerList());
        this.discoveredServerList = new LinkedHashSet<String>();
    }

    /**
     * Runs one discovery round: executes the NodesInfo action and replaces the client's servers
     * with the addresses extracted from the response. Malformed node entries are skipped rather
     * than allowed to throw, because an exception escaping this method stops the scheduled calls.
     */
    @Override
    protected void runOneIteration() throws Exception {
        JestResult result;
        try {
            result = client.execute(action);
        } catch (CouldNotConnectException cnce) {
            // Can't connect to this node, remove it from the list.
            // Do not elevate the exception since that will stop the scheduled calls.
            log.error("Connect exception executing NodesInfo!", cnce);
            removeNodeAndUpdateServers(cnce.getHost());
            return;
        } catch (Exception e) {
            // Do not elevate the exception since that will stop the scheduled calls.
            log.error("Error executing NodesInfo!", e);
            client.setServers(bootstrapServerList);
            return;
        }

        if (!result.isSucceeded()) {
            log.warn("NodesInfo request resulted in error: {}", result.getErrorMessage());
            client.setServers(bootstrapServerList);
            return;
        }

        LinkedHashSet<String> httpHosts = new LinkedHashSet<String>();

        JsonObject jsonMap = result.getJsonObject();
        JsonElement nodes = (jsonMap == null) ? null : jsonMap.get("nodes");
        if (nodes != null && nodes.isJsonObject()) {
            for (Entry<String, JsonElement> entry : nodes.getAsJsonObject().entrySet()) {
                JsonElement node = entry.getValue();
                if (node == null || !node.isJsonObject()) {
                    // Not a node description; nothing to extract.
                    continue;
                }
                String httpAddress = findHttpAddress(node.getAsJsonObject());
                if (httpAddress != null) {
                    httpHosts.add(httpAddress);
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Discovered {} HTTP hosts: {}", httpHosts.size(), Joiner.on(',').join(httpHosts));
        }
        discoveredServerList = httpHosts;
        client.setServers(discoveredServerList);
    }

    /**
     * Extracts the normalized HTTP address of a single node entry.
     *
     * <p>For nodes whose major version is 5 or higher the {@code publish_address} under the
     * {@code http} object is preferred; otherwise, or when that key is absent, the node-level
     * {@code http_address} is used.
     *
     * @return the normalized address, or {@code null} if the node reports no usable address
     */
    private String findHttpAddress(JsonObject host) {
        JsonElement addressElement = null;
        if (parseMajorVersion(host.get("version")) >= 5) {
            JsonElement http = host.get("http");
            if (http != null && http.isJsonObject()) {
                addressElement = http.getAsJsonObject().get(PUBLISH_ADDRESS_KEY_V5);
            }
        }

        if (addressElement == null) {
            // Some nodes in the cluster may not have an http_address, so this may stay null.
            addressElement = host.get(PUBLISH_ADDRESS_KEY);
        }

        if (addressElement == null || !addressElement.isJsonPrimitive()) {
            return null;
        }
        return getHttpAddress(addressElement.getAsString());
    }

    /**
     * Parses the leading (major) component of a dotted version string.
     *
     * @return the major version, or {@code -1} when the element is missing, not a primitive, or
     *         does not start with an integer
     */
    private static int parseMajorVersion(JsonElement versionElement) {
        if (versionElement == null || !versionElement.isJsonPrimitive()) {
            return -1;
        }
        String version = versionElement.getAsString();
        int firstDot = version.indexOf('.');
        String major = (firstDot < 0) ? version : version.substring(0, firstDot);
        try {
            return Integer.parseInt(major);
        } catch (NumberFormatException e) {
            // Treat an unparseable version like a missing one so discovery keeps running.
            log.debug("Ignoring unparseable node version: {}", version);
            return -1;
        }
    }

    /**
     * Removes the given host from the discovered list and pushes the remaining discovered servers
     * to the client, falling back to the bootstrap list when none remain.
     */
    protected void removeNodeAndUpdateServers(final String hostToRemove) {
        log.warn("Removing host {}", hostToRemove);
        discoveredServerList.remove(hostToRemove);
        if (log.isInfoEnabled()) {
            log.info("Discovered server pool is now: {}", Joiner.on(',').join(discoveredServerList));
        }
        if (!discoveredServerList.isEmpty()) {
            client.setServers(discoveredServerList);
        } else {
            client.setServers(bootstrapServerList);
        }
    }

    @Override
    protected Scheduler scheduler() {
        return scheduler;
    }

    /**
     * Creates a single daemon thread executor named after the service and arranges for it to be
     * shut down once the service terminates or fails.
     */
    @Override
    protected ScheduledExecutorService executor() {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(serviceName())
                .build());
        // Add a listener to shutdown the executor after the service is stopped.  This ensures that the
        // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
        // Technically this listener is added after start() was called so it is a little gross, but it
        // is called within doStart() so we know that the service cannot terminate or fail concurrently
        // with adding this listener so it is impossible to miss an event that we are interested in.
        addListener(new Listener() {
            @Override public void terminated(State from) {
                executor.shutdown();
            }
            @Override public void failed(State from, Throwable failure) {
                executor.shutdown();
            }}, MoreExecutors.directExecutor());
        return executor;
    }

    /**
     * Converts the Elasticsearch reported publish address in the format "inet[<hostname>:<port>]" or
     * "inet[<hostname>/<hostaddress>:<port>]" to a normalized address of the form
     * {@code defaultScheme + host + ":" + port}.
     *
     * @return the normalized address, or {@code null} if the input is {@code null} or does not
     *         match the expected format
     */
    protected String getHttpAddress(String httpAddress) {
        if (httpAddress == null) {
            return null;
        }
        Matcher resolvedMatcher = INETSOCKETADDRESS_PATTERN.matcher(httpAddress);
        if (resolvedMatcher.matches()) {
            return defaultScheme + resolvedMatcher.group(1) + ":" + resolvedMatcher.group(2);
        }

        return null;
    }

}
  • NodeChecker inherits AbstractScheduledService, and its constructor creates a new fixedDelayScheduler to execute NodeChecker according to clientConfig’s discoveryFrequency and discoveryFrequencyTimeUnit. It implements the runOneIteration method, which mainly sends NodesInfo requests (GET /_nodes/_all/http)
  • If the request throws a CouldNotConnectException, the removeNodeAndUpdateServers method is called to remove that host; if any other Exception is thrown, the client’s servers are reset to bootstrapServerList.
  • If the request is successful, the body is parsed: if a node has a version whose major version is greater than or equal to 5, the PUBLISH_ADDRESS_KEY_V5 (publish_address) attribute under the node’s http object is added to discoveredServerList; otherwise (older versions), the PUBLISH_ADDRESS_KEY (http_address) attribute of the node is added to discoveredServerList.

Example NodesInfo response

{
  "_nodes" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "cluster_name" : "docker-cluster",
  "nodes" : {
    "RmyGhZEbTjC7JCQFVS3HWQ" : {
      "name" : "RmyGhZE",
      "transport_address" : "172.17.0.2:9300",
      "host" : "172.17.0.2",
      "ip" : "172.17.0.2",
      "version" : "6.6.2",
      "build_flavor" : "oss",
      "build_type" : "tar",
      "build_hash" : "3bd3e59",
      "roles" : [
        "master",
        "data",
        "ingest"
      ],
      "http" : {
        "bound_address" : [
          "0.0.0.0:9200"
        ],
        "publish_address" : "192.168.99.100:9200",
        "max_content_length_in_bytes" : 104857600
      }
    }
  }
}
  • In version 5 and above, each node entry under nodes has an http attribute, whose publish_address attribute gives the publish address of that node.

JestHttpClient

jest-6.3.1-sources.jar! /io/searchbox/client/http/JestHttpClient.java

public class JestHttpClient extends AbstractJestClient {
    //......

    @Override
    public <T extends JestResult> T execute(Action<T> clientRequest) throws IOException {
        return execute(clientRequest, null);
    }

    public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {
        HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
        CloseableHttpResponse response = null;
        try {
            response = executeRequest(request);
            return deserializeResponse(response, request, clientRequest);
        } catch (HttpHostConnectException ex) {
            throw new CouldNotConnectException(ex.getHost().toURI(), ex);
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException ex) {
                    log.error("Exception occurred while closing response stream.", ex);
                }
            }
        }
    }

    @Override
    public <T extends JestResult> void executeAsync(final Action<T> clientRequest, final JestResultHandler<? super T> resultHandler) {
        executeAsync(clientRequest, resultHandler, null);
    }

    public <T extends JestResult> void executeAsync(final Action<T> clientRequest, final JestResultHandler<? super T> resultHandler, final RequestConfig requestConfig) {
        synchronized (this) {
            if (!asyncClient.isRunning()) {
                asyncClient.start();
            }
        }

        HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
        executeAsyncRequest(clientRequest, resultHandler, request);
    }

    protected <T extends JestResult> HttpUriRequest prepareRequest(final Action<T> clientRequest, final RequestConfig requestConfig) {
        String elasticSearchRestUrl = getRequestURL(getNextServer(), clientRequest.getURI(elasticsearchVersion));
        HttpUriRequest request = constructHttpMethod(clientRequest.getRestMethodName(), elasticSearchRestUrl, clientRequest.getData(gson), requestConfig);

        log.debug("Request method={} url={}", clientRequest.getRestMethodName(), elasticSearchRestUrl);

        // add headers added to action
        for (Entry<String, Object> header : clientRequest.getHeaders().entrySet()) {
            request.addHeader(header.getKey(), header.getValue().toString());
        }

        return request;
    }

    protected String getNextServer() {
        return serverPoolReference.get().getNextServer();
    }

    //......
}
  • JestHttpClient inherits AbstractJestClient, and its execute and executeAsync methods both call prepareRequest to construct the HttpUriRequest; prepareRequest first calls the getNextServer method to obtain the address of the Elasticsearch server to send the request to; the getNextServer method calls serverPoolReference.get().getNextServer()

AbstractJestClient

jest-common-6.3.1-sources.jar! /io/searchbox/client/AbstractJestClient.java

public abstract class AbstractJestClient implements JestClient {

    // Current server pool; setServers replaces it with a freshly built pool. Starts out empty.
    private final AtomicReference<ServerPool> serverPoolReference =
            new AtomicReference<ServerPool>(new ServerPool(ImmutableSet.<String>of()));

    //......

    /**
     * Replaces the server pool with one built from the given servers, unless the current pool
     * already holds an equal set, in which case nothing is changed. Logs a warning when the
     * new set is empty.
     */
    public void setServers(Set<String> servers) {
        final boolean sameAsCurrent = servers.equals(serverPoolReference.get().getServers());
        if (sameAsCurrent) {
            if (log.isDebugEnabled()) {
                final String currentServers = Joiner.on(',').join(scrubServerURIs(servers));
                log.debug("Server pool already contains same list of servers: {}", currentServers);
            }
            return;
        }

        if (log.isInfoEnabled()) {
            final int serverCount = servers.size();
            final String newServers = Joiner.on(',').join(scrubServerURIs(servers));
            log.info("Setting server pool to a list of {} servers: [{}]", serverCount, newServers);
        }

        final ServerPool replacement = new ServerPool(servers);
        serverPoolReference.set(replacement);

        if (servers.isEmpty()) {
            log.warn("No servers are currently available to connect.");
        }
    }

    //......
}
  • AbstractJestClient has a serverPoolReference attribute, which is an AtomicReference whose generic type is ServerPool; the setServers method creates a new ServerPool and then updates serverPoolReference with it

ServerPool

jest-common-6.3.1-sources.jar! /io/searchbox/client/AbstractJestClient.java

    /**
     * Immutable ring of server addresses handed out one after another via an incrementing counter.
     */
    private static final class ServerPool {
        private final List<String> serversRing;
        private final AtomicInteger nextServerIndex = new AtomicInteger(0);

        /** Snapshots the given servers into an immutable list. */
        public ServerPool(final Set<String> servers) {
            this.serversRing = ImmutableList.copyOf(servers);
        }

        /** Returns the pooled servers as an immutable set. */
        public Set<String> getServers() {
            return ImmutableSet.copyOf(serversRing);
        }

        /**
         * Returns the server at {@code counter % size} and advances the counter.
         *
         * @throws NoServerConfiguredException if the pool holds no servers
         */
        public String getNextServer() {
            if (serversRing.isEmpty()) {
                throw new NoServerConfiguredException("No Server is assigned to client to connect");
            }

            final int ringSize = serversRing.size();
            try {
                return serversRing.get(nextServerIndex.getAndIncrement() % ringSize);
            } catch (IndexOutOfBoundsException counterOverflowed) {
                // Very rarely the counter overflows, which makes the remainder negative and the
                // lookup fail. Start again from the beginning of the list. Several threads may hit
                // this at once, so the reset can happen more than once.
                log.info("Resetting next server index");
                nextServerIndex.set(0);
                return serversRing.get(nextServerIndex.getAndIncrement() % ringSize);
            }
        }

        /** Returns the number of servers in the pool. */
        public int getSize() {
            return serversRing.size();
        }
    }
  • ServerPool has a nextServerIndex of type AtomicInteger. The getNextServer method determines the index into the serversRing list through nextServerIndex.getAndIncrement() % serversRing.size(), which implements a round-robin policy. In extreme cases, if an IndexOutOfBoundsException occurs, nextServerIndex is reset to 0 and selection then continues according to the round-robin policy.

Summary

  • NodeChecker inherits AbstractScheduledService, and its constructor creates a new fixedDelayScheduler to execute NodeChecker according to clientConfig’s discoveryFrequency and discoveryFrequencyTimeUnit. It implements the runOneIteration method, which mainly sends NodesInfo requests (GET /_nodes/_all/http) and then reads each node’s publish address to update discoveredServerList.
  • JestHttpClient inherits AbstractJestClient, and its execute and executeAsync methods both call prepareRequest to construct the HttpUriRequest; prepareRequest first calls the getNextServer method to obtain the address of the Elasticsearch server to send the request to; the getNextServer method calls serverPoolReference.get().getNextServer(). AbstractJestClient has a serverPoolReference attribute, which is an AtomicReference whose generic type is ServerPool; the setServers method creates a new ServerPool and then updates serverPoolReference with it
  • ServerPool has a nextServerIndex of type AtomicInteger. The getNextServer method determines the index into the serversRing list through nextServerIndex.getAndIncrement() % serversRing.size(), which implements a round-robin policy. In extreme cases, if an IndexOutOfBoundsException occurs, nextServerIndex is reset to 0 and selection then continues according to the round-robin policy.

doc