A look at the FailureDetector in scalecube-cluster

  distributed-systems

Preface

This article examines the FailureDetector in scalecube-cluster.

FailureDetector

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetector.java

/**
 * Failure Detector component responsible for monitoring availability of other members in the
 * cluster. This interface is supposed to be used internally as part cluster membership protocol. It
 * doesn't specify that particular node is failed, but just provide information that either it is
 * suspected or trusted at current moment of time. So it is up to cluster membership or other top
 * level component to define when suspected member is actually failed.
 */
public interface FailureDetector {

  /**
   * Starts running failure detection algorithm. After started it begins to receive and send ping
   * messages.
   */
  void start();

  /** Stops running failure detection algorithm and releases occupied resources. */
  void stop();

  /** Listens for results of ping checks (ALIVE/SUSPECT) done periodically by failure detector. */
  Flux<FailureDetectorEvent> listen();
}
  • FailureDetector defines three methods: start, stop, and listen.

FailureDetectorImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {

  private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);

  // Qualifiers

  public static final String PING = "sc/fdetector/ping";
  public static final String PING_REQ = "sc/fdetector/pingReq";
  public static final String PING_ACK = "sc/fdetector/pingAck";

  // Injected

  private final Member localMember;
  private final Transport transport;
  private final FailureDetectorConfig config;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private long currentPeriod = 0;
  private List<Member> pingMembers = new ArrayList<>();
  private int pingMemberIndex = 0; // index for sequential ping member selection

  // Disposables

  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject
  private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
      DirectProcessor.<FailureDetectorEvent>create().serialize();

  private final FluxSink<FailureDetectorEvent> sink = subject.sink();

  // Scheduled
  private final Scheduler scheduler;

  /**
   * Creates new instance of failure detector with given transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config failure detector settings
   * @param scheduler scheduler
   * @param cidGenerator correlationId generator
   */
  public FailureDetectorImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      FailureDetectorConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.localMember = Objects.requireNonNull(localMember);
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doPing,
            config.getPingInterval(),
            config.getPingInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting requests and sending pings
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Flux<FailureDetectorEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  //......

}
  • FailureDetectorImpl implements the FailureDetector interface. It defines three message qualifiers (PING, PING_REQ, and PING_ACK) and keeps the members to be probed in the pingMembers list.
  • The constructor subscribes to membershipProcessor, which triggers onMemberEvent, and to transport.listen(), which triggers onMessage.
  • The start method registers the doPing task through scheduler.schedulePeriodically so that it runs every pingInterval (5000 ms by default). The stop method calls actionsDisposables.dispose() and sink.complete(). The listen method returns subject.onBackpressureBuffer().
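The start/stop lifecycle can be approximated with plain JDK classes. The following is only a rough sketch: PeriodicPinger, awaitPeriods, and the use of ScheduledExecutorService are assumptions made for illustration and are not part of scalecube-cluster, which uses a Reactor Scheduler instead.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the start/stop lifecycle; not the library's class. */
class PeriodicPinger {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private final AtomicLong currentPeriod = new AtomicLong();
  private final long pingIntervalMillis;
  private final Runnable pingTask;
  private ScheduledFuture<?> scheduled;

  PeriodicPinger(long pingIntervalMillis, Runnable pingTask) {
    this.pingIntervalMillis = pingIntervalMillis;
    this.pingTask = pingTask;
  }

  /** Like start(): the first run happens after one interval, then once per interval. */
  synchronized void start() {
    scheduled =
        executor.scheduleAtFixedRate(
            this::tick, pingIntervalMillis, pingIntervalMillis, TimeUnit.MILLISECONDS);
  }

  /** Like stop(): cancel the periodic task and release the worker thread. */
  synchronized void stop() {
    if (scheduled != null) {
      scheduled.cancel(false);
    }
    executor.shutdownNow();
  }

  private void tick() {
    pingTask.run();
    currentPeriod.incrementAndGet();
  }

  /** Polls until at least n periods have completed or the timeout expires. */
  boolean awaitPeriods(long n, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (currentPeriod.get() < n && System.nanoTime() < deadline) {
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return currentPeriod.get() >= n;
  }
}
```

As in the original start method, the interval is passed twice: once as the initial delay and once as the period, so the first ping is sent one full interval after start.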

onMemberEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      pingMembers.remove(member);
    }
    if (event.isAdded()) {
      // insert member into random positions
      int size = pingMembers.size();
      int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
      pingMembers.add(index, member);
    }
  }

  //......
}
  • onMemberEvent updates pingMembers according to the MembershipEvent: a removed member is deleted from the list, and an added member is inserted at a random index.
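The random-position insert can be tried in isolation. This is a minimal sketch that assumes members are plain strings; PingMemberList, onAdded, onRemoved, and snapshot are hypothetical names, and only the index computation is copied from the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical holder for the ping list; members are plain strings in this sketch. */
class PingMemberList {
  private final List<String> pingMembers = new ArrayList<>();

  /** Same index computation as onMemberEvent for an added member. */
  void onAdded(String member) {
    int size = pingMembers.size();
    // nextInt(size) returns a value in [0, size), so the index is always valid for add().
    int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0;
    pingMembers.add(index, member);
  }

  /** Same removal as onMemberEvent for a removed member. */
  void onRemoved(String member) {
    pingMembers.remove(member);
  }

  /** Returns a copy so callers cannot modify the internal list. */
  List<String> snapshot() {
    return new ArrayList<>(pingMembers);
  }
}
```

One consequence worth noting: because nextInt(size) never returns size itself, a new member is never appended after the current last element of a non-empty list. In this sketch the first member added therefore stays last until a removal or, in the real class, the next shuffle.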

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void onMessage(Message message) {
    if (isPing(message)) {
      onPing(message);
    } else if (isPingReq(message)) {
      onPingReq(message);
    } else if (isTransitPingAck(message)) {
      onTransitPingAck(message);
    }
  }

  private boolean isPing(Message message) {
    return PING.equals(message.qualifier());
  }

  private boolean isPingReq(Message message) {
    return PING_REQ.equals(message.qualifier());
  }

  private boolean isTransitPingAck(Message message) {
    return PING_ACK.equals(message.qualifier())
        && message.<PingData>data().getOriginalIssuer() != null;
  }

  /** Listens to PING message and answers with ACK. */
  private void onPing(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received Ping[{}]", period);
    PingData data = message.data();
    if (!data.getTo().id().equals(localMember.id())) {
      LOGGER.warn(
          "Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember);
      return;
    }
    String correlationId = message.correlationId();
    Message ackMessage =
        Message.withData(data)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = data.getFrom().address();
    LOGGER.trace("Send PingAck[{}] to {}", period, address);
    transport
        .send(address, ackMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send PingAck[{}] to {}, cause: {}", period, address, ex.toString()));
  }

  /** Listens to PING_REQ message and sends PING to requested cluster member. */
  private void onPingReq(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received PingReq[{}]", period);
    PingData data = message.data();
    Member target = data.getTo();
    Member originalIssuer = data.getFrom();
    String correlationId = message.correlationId();
    PingData pingReqData = new PingData(localMember, target, originalIssuer);
    Message pingMessage =
        Message.withData(pingReqData)
            .qualifier(PING)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Send transit Ping[{}] to {}", period, address);
    transport
        .send(address, pingMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send transit Ping[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  /**
   * Listens to ACK with message containing ORIGINAL_ISSUER then converts message to plain ACK and
   * sends it to ORIGINAL_ISSUER.
   */
  private void onTransitPingAck(Message message) {
    long period = this.currentPeriod;
    LOGGER.trace("Received transit PingAck[{}]", period);
    PingData data = message.data();
    Member target = data.getOriginalIssuer();
    String correlationId = message.correlationId();
    PingData originalAckData = new PingData(target, data.getTo());
    Message originalAckMessage =
        Message.withData(originalAckData)
            .qualifier(PING_ACK)
            .correlationId(correlationId)
            .sender(localMember.address())
            .build();
    Address address = target.address();
    LOGGER.trace("Resend transit PingAck[{}] to {}", period, address);
    transport
        .send(address, originalAckMessage)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to resend transit PingAck[{}] to {}, cause: {}",
                    period,
                    address,
                    ex.toString()));
  }

  //......

}
  • The onMessage method dispatches to onPing, onPingReq, or onTransitPingAck according to the message's qualifier and, for PING_ACK, whether originalIssuer is set. The onPing method replies to the sender with a PING_ACK message. The onPingReq method sends a PING to the member named in the PING_REQ and records the requester as originalIssuer. The onTransitPingAck method forwards the ack for that transit ping back to the originalIssuer.
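To follow how the PingData changes hands during an indirect probe, the handlers can be reduced to pure functions. This is a simplified sketch: PingDataSketch and TransitPingSketch are hypothetical types, members are plain strings, and no transport is involved. That a plain PING_ACK falls through onMessage and is matched by correlation id inside transport.requestResponse is an assumption inferred from the listings, not something the code shown here proves.

```java
/** Hypothetical, simplified stand-in for PingData; members are plain strings. */
final class PingDataSketch {
  final String from;
  final String to;
  final String originalIssuer; // null for a direct ping or a plain ack

  PingDataSketch(String from, String to) {
    this(from, to, null);
  }

  PingDataSketch(String from, String to, String originalIssuer) {
    this.from = from;
    this.to = to;
    this.originalIssuer = originalIssuer;
  }
}

/** Hypothetical helper that mirrors the branching and data rewriting of the handlers. */
final class TransitPingSketch {
  static final String PING = "sc/fdetector/ping";
  static final String PING_REQ = "sc/fdetector/pingReq";
  static final String PING_ACK = "sc/fdetector/pingAck";

  /** Same branch order as onMessage; returns the handler name or "ignored". */
  static String dispatch(String qualifier, PingDataSketch data) {
    if (PING.equals(qualifier)) {
      return "onPing";
    } else if (PING_REQ.equals(qualifier)) {
      return "onPingReq";
    } else if (PING_ACK.equals(qualifier) && data.originalIssuer != null) {
      return "onTransitPingAck";
    }
    return "ignored"; // a plain ack is not handled by onMessage
  }

  /** As in onPingReq: the mediator pings the target and remembers who asked. */
  static PingDataSketch toTransitPing(String mediator, PingDataSketch pingReq) {
    return new PingDataSketch(mediator, pingReq.to, pingReq.from);
  }

  /** As in onTransitPingAck: drop originalIssuer so the issuer receives a plain ack. */
  static PingDataSketch toPlainAck(PingDataSketch transitAck) {
    return new PingDataSketch(transitAck.originalIssuer, transitAck.to);
  }
}
```

For an issuer A, a mediator B, and a target C, the PING_REQ carries (from=A, to=C), the transit PING carries (from=B, to=C, originalIssuer=A), C echoes that data back to B as a PING_ACK, and B finally sends A a plain ack with (from=A, to=C).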

doPing

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

public final class FailureDetectorImpl implements FailureDetector {
  //......

  private void doPing() {
    // Increment period counter
    long period = currentPeriod++;

    // Select ping member
    Member pingMember = selectPingMember();
    if (pingMember == null) {
      return;
    }

    // Send ping
    String cid = cidGenerator.nextCid();
    PingData pingData = new PingData(localMember, pingMember);
    Message pingMsg =
        Message.withData(pingData)
            .qualifier(PING)
            .correlationId(cid)
            .sender(localMember.address())
            .build();

    LOGGER.trace("Send Ping[{}] to {}", period, pingMember);
    Address address = pingMember.address();
    transport
        .requestResponse(pingMsg, address)
        .timeout(Duration.ofMillis(config.getPingTimeout()), scheduler)
        .publishOn(scheduler)
        .subscribe(
            message -> {
              LOGGER.trace("Received PingAck[{}] from {}", period, pingMember);
              publishPingResult(period, pingMember, MemberStatus.ALIVE);
            },
            ex -> {
              LOGGER.debug(
                  "Failed to get PingAck[{}] from {} within {} ms",
                  period,
                  pingMember,
                  config.getPingTimeout());

              final int timeLeft = config.getPingInterval() - config.getPingTimeout();
              final List<Member> pingReqMembers = selectPingReqMembers(pingMember);

              if (timeLeft <= 0 || pingReqMembers.isEmpty()) {
                LOGGER.trace("No PingReq[{}] occurred", period);
                publishPingResult(period, pingMember, MemberStatus.SUSPECT);
              } else {
                doPingReq(currentPeriod, pingMember, pingReqMembers, cid);
              }
            });
  }

  private Member selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  private List<Member> selectPingReqMembers(Member pingMember) {
    if (config.getPingReqMembers() <= 0) {
      return Collections.emptyList();
    }
    List<Member> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < config.getPingReqMembers();
    return selectAll ? candidates : candidates.subList(0, config.getPingReqMembers());
  }

  private void doPingReq(
      long period, final Member pingMember, final List<Member> pingReqMembers, String cid) {
    Message pingReqMsg =
        Message.withData(new PingData(localMember, pingMember))
            .qualifier(PING_REQ)
            .correlationId(cid)
            .sender(localMember.address())
            .build();
    LOGGER.trace("Send PingReq[{}] to {} for {}", period, pingReqMembers, pingMember);

    Duration timeout = Duration.ofMillis(config.getPingInterval() - config.getPingTimeout());
    pingReqMembers.forEach(
        member ->
            transport
                .requestResponse(pingReqMsg, member.address())
                .timeout(timeout, scheduler)
                .publishOn(scheduler)
                .subscribe(
                    message -> {
                      LOGGER.trace(
                          "Received transit PingAck[{}] from {} to {}",
                          period,
                          message.sender(),
                          pingMember);
                      publishPingResult(period, pingMember, MemberStatus.ALIVE);
                    },
                    throwable -> {
                      LOGGER.trace(
                          "Timeout getting transit PingAck[{}] from {} to {} within {} ms",
                          period,
                          pingReqMembers,
                          pingMember,
                          timeout);
                      publishPingResult(period, pingMember, MemberStatus.SUSPECT);
                    }));
  }

  private void publishPingResult(long period, Member member, MemberStatus status) {
    LOGGER.debug("Member {} detected as {} at [{}]", member, status, period);
    sink.next(new FailureDetectorEvent(member, status));
  }

  //......

}
  • The doPing method first increments currentPeriod, then picks the next target through selectPingMember, builds the PingData, and sends the request through transport.requestResponse. If an ack arrives within pingTimeout, it calls publishPingResult(period, pingMember, MemberStatus.ALIVE). On an error or timeout, it picks helper members at random through selectPingReqMembers. If config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT); otherwise it calls doPingReq.
  • The selectPingMember method walks pingMembers sequentially. When pingMemberIndex is greater than or equal to pingMembers.size(), it resets the index to 0 and calls Collections.shuffle(pingMembers); it then returns the member at pingMemberIndex and increments the index. The selectPingReqMembers method copies pingMembers into a new list, removes pingMember to obtain the candidates, calls Collections.shuffle(candidates), and returns at most config.getPingReqMembers() of them via subList.
  • The doPingReq method sends pingReqMsg to every member in pingReqMembers. It calls publishPingResult(period, pingMember, MemberStatus.ALIVE) when a transit PingAck arrives and publishPingResult(period, pingMember, MemberStatus.SUSPECT) on an error or timeout, so each helper's outcome is published independently. The publishPingResult method emits a FailureDetectorEvent into the sink.
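The two selection methods are easy to exercise on their own. The sketch below copies their logic but assumes members are plain strings and that the pingReqMembers setting is passed in directly; PingSelectionSketch is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical container for the selection logic; members are plain strings. */
class PingSelectionSketch {
  private final List<String> pingMembers;
  private final int pingReqMembers; // stands in for config.getPingReqMembers()
  private int pingMemberIndex = 0;

  PingSelectionSketch(List<String> members, int pingReqMembers) {
    this.pingMembers = new ArrayList<>(members);
    this.pingReqMembers = pingReqMembers;
  }

  /** Round-robin over the list, reshuffling each time a full pass completes. */
  String selectPingMember() {
    if (pingMembers.isEmpty()) {
      return null;
    }
    if (pingMemberIndex >= pingMembers.size()) {
      pingMemberIndex = 0;
      Collections.shuffle(pingMembers);
    }
    return pingMembers.get(pingMemberIndex++);
  }

  /** Picks up to pingReqMembers random helpers, never including the ping target. */
  List<String> selectPingReqMembers(String pingMember) {
    if (pingReqMembers <= 0) {
      return Collections.emptyList();
    }
    List<String> candidates = new ArrayList<>(pingMembers);
    candidates.remove(pingMember);
    if (candidates.isEmpty()) {
      return Collections.emptyList();
    }
    Collections.shuffle(candidates);
    boolean selectAll = candidates.size() < pingReqMembers;
    return selectAll ? candidates : candidates.subList(0, pingReqMembers);
  }
}
```

Because the list is only reshuffled after a complete pass, every member is pinged exactly once per pass as long as membership does not change. This bounds the time until any given member is probed to roughly the list size multiplied by pingInterval.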

Summary

  • FailureDetector defines three methods: start, stop, and listen. FailureDetectorImpl implements this interface; it defines three message qualifiers (PING, PING_REQ, and PING_ACK) and keeps the members to be probed in the pingMembers list. Its constructor subscribes to membershipProcessor, which triggers onMemberEvent, and to transport.listen(), which triggers onMessage. The start method registers the doPing task through scheduler.schedulePeriodically so that it runs every pingInterval (5000 ms by default). The stop method calls actionsDisposables.dispose() and sink.complete(). The listen method returns subject.onBackpressureBuffer().
  • onMemberEvent updates pingMembers according to the MembershipEvent: a removed member is deleted, and an added member is inserted at a random index. The onMessage method dispatches to onPing, onPingReq, or onTransitPingAck according to the message's qualifier and, for PING_ACK, whether originalIssuer is set. The onPing method replies to the sender with a PING_ACK message. The onPingReq method sends a PING to the member named in the PING_REQ. The onTransitPingAck method forwards the ack for that transit ping back to the originalIssuer.
  • The doPing method first increments currentPeriod, then picks the next target through selectPingMember, builds the PingData, and sends the request through transport.requestResponse. If an ack arrives within pingTimeout, it calls publishPingResult(period, pingMember, MemberStatus.ALIVE). On an error or timeout, it picks helper members at random through selectPingReqMembers. If config.getPingInterval() - config.getPingTimeout() is less than or equal to 0, or pingReqMembers is empty, it calls publishPingResult(period, pingMember, MemberStatus.SUSPECT); otherwise it calls doPingReq.
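The decision doPing makes once the direct ping has finished can be condensed into a single function. This is a sketch under stated assumptions: PingRoundSketch, Outcome, and afterDirectPing are hypothetical names, and the real method acts on asynchronous callbacks rather than returning a value.

```java
/** Hypothetical condensation of the branch taken after the direct ping completes. */
class PingRoundSketch {
  enum Outcome {
    ALIVE, // ack received: publish ALIVE
    SUSPECT, // no ack and no room or helpers for indirect probing: publish SUSPECT
    PING_REQ // no ack, but indirect probing via doPingReq is still possible
  }

  static Outcome afterDirectPing(
      boolean ackReceived, int pingInterval, int pingTimeout, int helperCount) {
    if (ackReceived) {
      return Outcome.ALIVE;
    }
    // Time remaining in this period once the direct ping has timed out.
    int timeLeft = pingInterval - pingTimeout;
    if (timeLeft <= 0 || helperCount == 0) {
      return Outcome.SUSPECT;
    }
    return Outcome.PING_REQ;
  }
}
```

With illustrative values of pingInterval = 1000 and pingTimeout = 500, a missed ack leaves 500 ms for the indirect probes, which is also the timeout doPingReq applies to each PING_REQ. If pingTimeout is configured to be greater than or equal to pingInterval, indirect probing never happens.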
