A look at elasticsearch's PeerFinder


Preface

This article walks through PeerFinder in elasticsearch 7.0.0.

PeersRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java

public class PeersRequest extends TransportRequest {
    private final DiscoveryNode sourceNode;
    private final List<DiscoveryNode> knownPeers;

    public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) {
        assert knownPeers.contains(sourceNode) == false : "local node is not a peer";
        this.sourceNode = sourceNode;
        this.knownPeers = knownPeers;
    }

    public PeersRequest(StreamInput in) throws IOException {
        super(in);
        sourceNode = new DiscoveryNode(in);
        knownPeers = in.readList(DiscoveryNode::new);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        out.writeList(knownPeers);
    }

    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    public DiscoveryNode getSourceNode() {
        return sourceNode;
    }

    @Override
    public String toString() {
        return "PeersRequest{" +
            "sourceNode=" + sourceNode +
            ", knownPeers=" + knownPeers +
            '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersRequest that = (PeersRequest) o;
        return Objects.equals(sourceNode, that.sourceNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, knownPeers);
    }
}
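  • PeersRequest has two fields: sourceNode, the node sending the request, and knownPeers, the peers that node has already found. The constructor asserts that knownPeers does not contain sourceNode, and the StreamInput constructor reads the fields in the same order that writeTo writes them.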
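Because the request travels over the transport layer, the reading constructor and writeTo have to agree on field order. The following is a minimal sketch of that symmetry, not elasticsearch code: the class SimplePeersRequest is hypothetical, node names stand in for DiscoveryNode, java.io data streams stand in for StreamInput/StreamOutput, and the int length prefix for the list is an assumption of the sketch rather than elasticsearch's actual wire format. It also throws an exception where the real constructor only uses assert.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for PeersRequest: node names replace DiscoveryNode and
// java.io data streams replace elasticsearch's StreamInput/StreamOutput.
final class SimplePeersRequest {
    final String sourceNode;
    final List<String> knownPeers;

    SimplePeersRequest(String sourceNode, List<String> knownPeers) {
        // Same invariant as the real constructor (which uses assert): never list yourself as a peer.
        if (knownPeers.contains(sourceNode)) {
            throw new IllegalArgumentException("local node is not a peer");
        }
        this.sourceNode = sourceNode;
        this.knownPeers = List.copyOf(knownPeers);
    }

    // Counterpart of writeTo(StreamOutput): sourceNode first, then a length-prefixed list.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(sourceNode);
        out.writeInt(knownPeers.size());
        for (String peer : knownPeers) {
            out.writeUTF(peer);
        }
    }

    // Counterpart of PeersRequest(StreamInput): reads fields in exactly the order writeTo wrote them.
    static SimplePeersRequest readFrom(DataInputStream in) throws IOException {
        String source = in.readUTF();
        int size = in.readInt();
        List<String> peers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            peers.add(in.readUTF());
        }
        return new SimplePeersRequest(source, peers);
    }

    // Serializes to bytes and reads them back, as happens between two nodes.
    static SimplePeersRequest roundTrip(SimplePeersRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                request.writeTo(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readFrom(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SimplePeersRequest that = (SimplePeersRequest) o;
        return sourceNode.equals(that.sourceNode) && knownPeers.equals(that.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, knownPeers);
    }

    public static void main(String[] args) {
        SimplePeersRequest request = new SimplePeersRequest("node-a", List.of("node-b", "node-c"));
        System.out.println(roundTrip(request).equals(request)); // prints true
    }
}
```

Swapping the two reads in readFrom would make the round trip fail, which is the kind of mismatch the paired constructor and writeTo in PeersRequest must avoid.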

PeersResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java

public class PeersResponse extends TransportResponse {
    private final Optional<DiscoveryNode> masterNode;
    private final List<DiscoveryNode> knownPeers;
    private final long term;

    public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) {
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
        this.masterNode = masterNode;
        this.knownPeers = knownPeers;
        this.term = term;
    }

    public PeersResponse(StreamInput in) throws IOException {
        masterNode = Optional.ofNullable(in.readOptionalWriteable(DiscoveryNode::new));
        knownPeers = in.readList(DiscoveryNode::new);
        term = in.readLong();
        assert masterNode.isPresent() == false || knownPeers.isEmpty();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalWriteable(masterNode.orElse(null));
        out.writeList(knownPeers);
        out.writeLong(term);
    }

    /**
     * @return the node that is currently leading, according to the responding node.
     */
    public Optional<DiscoveryNode> getMasterNode() {
        return masterNode;
    }

    /**
     * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there
     * is currently a leader.
     */
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    /**
     * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is
     * currently leading.
     */
    public long getTerm() {
        return term;
    }

    @Override
    public String toString() {
        return "PeersResponse{" +
            "masterNode=" + masterNode +
            ", knownPeers=" + knownPeers +
            ", term=" + term +
            '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PeersResponse that = (PeersResponse) o;
        return term == that.term &&
            Objects.equals(masterNode, that.masterNode) &&
            Objects.equals(knownPeers, that.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterNode, knownPeers, term);
    }
}
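  • PeersResponse has three fields: masterNode, knownPeers, and term. As the constructor's assertion and the getKnownPeers javadoc state, knownPeers is empty whenever masterNode is present: a node that believes there is a leader reports only that leader, together with its current term.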
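The masterNode/knownPeers invariant is easiest to see in a reduced model. This sketch assumes a hypothetical record SimplePeersResponse with hypothetical factory methods withLeader and withoutLeader; node names replace DiscoveryNode, and the record throws an exception where the real class only uses assert.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified PeersResponse: node names stand in for DiscoveryNode.
record SimplePeersResponse(Optional<String> masterNode, List<String> knownPeers, long term) {

    SimplePeersResponse {
        // Enforce the invariant the real class asserts: if a leader is known,
        // the response carries no peer list.
        if (masterNode.isPresent() && !knownPeers.isEmpty()) {
            throw new IllegalArgumentException("knownPeers must be empty when masterNode is present");
        }
        knownPeers = List.copyOf(knownPeers);
    }

    // What a responder that believes there is a leader sends back.
    static SimplePeersResponse withLeader(String leader, long term) {
        return new SimplePeersResponse(Optional.of(leader), List.of(), term);
    }

    // What a responder that is still discovering sends back: no leader, only its peers.
    static SimplePeersResponse withoutLeader(List<String> peers, long term) {
        return new SimplePeersResponse(Optional.empty(), peers, term);
    }

    public static void main(String[] args) {
        System.out.println(withLeader("node-a", 5L));
        System.out.println(withoutLeader(List.of("node-b", "node-c"), 5L));
        try {
            new SimplePeersResponse(Optional.of("node-a"), List.of("node-b"), 5L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```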

PeerFinder

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

public abstract class PeerFinder {

    private static final Logger logger = LogManager.getLogger(PeerFinder.class);

    public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";

    // the time between attempts to find all peers
    public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
        Setting.timeSetting("discovery.find_peers_interval",
            TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.request_peers_timeout",
            TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    private final Settings settings;

    private final TimeValue findPeersInterval;
    private final TimeValue requestPeersTimeout;

    private final Object mutex = new Object();
    private final TransportService transportService;
    private final TransportAddressConnector transportAddressConnector;
    private final ConfiguredHostsResolver configuredHostsResolver;

    private volatile long currentTerm;
    private boolean active;
    private DiscoveryNodes lastAcceptedNodes;
    private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
    private Optional<DiscoveryNode> leader = Optional.empty();
    private volatile List<TransportAddress> lastResolvedAddresses = emptyList();

    public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                      ConfiguredHostsResolver configuredHostsResolver) {
        this.settings = settings;
        findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
        requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
        this.transportService = transportService;
        this.transportAddressConnector = transportAddressConnector;
        this.configuredHostsResolver = configuredHostsResolver;

        transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
            PeersRequest::new,
            (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));

        transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
            UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
    }

    public void activate(final DiscoveryNodes lastAcceptedNodes) {
        logger.trace("activating with {}", lastAcceptedNodes);

        synchronized (mutex) {
            assert assertInactiveWithNoKnownPeers();
            active = true;
            this.lastAcceptedNodes = lastAcceptedNodes;
            leader = Optional.empty();
            handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
        }

        onFoundPeersUpdated(); // trigger a check for a quorum already
    }

    public void deactivate(DiscoveryNode leader) {
        final boolean peersRemoved;
        synchronized (mutex) {
            logger.trace("deactivating and setting leader to {}", leader);
            active = false;
            peersRemoved = handleWakeUp();
            this.leader = Optional.of(leader);
            assert assertInactiveWithNoKnownPeers();
        }
        if (peersRemoved) {
            onFoundPeersUpdated();
        }
    }

    // exposed to subclasses for testing
    protected final boolean holdsLock() {
        return Thread.holdsLock(mutex);
    }

    private boolean assertInactiveWithNoKnownPeers() {
        assert holdsLock() : "PeerFinder mutex not held";
        assert active == false;
        assert peersByAddress.isEmpty() : peersByAddress.keySet();
        return true;
    }

    PeersResponse handlePeersRequest(PeersRequest peersRequest) {
        synchronized (mutex) {
            assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
            final List<DiscoveryNode> knownPeers;
            if (active) {
                assert leader.isPresent() == false : leader;
                if (peersRequest.getSourceNode().isMasterNode()) {
                    startProbe(peersRequest.getSourceNode().getAddress());
                }
                peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
                knownPeers = getFoundPeersUnderLock();
            } else {
                assert leader.isPresent() || lastAcceptedNodes == null;
                knownPeers = emptyList();
            }
            return new PeersResponse(leader, knownPeers, currentTerm);
        }
    }

    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public Optional<DiscoveryNode> getLeader() {
        synchronized (mutex) {
            return leader;
        }
    }

    // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
    public long getCurrentTerm() {
        return currentTerm;
    }

    public void setCurrentTerm(long currentTerm) {
        this.currentTerm = currentTerm;
    }

    private DiscoveryNode getLocalNode() {
        final DiscoveryNode localNode = transportService.getLocalNode();
        assert localNode != null;
        return localNode;
    }

    protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);

    protected abstract void onFoundPeersUpdated();

    public List<TransportAddress> getLastResolvedAddresses() {
        return lastResolvedAddresses;
    }

    public Iterable<DiscoveryNode> getFoundPeers() {
        synchronized (mutex) {
            return getFoundPeersUnderLock();
        }
    }

    private List<DiscoveryNode> getFoundPeersUnderLock() {
        assert holdsLock() : "PeerFinder mutex not held";
        return peersByAddress.values().stream()
            .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList());
    }

    private Peer createConnectingPeer(TransportAddress transportAddress) {
        Peer peer = new Peer(transportAddress);
        peer.establishConnection();
        return peer;
    }

    /**
     * @return whether any peers were removed due to disconnection
     */
    private boolean handleWakeUp() {
        assert holdsLock() : "PeerFinder mutex not held";

        final boolean peersRemoved = peersByAddress.values().removeIf(Peer::handleWakeUp);

        if (active == false) {
            logger.trace("not active");
            return peersRemoved;
        }

        logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
        for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
            startProbe(discoveryNodeObjectCursor.value.getAddress());
        }

        configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
            synchronized (mutex) {
                lastResolvedAddresses = providedAddresses;
                logger.trace("probing resolved transport addresses {}", providedAddresses);
                providedAddresses.forEach(this::startProbe);
            }
        });

        transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
            @Override
            public boolean isForceExecution() {
                return true;
            }

            @Override
            public void onFailure(Exception e) {
                assert false : e;
                logger.debug("unexpected exception in wakeup", e);
            }

            @Override
            protected void doRun() {
                synchronized (mutex) {
                    if (handleWakeUp() == false) {
                        return;
                    }
                }
                onFoundPeersUpdated();
            }

            @Override
            public String toString() {
                return "PeerFinder handling wakeup";
            }
        });

        return peersRemoved;
    }

    protected void startProbe(TransportAddress transportAddress) {
        assert holdsLock() : "PeerFinder mutex not held";
        if (active == false) {
            logger.trace("startProbe({}) not running", transportAddress);
            return;
        }

        if (transportAddress.equals(getLocalNode().getAddress())) {
            logger.trace("startProbe({}) not probing local node", transportAddress);
            return;
        }

        peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
    }

    private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<UnicastZenPing.UnicastPingRequest> {
        @Override
        public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
            final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
                Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
            final PeersResponse peersResponse = handlePeersRequest(peersRequest);
            final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
            final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
                peersResponse.getMasterNode().orElse(null),
                clusterName, ClusterState.UNKNOWN_VERSION));
            peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
                new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID,
                    isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION)));
            channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
        }
    }

    //......
}
  • The PeerFinder constructor registers two request handlers. The handler for REQUEST_PEERS_ACTION_NAME calls handlePeersRequest directly. The handler for UnicastZenPing.ACTION_NAME, Zen1UnicastPingRequestHandler, turns the Zen1 ping into a PeersRequest (the pinging node as sourceNode, its known master, if any, as the only known peer), also calls handlePeersRequest, and packs the result back into a UnicastPingResponse.
  • While the finder is active, handlePeersRequest calls startProbe on the requesting node's address if that node is master-eligible, and on the address of every node in peersRequest.getKnownPeers(); it then collects its own found peers through getFoundPeersUnderLock. While inactive it reports an empty peer list. In both cases it returns a PeersResponse built from leader, knownPeers, and currentTerm.
  • startProbe does nothing if the finder is inactive or if the address is the local node's own. Otherwise it calls computeIfAbsent on peersByAddress, a map keyed by transportAddress, so createConnectingPeer runs at most once per address. createConnectingPeer creates a Peer and calls its establishConnection method.
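The interplay of handlePeersRequest, startProbe, and getFoundPeersUnderLock can be modeled without any networking. In this sketch the class ProbeSketch, its markConnected method, and the connector callback are hypothetical: addresses and node names are plain strings, the callback stands in for Peer.establishConnection, and markConnected simulates a successful connection. It keeps the rules from the PeerFinder source: probe the sender only if it is master-eligible, never probe the local address, and report only peers that have connected.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical model of PeerFinder's handlePeersRequest/startProbe bookkeeping.
// Addresses and node names are strings; the connection attempt is a callback.
final class ProbeSketch {
    record Result(Optional<String> leader, List<String> knownPeers) {}

    // nodeName stays null until the connection succeeds, like Peer.discoveryNode.
    private static final class Peer { String nodeName; }

    private final Object mutex = new Object();
    private final String localAddress;
    private final Consumer<String> connector; // stands in for Peer.establishConnection()
    private final Map<String, Peer> peersByAddress = new LinkedHashMap<>();
    private boolean active;
    private Optional<String> leader = Optional.empty();

    ProbeSketch(String localAddress, Consumer<String> connector) {
        this.localAddress = localAddress;
        this.connector = connector;
    }

    void activate() {
        synchronized (mutex) {
            active = true;
            leader = Optional.empty();
        }
    }

    // The real deactivate() drops every Peer through handleWakeUp(); clearing the map is equivalent here.
    void deactivate(String newLeader) {
        synchronized (mutex) {
            active = false;
            peersByAddress.clear();
            leader = Optional.of(newLeader);
        }
    }

    // Simulates a successful connectToRemoteMasterNode callback for an address already being probed.
    void markConnected(String address, String nodeName) {
        synchronized (mutex) {
            Peer peer = peersByAddress.get(address);
            if (active && peer != null) {
                peer.nodeName = nodeName;
            }
        }
    }

    Result handlePeersRequest(String sourceAddress, boolean sourceIsMasterEligible, List<String> theirPeers) {
        synchronized (mutex) {
            if (!active) {
                return new Result(leader, List.of());
            }
            if (sourceIsMasterEligible) {
                startProbe(sourceAddress);
            }
            theirPeers.forEach(this::startProbe);
            return new Result(leader, foundPeers());
        }
    }

    // Caller holds mutex. computeIfAbsent creates at most one Peer, and so at most one
    // connection attempt, per address; inactive state and our own address are skipped.
    private void startProbe(String address) {
        if (active && !address.equals(localAddress)) {
            peersByAddress.computeIfAbsent(address, this::createConnectingPeer);
        }
    }

    private Peer createConnectingPeer(String address) {
        connector.accept(address);
        return new Peer();
    }

    // Caller holds mutex. Only connected peers are reported, as in getFoundPeersUnderLock.
    private List<String> foundPeers() {
        return peersByAddress.values().stream()
            .map(p -> p.nodeName)
            .filter(Objects::nonNull)
            .distinct()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> attempts = new ArrayList<>();
        ProbeSketch finder = new ProbeSketch("10.0.0.1:9300", attempts::add);
        finder.activate();
        finder.handlePeersRequest("10.0.0.2:9300", true, List.of("10.0.0.1:9300", "10.0.0.3:9300"));
        finder.handlePeersRequest("10.0.0.2:9300", true, List.of("10.0.0.3:9300"));
        System.out.println(attempts); // prints [10.0.0.2:9300, 10.0.0.3:9300]
    }
}
```

The second request triggers no new connection attempts because computeIfAbsent finds both addresses already in peersByAddress, and the local address 10.0.0.1:9300 is never probed at all.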

Peer

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

    private class Peer {
        private final TransportAddress transportAddress;
        private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
        private volatile boolean peersRequestInFlight;

        Peer(TransportAddress transportAddress) {
            this.transportAddress = transportAddress;
        }

        @Nullable
        DiscoveryNode getDiscoveryNode() {
            return discoveryNode.get();
        }

        boolean handleWakeUp() {
            assert holdsLock() : "PeerFinder mutex not held";

            if (active == false) {
                return true;
            }

            final DiscoveryNode discoveryNode = getDiscoveryNode();
            // may be null if connection not yet established

            if (discoveryNode != null) {
                if (transportService.nodeConnected(discoveryNode)) {
                    if (peersRequestInFlight == false) {
                        requestPeers();
                    }
                } else {
                    logger.trace("{} no longer connected", this);
                    return true;
                }
            }

            return false;
        }

        void establishConnection() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
            assert active;

            logger.trace("{} attempting connection", this);
            transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
                @Override
                public void onResponse(DiscoveryNode remoteNode) {
                    assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
                    assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }

                        assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
                        discoveryNode.set(remoteNode);
                        requestPeers();
                    }

                    assert holdsLock() == false : "PeerFinder mutex is held in error";
                    onFoundPeersUpdated();
                }

                @Override
                public void onFailure(Exception e) {
                    logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
                    synchronized (mutex) {
                        peersByAddress.remove(transportAddress);
                    }
                }
            });
        }

        private void requestPeers() {
            assert holdsLock() : "PeerFinder mutex not held";
            assert peersRequestInFlight == false : "PeersRequest already in flight";
            assert active;

            final DiscoveryNode discoveryNode = getDiscoveryNode();
            assert discoveryNode != null : "cannot request peers without first connecting";

            if (discoveryNode.equals(getLocalNode())) {
                logger.trace("{} not requesting peers from local node", this);
                return;
            }

            logger.trace("{} requesting peers", this);
            peersRequestInFlight = true;

            final List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();

            final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() {

                @Override
                public PeersResponse read(StreamInput in) throws IOException {
                    return new PeersResponse(in);
                }

                @Override
                public void handleResponse(PeersResponse response) {
                    logger.trace("{} received {}", Peer.this, response);
                    synchronized (mutex) {
                        if (active == false) {
                            return;
                        }

                        peersRequestInFlight = false;

                        response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
                        response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
                    }

                    if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
                        // Must not hold lock here to avoid deadlock
                        assert holdsLock() == false : "PeerFinder mutex is held in error";
                        onActiveMasterFound(discoveryNode, response.getTerm());
                    }
                }

                @Override
                public void handleException(TransportException exp) {
                    peersRequestInFlight = false;
                    logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
                }

                @Override
                public String executor() {
                    return Names.GENERIC;
                }
            };
            final String actionName;
            final TransportRequest transportRequest;
            final TransportResponseHandler<?> transportResponseHandler;
            if (isZen1Node(discoveryNode)) {
                actionName = UnicastZenPing.ACTION_NAME;
                transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
                    new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null,
                        ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION));
                transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
                    Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
                        .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
                        .map(ZenPing.PingResponse::node)
                        .findFirst();
                    List<DiscoveryNode> discoveredNodes = new ArrayList<>();
                    if (optionalMasterNode.isPresent() == false) {
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull)
                            .forEach(discoveredNodes::add);
                        Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add);
                    }
                    return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
                }, UnicastZenPing.UnicastPingResponse::new);
            } else {
                actionName = REQUEST_PEERS_ACTION_NAME;
                transportRequest = new PeersRequest(getLocalNode(), knownNodes);
                transportResponseHandler = peersResponseHandler;
            }
            transportService.sendRequest(discoveryNode, actionName,
                transportRequest,
                TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
                transportResponseHandler);
        }

        @Override
        public String toString() {
            return "Peer{" +
                "transportAddress=" + transportAddress +
                ", discoveryNode=" + discoveryNode.get() +
                ", peersRequestInFlight=" + peersRequestInFlight +
                '}';
        }
    }
  • Peer's establishConnection method connects to its address through transportAddressConnector.connectToRemoteMasterNode. On success, if the finder is still active, it records the returned DiscoveryNode and calls requestPeers under the mutex, then calls onFoundPeersUpdated after releasing it. On failure it removes the Peer for that transportAddress from peersByAddress.
  • requestPeers builds a PeersRequest from the local node and the knownNodes returned by getFoundPeersUnderLock, and sends it to the peer's discoveryNode through transportService.sendRequest, using requestPeersTimeout as the timeout. When the response arrives, it calls startProbe on the address of the reported masterNode and of every entry in knownPeers. If the response names the responding node itself as the master, it then calls onActiveMasterFound, outside the mutex to avoid deadlock.
  • For compatibility with nodes from earlier versions, 7.0 handles Zen1 nodes separately: if isZen1Node(discoveryNode) is true, requestPeers sends a UnicastZenPing.UnicastPingRequest to UnicastZenPing.ACTION_NAME instead, and wraps the response handler so that the UnicastPingResponse is converted into a PeersResponse.
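The Zen1 branch of requestPeers and the follow-up in handleResponse reduce to a few pure decisions. The sketch below models them with hypothetical records Ping and Response and hypothetical helpers fromZen1, nodesToProbe, and respondingNodeIsMaster; node names replace both DiscoveryNode and transport addresses, and the filtering mirrors the wrap(...) lambda in the Peer source rather than any published API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical model of two steps in Peer.requestPeers: converting a Zen1 ping reply
// into a PeersResponse, and deciding what handleResponse does with it. Nodes are strings.
final class Zen1ConversionSketch {

    // One Zen1 ping entry: the node that answered and the master it knows about (may be null).
    record Ping(String node, String master) {}

    record Response(Optional<String> masterNode, List<String> knownPeers, long term) {}

    // Mirrors the wrap(...) lambda: a ping in which `target` is both node and master marks
    // `target` as the active master; otherwise every master, then every node, is a known peer.
    static Response fromZen1(String target, List<Ping> pings) {
        Optional<String> master = pings.stream()
            .filter(p -> target.equals(p.node()) && target.equals(p.master()))
            .map(Ping::node)
            .findFirst();
        List<String> discovered = new ArrayList<>();
        if (master.isEmpty()) {
            pings.stream().map(Ping::master).filter(Objects::nonNull).forEach(discovered::add);
            pings.stream().map(Ping::node).forEach(discovered::add);
        }
        return new Response(master, discovered, 0L); // the source also passes 0L as the term
    }

    // handleResponse calls startProbe on the master (if any) and then on every known peer.
    static List<String> nodesToProbe(Response response) {
        List<String> toProbe = new ArrayList<>();
        response.masterNode().ifPresent(toProbe::add);
        toProbe.addAll(response.knownPeers());
        return toProbe;
    }

    // onActiveMasterFound fires only when the responder names itself as the master.
    static boolean respondingNodeIsMaster(String respondingNode, Response response) {
        return response.masterNode().equals(Optional.of(respondingNode));
    }

    public static void main(String[] args) {
        Response fromLeader = fromZen1("node-b", List.of(new Ping("node-b", "node-b")));
        System.out.println(respondingNodeIsMaster("node-b", fromLeader)); // prints true

        Response fromFollower = fromZen1("node-b", List.of(new Ping("node-b", "node-c"), new Ping("node-d", null)));
        System.out.println(nodesToProbe(fromFollower)); // prints [node-c, node-b, node-d]
    }
}
```

In the follower case the responding node, node-b, appears in its own probe list. In PeerFinder that is harmless: node-b's address is already a key in peersByAddress, so startProbe's computeIfAbsent does nothing.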

Summary

  • The PeerFinder constructor registers handlers for REQUEST_PEERS_ACTION_NAME and for the Zen1 UnicastZenPing.ACTION_NAME; both end up in handlePeersRequest.
  • handlePeersRequest probes the requesting node (if master-eligible) and the peers it reports, then answers with the leader, the peers found so far, and currentTerm. startProbe keeps at most one connecting Peer per transportAddress in peersByAddress, and createConnectingPeer starts each connection through establishConnection.
  • establishConnection connects through transportAddressConnector.connectToRemoteMasterNode; on success it calls requestPeers and onFoundPeersUpdated, and on failure it drops the Peer from peersByAddress. requestPeers sends a PeersRequest (or a UnicastPingRequest to Zen1 nodes), probes the masterNode and knownPeers in the response, and calls onActiveMasterFound when the responding node reports itself as the master.
