Talk about scalecube-cluster MembershipProtocol.

  distributed-systems

Order

This article mainly studies the MembershipProtocol of scalecube-cluster.

MembershipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocol.java

/**
 * Cluster Membership Protocol component responsible for managing information about existing members
 * of the cluster.
 */
public interface MembershipProtocol {

  /**
   * Starts running cluster membership protocol. After started it begins to receive and send cluster
   * membership messages
   */
  Mono<Void> start();

  /** Stops running cluster membership protocol and releases occupied resources. */
  void stop();

  /** Listen changes in cluster membership. */
  Flux<MembershipEvent> listen();

  /**
   * Returns list of all members of the joined cluster. This will include all cluster members
   * including local member.
   *
   * @return all members in the cluster (including local one)
   */
  Collection<Member> members();

  /**
   * Returns list of all cluster members of the joined cluster excluding local member.
   *
   * @return all members in the cluster (excluding local one)
   */
  Collection<Member> otherMembers();

  /**
   * Returns local cluster member which corresponds to this cluster instance.
   *
   * @return local member
   */
  Member member();

  /**
   * Returns cluster member with given id or null if no member with such id exists at joined
   * cluster.
   *
   * @return member by id
   */
  Optional<Member> member(String id);

  /**
   * Returns cluster member by given address or null if no member with such address exists at joined
   * cluster.
   *
   * @return member by address
   */
  Optional<Member> member(Address address);
}
  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, member methods.

MembershipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);

  private enum MembershipUpdateReason {
    FAILURE_DETECTOR_EVENT,
    MEMBERSHIP_GOSSIP,
    SYNC,
    INITIAL_SYNC,
    SUSPICION_TIMEOUT
  }

  // Qualifiers

  public static final String SYNC = "sc/membership/sync";
  public static final String SYNC_ACK = "sc/membership/syncAck";
  public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";

  private final Member localMember;

  // Injected

  private final Transport transport;
  private final MembershipConfig config;
  private final List<Address> seedMembers;
  private final FailureDetector failureDetector;
  private final GossipProtocol gossipProtocol;
  private final MetadataStore metadataStore;
  private final CorrelationIdGenerator cidGenerator;

  // State

  private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
  private final Map<String, Member> members = new HashMap<>();

  // Subject

  private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
      DirectProcessor.<MembershipEvent>create().serialize();
  private final FluxSink<MembershipEvent> sink = subject.sink();

  // Disposables
  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Scheduled
  private final Scheduler scheduler;
  private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap<>();

  /**
   * Creates new instantiates of cluster membership protocol with given transport and config.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param failureDetector failure detector
   * @param gossipProtocol gossip protocol
   * @param metadataStore metadata store
   * @param config membership config parameters
   * @param scheduler scheduler
   * @param cidGenerator correlation id generator
   */
  public MembershipProtocolImpl(
      Member localMember,
      Transport transport,
      FailureDetector failureDetector,
      GossipProtocol gossipProtocol,
      MetadataStore metadataStore,
      MembershipConfig config,
      Scheduler scheduler,
      CorrelationIdGenerator cidGenerator) {

    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.failureDetector = Objects.requireNonNull(failureDetector);
    this.gossipProtocol = Objects.requireNonNull(gossipProtocol);
    this.metadataStore = Objects.requireNonNull(metadataStore);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);
    this.cidGenerator = Objects.requireNonNull(cidGenerator);

    // Prepare seeds
    seedMembers = cleanUpSeedMembers(config.getSeedMembers());

    // Init membership table with local member record
    membershipTable.put(localMember.id(), new MembershipRecord(localMember, ALIVE, 0));

    // fill in the table of members with local member
    members.put(localMember.id(), localMember);

    actionsDisposables.addAll(
        Arrays.asList(
            // Listen to incoming SYNC and SYNC ACK requests from other members
            transport
                .listen() //
                .publishOn(scheduler)
                .subscribe(this::onMessage, this::onError),

            // Listen to events from failure detector
            failureDetector
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onFailureDetectorEvent, this::onError),

            // Listen to membership gossips
            gossipProtocol
                .listen()
                .publishOn(scheduler)
                .subscribe(this::onMembershipGossip, this::onError)));
  }

  @Override
  public Flux<MembershipEvent> listen() {
    return subject.onBackpressureBuffer();
  }

  @Override
  public Mono<Void> start() {
    // Make initial sync with all seed members
    return Mono.create(
        sink -> {
          // In case no members at the moment just schedule periodic sync
          if (seedMembers.isEmpty()) {
            schedulePeriodicSync();
            sink.success();
            return;
          }
          // If seed addresses are specified in config - send initial sync to those nodes
          LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers);

          //noinspection unchecked
          Mono<Message>[] syncs =
              seedMembers
                  .stream()
                  .map(
                      address -> {
                        String cid = cidGenerator.nextCid();
                        return transport
                            .requestResponse(prepareSyncDataMsg(SYNC, cid), address)
                            .filter(this::checkSyncGroup);
                      })
                  .toArray(Mono[]::new);

          // Process initial SyncAck
          Flux.mergeDelayError(syncs.length, syncs)
              .take(1)
              .timeout(Duration.ofMillis(config.getSyncTimeout()), scheduler)
              .publishOn(scheduler)
              .flatMap(message -> onSyncAck(message, true))
              .doFinally(
                  s -> {
                    schedulePeriodicSync();
                    sink.success();
                  })
              .subscribe(
                  null,
                  ex -> LOGGER.info("Exception on initial SyncAck, cause: {}", ex.toString()));
        });
  }

  @Override
  public void stop() {
    // Stop accepting requests, events and sending sync
    actionsDisposables.dispose();

    // Cancel remove members tasks
    for (String memberId : suspicionTimeoutTasks.keySet()) {
      Disposable future = suspicionTimeoutTasks.get(memberId);
      if (future != null && !future.isDisposed()) {
        future.dispose();
      }
    }
    suspicionTimeoutTasks.clear();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Collection<Member> members() {
    return new ArrayList<>(members.values());
  }

  @Override
  public Collection<Member> otherMembers() {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> !member.equals(localMember))
        .collect(Collectors.toList());
  }

  @Override
  public Member member() {
    return localMember;
  }

  @Override
  public Optional<Member> member(String id) {
    return Optional.ofNullable(members.get(id));
  }

  @Override
  public Optional<Member> member(Address address) {
    return new ArrayList<>(members.values())
        .stream()
        .filter(member -> member.address().equals(address))
        .findFirst();
  }

  //......

}
  • MembershipProtocol implements membershipprotocol interface; It defines the MembershipUpdateReason enumeration (FAILURE_DETECTOR_EVENT、MEMBERSHIP_GOSSIP、SYNC、INITIAL_SYNC、SUSPICION_TIMEOUT)
  • The constructor of MembershipProtocolImpl listened to the transport.listen () trigger onMessage method; Listen () triggered the failureDetector.listen method; Listen () triggered onMembershipGossip method
  • The start method of MembershipProtocolImpl will execute the schedulePeriodicSync method when seedMembers.isEmpty (), that is, the doSync method will be executed every syncInterval. When seedMembers is not empty, it traverses seedMembers to send SYNC through transport.requestResponse, and upon successful execution, onSyncAck; is triggered; The stop method destroys the future of suspicionTimeoutTasks one by one.

onMessage

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......

  private void onMessage(Message message) {
    if (checkSyncGroup(message)) {
      if (SYNC.equals(message.qualifier())) {
        onSync(message).subscribe(null, this::onError);
      }
      if (SYNC_ACK.equals(message.qualifier())) {
        if (message.correlationId() == null) { // filter out initial sync
          onSyncAck(message, false).subscribe(null, this::onError);
        }
      }
    }
  }

  private boolean checkSyncGroup(Message message) {
    if (message.data() instanceof SyncData) {
      SyncData syncData = message.data();
      return config.getSyncGroup().equals(syncData.getSyncGroup());
    }
    return false;
  }

  /** Merges incoming SYNC data, merges it and sending back merged data with SYNC_ACK. */
  private Mono<Void> onSync(Message syncMsg) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received Sync: {}", syncMsg);
          return syncMembership(syncMsg.data(), false)
              .doOnSuccess(
                  avoid -> {
                    Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                    Address address = syncMsg.sender();
                    transport
                        .send(address, message)
                        .subscribe(
                            null,
                            ex ->
                                LOGGER.debug(
                                    "Failed to send SyncAck: {} to {}, cause: {}",
                                    message,
                                    address,
                                    ex.toString()));
                  });
        });
  }

  private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
    return Mono.defer(
        () -> {
          LOGGER.debug("Received SyncAck: {}", syncAckMsg);
          return syncMembership(syncAckMsg.data(), onStart);
        });
  }

  private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
    return Mono.defer(
        () -> {
          MembershipUpdateReason reason =
              onStart ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;
          return Mono.whenDelayError(
              syncData
                  .getMembership()
                  .stream()
                  .filter(r1 -> !r1.equals(membershipTable.get(r1.id())))
                  .map(r1 -> updateMembership(r1, reason))
                  .toArray(Mono[]::new));
        });
  }

  //......

}
  • The onMessage method first checks whether the syncGroup’s message is through checkSyncGroup. After that, onSYNC is executed according to whether message.qualifier () is sync or SYNC_ACK.
  • The onSync method executes syncMembership and returns SYNC_ACK information to the sender when it succeeds. The onSyncAck method also calls syncMembership, but does not return information to sender.
  • The syncMembership method executes the updateMembership method one by one according to syncData’s membership.

