A look at Elasticsearch's SeedHostsProvider


Preface

This article takes a look at Elasticsearch's SeedHostsProvider.

SeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.java

/**
 * A pluggable provider of the list of seed hosts to use for discovery.
 */
public interface SeedHostsProvider {

    /**
     * Returns a list of seed hosts to use for discovery. Called repeatedly while discovery is active (i.e. while there is no master)
     * so that this list may be dynamic.
     */
    List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver);

    /**
     * Helper object that allows to resolve a list of hosts to a list of transport addresses.
     * Each host is resolved into a transport address (or a collection of addresses if the
     * number of ports is greater than one)
     */
    interface HostsResolver {
        List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
    }
}
  • The SeedHostsProvider interface defines a getSeedAddresses method that takes a HostsResolver; HostsResolver, in turn, defines a resolveHosts method. SeedHostsProvider has several implementations: SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider, and AzureSeedHostsProvider (the last three come from the cloud discovery plugins).
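To make the contract concrete, here is a minimal sketch that mirrors the two interfaces with plain "host:port" strings instead of TransportAddress, so it compiles without Elasticsearch on the classpath. The names SeedHostsSketch, TOY_RESOLVER, and fixed are hypothetical. The toy resolver assumes that a bare host expands to consecutive ports starting at 9300 (the start of the default transport port range); it performs no DNS lookups and does not handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of SeedHostsProvider / HostsResolver.
// Plain "host:port" strings stand in for Elasticsearch's TransportAddress.
class SeedHostsSketch {

    interface HostsResolver {
        List<String> resolveHosts(List<String> hosts, int limitPortCounts);
    }

    interface SeedHostsProvider {
        List<String> getSeedAddresses(HostsResolver hostsResolver);
    }

    // Assumption: bare hosts expand from the start of the default transport range (9300-9400).
    static final int FIRST_PORT = 9300;

    // Toy resolver: "host:port" maps to one address; a bare host expands to the
    // first limitPortCounts ports. No DNS lookup, no IPv6 support.
    static final HostsResolver TOY_RESOLVER = (hosts, limitPortCounts) -> {
        List<String> addresses = new ArrayList<>();
        for (String host : hosts) {
            if (host.contains(":")) {
                addresses.add(host);
            } else {
                for (int i = 0; i < limitPortCounts; i++) {
                    addresses.add(host + ":" + (FIRST_PORT + i));
                }
            }
        }
        return addresses;
    };

    // A provider with a fixed host list, in the spirit of SettingsBasedSeedHostsProvider:
    // it only decides *which* hosts to resolve and delegates the resolving itself.
    static SeedHostsProvider fixed(List<String> hosts, int limitPortCounts) {
        return resolver -> resolver.resolveHosts(hosts, limitPortCounts);
    }
}
```

With fixed(List.of("10.0.0.1:9301", "localhost"), 2), the toy resolver yields 10.0.0.1:9301, localhost:9300, and localhost:9301, which illustrates the interface comment about one host turning into several addresses when more than one port is allowed.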

SettingsBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SettingsBasedSeedHostsProvider.java

public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(SettingsBasedSeedHostsProvider.class);

    public static final Setting<List<String>> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
        Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Property.NodeScope, Property.Deprecated);

    public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING =
        Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope);

    // these limits are per-address
    private static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
    private static final int LIMIT_LOCAL_PORTS_COUNT = 5;

    private final List<String> configuredHosts;
    private final int limitPortCounts;

    public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) {
        if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
            if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
                throw new IllegalArgumentException("it is forbidden to set both ["
                    + DISCOVERY_SEED_HOSTS_SETTING.getKey() + "] and ["
                    + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey() + "]");
            }
            configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
            configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else {
            // if unicast hosts are not specified, fill with simple defaults on the local machine
            configuredHosts = transportService.getLocalAddresses();
            limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
        }

        logger.debug("using initial hosts {}", configuredHosts);
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
    }
}
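  • SettingsBasedSeedHostsProvider reads the host list from discovery.seed_hosts or from the deprecated discovery.zen.ping.unicast.hosts; setting both throws an IllegalArgumentException. Configured hosts are resolved with a limit of one port each. If neither setting is present, it falls back to the node's local addresses (transportService.getLocalAddresses()) and tries up to five ports on each.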
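The constructor's branching can be sketched on its own, with a plain Map standing in for Elasticsearch's Settings. SeedHostsSelection and its Selection holder are hypothetical names; the setting keys, the error message, and the two port limits are copied from the code above.

```java
import java.util.List;
import java.util.Map;

// Hypothetical re-creation of SettingsBasedSeedHostsProvider's constructor logic.
// A Map<String, List<String>> stands in for Elasticsearch's Settings object.
class SeedHostsSelection {

    static final String LEGACY_KEY = "discovery.zen.ping.unicast.hosts";
    static final String SEED_HOSTS_KEY = "discovery.seed_hosts";
    static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
    static final int LIMIT_LOCAL_PORTS_COUNT = 5;

    // Which hosts to resolve, and how many ports to try per host.
    static final class Selection {
        final List<String> hosts;
        final int limitPortCounts;

        Selection(List<String> hosts, int limitPortCounts) {
            this.hosts = hosts;
            this.limitPortCounts = limitPortCounts;
        }
    }

    static Selection select(Map<String, List<String>> settings, List<String> localAddresses) {
        boolean legacy = settings.containsKey(LEGACY_KEY);
        boolean seed = settings.containsKey(SEED_HOSTS_KEY);
        if (legacy && seed) {
            throw new IllegalArgumentException(
                "it is forbidden to set both [" + SEED_HOSTS_KEY + "] and [" + LEGACY_KEY + "]");
        }
        if (legacy) {
            return new Selection(settings.get(LEGACY_KEY), LIMIT_FOREIGN_PORTS_COUNT);
        }
        if (seed) {
            return new Selection(settings.get(SEED_HOSTS_KEY), LIMIT_FOREIGN_PORTS_COUNT);
        }
        // Neither key set: fall back to this machine's addresses and scan a few local ports.
        return new Selection(localAddresses, LIMIT_LOCAL_PORTS_COUNT);
    }
}
```

The asymmetric limits follow the comments in the original code: remote hosts get one port because probing a whole range on every configured host is wasteful, while the local fallback scans a few ports so that several nodes started on one machine can find each other.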

FileBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/FileBasedSeedHostsProvider.java

public class FileBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(FileBasedSeedHostsProvider.class);

    public static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

    private final Path unicastHostsFilePath;

    public FileBasedSeedHostsProvider(Path configFile) {
        this.unicastHostsFilePath = configFile.resolve(UNICAST_HOSTS_FILE);
    }

    private List<String> getHostsList() {
        if (Files.exists(unicastHostsFilePath)) {
            try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
                return lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
                    .collect(Collectors.toList());
            } catch (IOException e) {
                logger.warn(() -> new ParameterizedMessage("failed to read file [{}]", unicastHostsFilePath), e);
                return Collections.emptyList();
            }
        }

        logger.warn("expected, but did not find, a dynamic hosts list at [{}]", unicastHostsFilePath);

        return Collections.emptyList();
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1);
        logger.debug("seed addresses: {}", transportAddresses);
        return transportAddresses;
    }
}
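  • FileBasedSeedHostsProvider reads the unicast_hosts.txt file from the given config directory, skipping lines that start with #. The file is re-read on every getSeedAddresses call, so its contents can change while the node is running, and each host is resolved with a limit of one port. If the file is missing or cannot be read, the provider logs a warning and returns an empty list.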
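A standalone sketch of getHostsList, with Elasticsearch's logging replaced by comments. HostsFileReader and readHosts are hypothetical names; the file name and the comment rule match the code above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical standalone version of FileBasedSeedHostsProvider#getHostsList.
class HostsFileReader {

    static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

    static List<String> readHosts(Path configDir) {
        Path file = configDir.resolve(UNICAST_HOSTS_FILE);
        if (Files.exists(file) == false) {
            return Collections.emptyList(); // the real provider logs a warning here
        }
        try (Stream<String> lines = Files.lines(file)) {
            return lines.filter(line -> line.startsWith("#") == false) // `#` lines are comments
                .collect(Collectors.toList());
        } catch (IOException e) {
            return Collections.emptyList(); // the real provider logs the failure and carries on
        }
    }
}
```

Given a unicast_hosts.txt containing "# seed nodes", "10.0.0.3", and "10.0.0.4:9301", readHosts returns the last two lines. Returning an empty list instead of throwing keeps discovery running when the file is temporarily absent.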

SeedHostsProvider.HostsResolver

UnicastZenPing.createHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    private SeedHostsProvider.HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }
  • UnicastZenPing's createHostsResolver method returns a SeedHostsProvider.HostsResolver implemented as a lambda, which delegates to the static SeedHostsResolver.resolveHostsLists method, passing along its executor, logger, transportService, and resolveTimeout.
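The lambda pattern can be shown in isolation: a two-argument functional interface is implemented by a lambda that captures extra state from its enclosing object and forwards everything to a static helper. ResolverFactory, its context field, and the string-tagging helper are hypothetical; in Elasticsearch the helper is SeedHostsResolver.resolveHostsLists, and the captured state is an executor, a logger, a transport service, and a timeout rather than a string.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the shape of UnicastZenPing#createHostsResolver.
class ResolverFactory {

    interface HostsResolver {
        List<String> resolveHosts(List<String> hosts, int limitPortCounts);
    }

    // Stand-in for SeedHostsResolver.resolveHostsLists: it just tags each host with the
    // captured context so the delegation is visible in the output.
    static List<String> resolveHostsLists(String context, List<String> hosts, int limitPortCounts) {
        return hosts.stream()
            .map(h -> context + "/" + h + "/" + limitPortCounts)
            .collect(Collectors.toList());
    }

    private final String context;

    ResolverFactory(String context) {
        this.context = context;
    }

    // The lambda only supplies the two interface parameters; `context` comes from the enclosing instance.
    HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> resolveHostsLists(context, hosts, limitPortCounts);
    }
}
```

This keeps the SeedHostsProvider implementations unaware of executors and timeouts: they only see a narrow resolveHosts call.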

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {

    //......

    public static List<TransportAddress> resolveHostsLists(
        final ExecutorService executorService,
        final Logger logger,
        final List<String> hosts,
        final int limitPortCounts,
        final TransportService transportService,
        final TimeValue resolveTimeout) {
        Objects.requireNonNull(executorService);
        Objects.requireNonNull(logger);
        Objects.requireNonNull(hosts);
        Objects.requireNonNull(transportService);
        Objects.requireNonNull(resolveTimeout);
        if (resolveTimeout.nanos() < 0) {
            throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
        }
        // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
        final List<Callable<TransportAddress[]>> callables =
            hosts
                .stream()
                .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
                .collect(Collectors.toList());
        final List<Future<TransportAddress[]>> futures;
        try {
            futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        final List<TransportAddress> transportAddresses = new ArrayList<>();
        final Set<TransportAddress> localAddresses = new HashSet<>();
        localAddresses.add(transportService.boundAddress().publishAddress());
        localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
        // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
        // hostname with the corresponding task by iterating together
        final Iterator<String> it = hosts.iterator();
        for (final Future<TransportAddress[]> future : futures) {
            final String hostname = it.next();
            if (!future.isCancelled()) {
                assert future.isDone();
                try {
                    final TransportAddress[] addresses = future.get();
                    logger.trace("resolved host [{}] to {}", hostname, addresses);
                    for (int addressId = 0; addressId < addresses.length; addressId++) {
                        final TransportAddress address = addresses[addressId];
                        // no point in pinging ourselves
                        if (localAddresses.contains(address) == false) {
                            transportAddresses.add(address);
                        }
                    }
                } catch (final ExecutionException e) {
                    assert e.getCause() != null;
                    final String message = "failed to resolve host [" + hostname + "]";
                    logger.warn(message, e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // ignore
                }
            } else {
                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
            }
        }
        return Collections.unmodifiableList(transportAddresses);
    }

    //......
}
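  • The static resolveHostsLists method of SeedHostsResolver resolves the hosts in parallel: it wraps each transportService.addressesFromString call in a Callable and submits them all through executorService.invokeAll, waiting at most resolveTimeout. Tasks that have not finished by then are cancelled and logged as timeouts, resolution failures are logged as warnings, and any address that matches the node's own publish or bound addresses is dropped, since there is no point in pinging itself.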
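The same fan-out-with-deadline pattern, stripped of Elasticsearch types, looks roughly like this. ParallelHostResolver, the caller-supplied lookup function, and the use of System.err in place of a logger are assumptions for illustration; the behaviours carried over are invokeAll with a timeout, treating cancelled futures as timeouts, logging failed lookups, and skipping local addresses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of SeedHostsResolver.resolveHostsLists without Elasticsearch types.
class ParallelHostResolver {

    static List<String> resolveAll(ExecutorService executor, List<String> hosts,
                                   Function<String, List<String>> lookup,
                                   Set<String> localAddresses, long timeoutMillis) {
        List<Callable<List<String>>> tasks = hosts.stream()
            .map(h -> (Callable<List<String>>) () -> lookup.apply(h))
            .collect(Collectors.toList());
        List<Future<List<String>>> futures;
        try {
            // invokeAll waits up to the timeout, then cancels every task that has not finished
            futures = executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<>();
        Iterator<String> it = hosts.iterator(); // futures come back in task order
        for (Future<List<String>> future : futures) {
            String host = it.next();
            if (future.isCancelled()) {
                System.err.println("timed out resolving host [" + host + "]");
                continue;
            }
            try {
                for (String address : future.get()) {
                    if (localAddresses.contains(address) == false) { // no point in pinging ourselves
                        result.add(address);
                    }
                }
            } catch (ExecutionException e) {
                System.err.println("failed to resolve host [" + host + "]: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Collections.unmodifiableList(result);
    }
}
```

Because invokeAll returns futures in task order, walking the host list alongside the futures pairs each result with its hostname, the same trick the Elasticsearch code relies on. A single slow lookup costs at most the timeout and does not discard the hosts that resolved in time.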

Summary

  • The SeedHostsProvider interface defines a getSeedAddresses method that takes a HostsResolver; HostsResolver, in turn, defines a resolveHosts method. SeedHostsProvider has several implementations: SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider, and AzureSeedHostsProvider.
  • SettingsBasedSeedHostsProvider reads the discovery.seed_hosts or the deprecated discovery.zen.ping.unicast.hosts setting; FileBasedSeedHostsProvider reads the unicast_hosts.txt file from the config directory to build its host list.
  • UnicastZenPing's createHostsResolver method returns a SeedHostsProvider.HostsResolver lambda that delegates to SeedHostsResolver.resolveHostsLists. That static method uses an executor to run transportService.addressesFromString for all hosts in parallel, bounded by resolveTimeout, and drops the node's own addresses from the result.

doc