onFailureDetectorEvent

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......

  /** Merges FD updates and processes them. */
  private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
    MembershipRecord r0 = membershipTable.get(fdEvent.member().id());
    if (r0 == null) { // member already removed
      return;
    }
    if (r0.status() == fdEvent.status()) { // status not changed
      return;
    }
    LOGGER.debug("Received status change on failure detector event: {}", fdEvent);
    if (fdEvent.status() == ALIVE) {
      // TODO: Consider to make more elegant solution
      // Alive won't override SUSPECT so issue instead extra sync with member to force it spread
      // alive with inc + 1
      Message syncMsg = prepareSyncDataMsg(SYNC, null);
      Address address = fdEvent.member().address();
      transport
          .send(address, syncMsg)
          .subscribe(
              null,
              ex ->
                  LOGGER.debug(
                      "Failed to send {} to {}, cause: {}", syncMsg, address, ex.toString()));
    } else {
      MembershipRecord record =
          new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation());
      updateMembership(record, MembershipUpdateReason.FAILURE_DETECTOR_EVENT)
          .subscribe(null, this::onError);
    }
  }

  //......

}
  • The onFailureDetectorEvent method judges whether the state of the MembershipRecord has changed according to FailureDetectorEvent, and if it becomes ALIVE, sends SYNC information to fdEvent.member().address (); Failure _ detector _ event to updateMembership.

onMembershipGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......

  /** Merges received membership gossip (not spreading gossip further). */
  private void onMembershipGossip(Message message) {
    if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
      MembershipRecord record = message.data();
      LOGGER.debug("Received membership gossip: {}", record);
      updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP)
          .subscribe(null, this::onError);
    }
  }

  //......

}
  • The onMembershipGossip method uses MembershipUpdateReason. MembershipGossip to updateMembership for messages whose message.qualifier () is membershipGossip.

updateMembership

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......

  /**
   * Try to update membership table with the given record.
   *
   * @param r1 new membership record which compares with existing r0 record
   * @param reason indicating the reason for updating membership table
   */
  private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason reason) {
    return Mono.defer(
        () -> {
          Objects.requireNonNull(r1, "Membership record can't be null");
          // Get current record
          MembershipRecord r0 = membershipTable.get(r1.id());

          // Check if new record r1 overrides existing membership record r0
          if (!r1.isOverrides(r0)) {
            return Mono.empty();
          }

          // If received updated for local member then increase incarnation and spread Alive gossip
          if (r1.member().id().equals(localMember.id())) {
            int currentIncarnation = Math.max(r0.incarnation(), r1.incarnation());
            MembershipRecord r2 =
                new MembershipRecord(localMember, r0.status(), currentIncarnation + 1);

            membershipTable.put(localMember.id(), r2);

            LOGGER.debug(
                "Local membership record r0: {}, but received r1: {}, "
                    + "spread with increased incarnation r2: {}",
                r0,
                r1,
                r2);

            spreadMembershipGossip(r2)
                .subscribe(
                    null,
                    ex -> {
                      // on-op
                    });
            return Mono.empty();
          }

          // Update membership
          if (r1.isDead()) {
            membershipTable.remove(r1.id());
          } else {
            membershipTable.put(r1.id(), r1);
          }

          // Schedule/cancel suspicion timeout task
          if (r1.isSuspect()) {
            scheduleSuspicionTimeoutTask(r1);
          } else {
            cancelSuspicionTimeoutTask(r1.id());
          }

          // Emit membership and regardless of result spread gossip
          return emitMembershipEvent(r0, r1)
              .doFinally(
                  s -> {
                    // Spread gossip (unless already gossiped)
                    if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP
                        && reason != MembershipUpdateReason.INITIAL_SYNC) {
                      spreadMembershipGossip(r1)
                          .subscribe(
                              null,
                              ex -> {
                                // no-op
                              });
                    }
                  });
        });
  }

  private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
    return Mono.defer(
        () -> {
          Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build();
          LOGGER.debug("Spead membreship: {} with gossip", msg);
          return gossipProtocol
              .spread(msg)
              .doOnError(
                  ex ->
                      LOGGER.debug(
                          "Failed to spread membership: {} with gossip, cause: {}",
                          msg,
                          ex.toString()))
              .then();
        });
  }

  private void scheduleSuspicionTimeoutTask(MembershipRecord record) {
    long suspicionTimeout =
        ClusterMath.suspicionTimeout(
            config.getSuspicionMult(), membershipTable.size(), config.getPingInterval());
    suspicionTimeoutTasks.computeIfAbsent(
        record.id(),
        id ->
            scheduler.schedule(
                () -> onSuspicionTimeout(id), suspicionTimeout, TimeUnit.MILLISECONDS));
  }

  private void onSuspicionTimeout(String memberId) {
    suspicionTimeoutTasks.remove(memberId);
    MembershipRecord record = membershipTable.get(memberId);
    if (record != null) {
      LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record);
      MembershipRecord deadRecord =
          new MembershipRecord(record.member(), DEAD, record.incarnation());
      updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT)
          .subscribe(null, this::onError);
    }
  }

  private void cancelSuspicionTimeoutTask(String memberId) {
    Disposable future = suspicionTimeoutTasks.remove(memberId);
    if (future != null && !future.isDisposed()) {
      future.dispose();
    }
  }

  private Mono<Void> emitMembershipEvent(MembershipRecord r0, MembershipRecord r1) {
    return Mono.defer(
        () -> {
          final Member member = r1.member();

          if (r1.isDead()) {
            members.remove(member.id());
            // removed
            return Mono.fromRunnable(
                () -> {
                  Map<String, String> metadata = metadataStore.removeMetadata(member);
                  sink.next(MembershipEvent.createRemoved(member, metadata));
                });
          }

          if (r0 == null && r1.isAlive()) {
            members.put(member.id(), member);
            // added
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata -> {
                      metadataStore.updateMetadata(member, metadata);
                      sink.next(MembershipEvent.createAdded(member, metadata));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          if (r0 != null && r0.incarnation() < r1.incarnation()) {
            // updated
            return metadataStore
                .fetchMetadata(member)
                .doOnSuccess(
                    metadata1 -> {
                      Map<String, String> metadata0 =
                          metadataStore.updateMetadata(member, metadata1);
                      sink.next(MembershipEvent.createUpdated(member, metadata0, metadata1));
                    })
                .onErrorResume(TimeoutException.class, e -> Mono.empty())
                .then();
          }

          return Mono.empty();
        });
  }

  //......

}
  • UpdateMembership compares the incoming membershipcORD with the localMember. If it is necessary to update the localMember, it will execute spreadMembershipGossip, and then perform different processing according to the status of the membershipcORD. For example, isDead is removed from membershipTable, for example isSuspect executes scheduleSuspicionTimeoutTask, otherwise cancelSuspicionTimeoutTask is executed, and finally emitMembershipEvent and spreadMembershipGossip are executed.
  • The scheduleSuspicionTimeoutTask method calculates suspicionTimeout and then registers a SuspicionTimeout delayed task if suspicionTimeoutTasks do not have a task with the record.id (); OnSuspicionTimeout first removes the task from suspicionTimeoutTasks, and then uses MembershipUpdateReason. Suspicion _ Timeout to updateMembership; ; The cancelSuspicionTimeoutTask method also removes the task from suspicionTimeoutTasks and dispose of the future.
  • The emitMembershipEvent method mainly updates the Metadata of the member in the metadataStore here. If it is isDead, the metadata store.removemetadata (member) will be executed, while the others will be executed as the case may be.

schedulePeriodicSync

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

public final class MembershipProtocolImpl implements MembershipProtocol {
  //......

  private void schedulePeriodicSync() {
    int syncInterval = config.getSyncInterval();
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS));
  }

  private void doSync() {
    Optional<Address> addressOptional = selectSyncAddress();
    if (!addressOptional.isPresent()) {
      return;
    }

    Address address = addressOptional.get();
    Message message = prepareSyncDataMsg(SYNC, null);
    LOGGER.debug("Send Sync: {} to {}", message, address);
    transport
        .send(address, message)
        .subscribe(
            null,
            ex ->
                LOGGER.debug(
                    "Failed to send Sync: {} to {}, cause: {}", message, address, ex.toString()));
  }

  private Optional<Address> selectSyncAddress() {
    List<Address> addresses =
        Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address))
            .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new));
    Collections.shuffle(addresses);
    if (addresses.isEmpty()) {
      return Optional.empty();
    } else {
      int i = ThreadLocalRandom.current().nextInt(addresses.size());
      return Optional.of(addresses.get(i));
    }
  }

  private Message prepareSyncDataMsg(String qualifier, String cid) {
    List<MembershipRecord> membershipRecords = new ArrayList<>(membershipTable.values());
    SyncData syncData = new SyncData(membershipRecords, config.getSyncGroup());
    return Message.withData(syncData)
        .qualifier(qualifier)
        .correlationId(cid)
        .sender(localMember.address())
        .build();
  }

  //......
}
  • SchedulePeriodically registers doSync tasks to be executed every syncInterval; The doSync method first calls selectSyncAddress to randomly select a member as the target for sending SYNC, then constructs a sync message through prepareSyncDataMsg, and then sends it through transport.send

Summary

  • The MembershipProtocol interface defines the start, stop, listen, members, otherMembers, member methods; MembershipProtocol implements membershipprotocol interface; It defines the MembershipUpdateReason enumeration (FAILURE_DETECTOR_EVENT、MEMBERSHIP_GOSSIP、SYNC、INITIAL_SYNC、SUSPICION_TIMEOUT)
  • The constructor of MembershipProtocolImpl listened to the transport.listen () trigger onMessage method; Listen () triggered the failureDetector.listen method; Listen () triggered onMembershipGossip method; The start method of MembershipProtocolImpl will execute the schedulePeriodicSync method when seedMembers.isEmpty (), that is, the doSync method will be executed every syncInterval. When seedMembers is not empty, it traverses seedMembers to send SYNC through transport.requestResponse, and upon successful execution, onSyncAck; is triggered; The stop method destroys the future of suspicionTimeoutTasks one by one.
  • The onMessage method first checks whether the syncGroup’s message is through checkSyncGroup. After that, onSync is executed according to whether message.qualifier () is SYNC or SYNC_ACK. The onSync method executes syncMembership and returns SYNC_ACK information to the sender when it succeeds. The onSyncAck method also calls syncMembership, but does not return information to sender. The syncMembership method executes the updateMembership method one by one according to syncData’s membership.
  • The onFailureDetectorEvent method judges whether the state of the MembershipRecord has changed according to FailureDetectorEvent, and if it becomes ALIVE, sends SYNC information to fdEvent.member().address (); Failure _ detector _ event to updateMembership; ; The onMembershipGossip method uses MembershipUpdateReason. MembershipGossip to updateMembership for messages whose message.qualifier () is membershipGossip.
  • UpdateMembership compares the incoming membershipcORD with the localMember. If it is necessary to update the localMember, it will execute spreadMembershipGossip, and then perform different processing according to the status of the membershipcORD. For example, isDead is removed from membershipTable, for example isSuspect executes scheduleSuspicionTimeoutTask, otherwise cancelSuspicionTimeoutTask is executed, and finally emitMembershipEvent and spreadMembershipGossip are executed.
  • SchedulePeriodically registers doSync tasks to be executed every syncInterval; The doSync method first calls selectSyncAddress to randomly select a member as the target for sending SYNC, then constructs a sync message through prepareSyncDataMsg, and then sends it through transport.send

The start method of MembershipProtocolImpl registers the doSync task (Performed every syncInterval), the task will send SYNC messages to randomly selected member to sync the full amount of membershipRecords;; The onMessage method executes syncMembership when it receives a SYNC message and returns SYNC_ACK when it succeeds, and also executes SYNC membership when it receives SYNC_ACK; OnFailureDetectorEvent and onMembershipGossip methods both trigger UpdateMembershipMethod to update membershipTable. spreadMembershipGossip is required.

doc