A Look at Hazelcast's PhiAccrualFailureDetector


Introduction

This article takes a look at Hazelcast's PhiAccrualFailureDetector.

FailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/FailureDetector.java

/**
 * Failure detector tracks heartbeats of a member and decides liveness/availability of the member.
 */
public interface FailureDetector {

    /**
     * Notifies this failure detector about received heartbeat message from the tracked member.
     *
     * @param timestamp timestamp of heartbeat message in milliseconds
     */
    void heartbeat(long timestamp);

    /**
     * Returns true if the tracked member is considered as alive/available.
     * @param timestamp timestamp in milliseconds
     * @return true if the member is alive
     */
    boolean isAlive(long timestamp);

    /**
     * Returns the last heartbeat timestamp for the tracked member.
     * @return heartbeat timestamp in milliseconds
     */
    long lastHeartbeat();

    /**
     * Returns suspicion level about the tracked member. Returned value is mostly implementation dependent.
     * <code>0</code> indicates no suspicion at all.
     * @param timestamp timestamp in milliseconds
     * @return suspicion level
     */
    double suspicionLevel(long timestamp);
}
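  • The FailureDetector interface defines four methods: heartbeat records a heartbeat received from the tracked member, isAlive and suspicionLevel evaluate the member at a given timestamp, and lastHeartbeat returns the timestamp of the last recorded heartbeat.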
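To make the contract concrete, here is a minimal sketch of a much simpler detector that declares a member dead once a fixed timeout has elapsed since its last heartbeat. The interface is re-declared so the example compiles on its own; FixedTimeoutFailureDetector, its timeoutMillis parameter, and the elapsed/timeout suspicion scale are hypothetical choices for illustration, not Hazelcast classes or behaviour.

```java
// Re-declaration of the interface shown above so this sketch compiles standalone.
interface FailureDetector {
    void heartbeat(long timestamp);
    boolean isAlive(long timestamp);
    long lastHeartbeat();
    double suspicionLevel(long timestamp);
}

// Hypothetical fixed-timeout detector (not part of Hazelcast): a member is alive
// while less than timeoutMillis has passed since its last heartbeat.
class FixedTimeoutFailureDetector implements FailureDetector {
    static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final long timeoutMillis;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    FixedTimeoutFailureDetector(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive: " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void heartbeat(long timestamp) {
        lastHeartbeatMillis = timestamp;
    }

    @Override
    public boolean isAlive(long timestamp) {
        // Suspicion >= 1.0 means the whole timeout budget has been used up.
        return suspicionLevel(timestamp) < 1.0;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    @Override
    public double suspicionLevel(long timestamp) {
        long last = lastHeartbeatMillis;
        if (last == NO_HEARTBEAT_TIMESTAMP) {
            return 0.0; // like the phi detector, no heartbeat yet means no suspicion
        }
        // Fraction of the timeout that has elapsed since the last heartbeat.
        return (double) (timestamp - last) / timeoutMillis;
    }
}

class FixedTimeoutDemo {
    public static void main(String[] args) {
        FailureDetector fd = new FixedTimeoutFailureDetector(5000);
        fd.heartbeat(1000);
        System.out.println(fd.isAlive(4000) + " " + fd.suspicionLevel(4000)); // prints: true 0.6
        System.out.println(fd.isAlive(6000) + " " + fd.suspicionLevel(6000)); // prints: false 1.0
    }
}
```

A fixed timeout ignores how heartbeat timing actually varies; the phi accrual detector below instead scales its suspicion to the observed inter-arrival distribution.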

PhiAccrualFailureDetector

hazelcast-3.12-sources.jar!/com/hazelcast/internal/cluster/fd/PhiAccrualFailureDetector.java

/**
 * Port of Akka's PhiAccrualFailureDetector.scala
 * <p>
 * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper.
 * <p>
 * The suspicion level of failure is given by a value called φ (phi).
 * The basic idea of the φ failure detector is to express the value of φ on a scale that
 * is dynamically adjusted to reflect current network conditions. A configurable
 * threshold is used to decide if <code>φ</code> is considered to be a failure.
 * <p>
 * The value of <code>φ</code> is calculated as:
 * <p>
 * <code>
 * <pre>
 * φ = -log10(1 - F(timeSinceLastHeartbeat))
 * </pre>
 * </code>
 * where F is the cumulative distribution function of a normal distribution with mean
 * and standard deviation estimated from historical heartbeat inter-arrival times.
 */
public class PhiAccrualFailureDetector implements FailureDetector {

    static final long NO_HEARTBEAT_TIMESTAMP = -1;

    private final double threshold;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;

    private final HeartbeatHistory heartbeatHistory;
    private volatile long lastHeartbeatMillis = NO_HEARTBEAT_TIMESTAMP;

    /**
     * @param threshold                      A low threshold is prone to generate many wrong suspicions but ensures
     *                                       a quick detection in the event of a real crash. Conversely, a high threshold
     *                                       generates fewer mistakes but needs more time to detect actual crashes
     * @param maxSampleSize                  Number of samples to use for calculation of mean and standard deviation of
     *                                       inter-arrival times.
     * @param minStdDeviationMillis          Minimum standard deviation to use for the normal distribution used when
     *                                       calculating phi. Too low standard deviation might result in too much sensitivity
     *                                       for sudden, but normal, deviations in heartbeat inter arrival times.
     * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially lost/delayed
     *                                       heartbeats that will be accepted before considering it to be an anomaly.
     *                                       This margin is important to be able to survive sudden, occasional, pauses
     *                                       in heartbeat arrivals, due to for example garbage collect or network drop.
     * @param firstHeartbeatEstimateMillis   Bootstrap the stats with heartbeats that correspond to this duration,
     *                                       with a rather high standard deviation (since environment is unknown
     *                                       in the beginning)
     */
    public PhiAccrualFailureDetector(double threshold, int maxSampleSize, double minStdDeviationMillis,
            long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {

        this.threshold = checkPositive(threshold, "Threshold must be positive: " + threshold);
        this.minStdDeviationMillis = checkPositive(minStdDeviationMillis, "Minimum standard deviation must be positive: "
                + minStdDeviationMillis);

        this.acceptableHeartbeatPauseMillis = checkNotNegative(acceptableHeartbeatPauseMillis,
                "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);

        checkPositive(firstHeartbeatEstimateMillis, "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);

        heartbeatHistory = new HeartbeatHistory(maxSampleSize);
        firstHeartbeat(firstHeartbeatEstimateMillis);
    }

    // guess statistics for first heartbeat,
    // important so that connections with only one heartbeat become unavailable
    // bootstrap with 2 entries with rather high standard deviation
    @SuppressWarnings("checkstyle:magicnumber")
    private void firstHeartbeat(long firstHeartbeatEstimateMillis) {
        long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
        heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis);
        heartbeatHistory.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
    }

    private double ensureValidStdDeviation(double stdDeviationMillis) {
        return Math.max(stdDeviationMillis, minStdDeviationMillis);
    }

    /**
     * The suspicion level of the accrual failure detector.
     *
     * If a connection does not have any records in failure detector then it is
     * considered healthy.
     */
    private double phi(long timestampMillis) {
        long timeDiffMillis;
        double meanMillis;
        double stdDeviationMillis;

        synchronized (heartbeatHistory) {
            long lastTimestampMillis = lastHeartbeatMillis;
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return 0.0;
            }

            timeDiffMillis = timestampMillis - lastTimestampMillis;
            meanMillis = heartbeatHistory.mean();
            stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
        }

        return phi(timeDiffMillis, meanMillis + acceptableHeartbeatPauseMillis, stdDeviationMillis);
    }

    /**
     * Calculation of phi, derived from the Cumulative distribution function for
     * N(mean, stdDeviation) normal distribution, given by
     * 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
     * where y = (x - mean) / standard_deviation
     * This is an approximation defined in β Mathematics Handbook (Logistic approximation).
     * Error is 0.00014 at +- 3.16
     * The calculated value is equivalent to -log10(1 - CDF(y))
     */
    @SuppressWarnings("checkstyle:magicnumber")
    private static double phi(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e));
        }
    }

    @Override
    public boolean isAlive(long timestampMillis) {
        double phi = phi(timestampMillis);
        return phi < threshold;
    }

    @Override
    public void heartbeat(long timestampMillis) {
        synchronized (heartbeatHistory) {
            long lastTimestampMillis = getAndSetLastHeartbeat(timestampMillis);
            if (lastTimestampMillis == NO_HEARTBEAT_TIMESTAMP) {
                return;
            }

            if (isAlive(timestampMillis)) {
                heartbeatHistory.add(timestampMillis - lastTimestampMillis);
            }
        }
    }

    private long getAndSetLastHeartbeat(long timestampMillis) {
        long lastTimestampMillis = lastHeartbeatMillis;
        lastHeartbeatMillis = timestampMillis;
        return lastTimestampMillis;
    }

    @Override
    public long lastHeartbeat() {
        return lastHeartbeatMillis;
    }

    @Override
    public double suspicionLevel(long timestamp) {
        return phi(timestamp);
    }

    /**
     * Holds the heartbeat statistics for a specific member.
     * It is capped by the number of samples specified in `maxSampleSize`.
     *
     * The stats (mean, variance, stdDeviation) are not defined for
     * an empty HeartbeatHistory, i.e. throws ArithmeticException.
     */
    private static class HeartbeatHistory {
        private final int maxSampleSize;
        private final LinkedList<Long> intervals = new LinkedList<Long>();
        private long intervalSum;
        private long squaredIntervalSum;

        HeartbeatHistory(int maxSampleSize) {
            if (maxSampleSize < 1) {
                throw new IllegalArgumentException("Sample size must be >= 1 : " + maxSampleSize);
            }
            this.maxSampleSize = maxSampleSize;
        }

        double mean() {
            return (double) intervalSum / intervals.size();
        }

        double variance() {
            double mean = mean();
            return ((double) squaredIntervalSum / intervals.size()) - (mean * mean);
        }

        double stdDeviation() {
            return Math.sqrt(variance());
        }

        void add(long interval) {
            if (intervals.size() >= maxSampleSize) {
                dropOldest();
            }
            intervals.add(interval);
            intervalSum += interval;
            squaredIntervalSum += pow2(interval);
        }

        private void dropOldest() {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= pow2(dropped);
        }

        private static long pow2(long x) {
            return x * x;
        }
    }
}
  • PhiAccrualFailureDetector implements the FailureDetector interface; it is a Java port of Akka's PhiAccrualFailureDetector.scala.
  • Phi (φ) is the suspicion level; a member whose φ reaches the configured threshold is considered failed. It is computed as φ = -log10(1 - CDF(timeSinceLastHeartbeat)), where CDF is the cumulative distribution function of a normal distribution whose mean and standard deviation are estimated from historical heartbeat inter-arrival times.
  • The phi method approximates CDF(y) with the logistic approximation from the β Mathematics Handbook (error 0.00014 at ±3.16, according to the Javadoc): CDF(y) ≈ 1.0 / (1.0 + Math.exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standardDeviation.
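To get a feel for how close the logistic approximation is, the sketch below compares it with the standard normal CDF obtained by numerically integrating the normal density with composite Simpson's rule. Simpson's rule is a swapped-in reference method, chosen only because the JDK has no built-in erf; LogisticCdfCheck and its methods are hypothetical names, not Hazelcast APIs.

```java
// Compares the logistic approximation used in phi() with a numerically integrated
// standard normal CDF. Class and method names are illustrative, not Hazelcast APIs.
class LogisticCdfCheck {

    // Logistic approximation quoted in the Javadoc:
    // CDF(y) ~ 1 / (1 + exp(-y * (1.5976 + 0.070566 * y^2)))
    static double logisticCdf(double y) {
        return 1.0 / (1.0 + Math.exp(-y * (1.5976 + 0.070566 * y * y)));
    }

    // Standard normal density.
    static double density(double t) {
        return Math.exp(-t * t / 2.0) / Math.sqrt(2.0 * Math.PI);
    }

    // Reference CDF: 0.5 +/- the integral of the density from 0 to |y|,
    // computed with composite Simpson's rule on an even number of sub-intervals.
    static double normalCdf(double y) {
        double a = Math.abs(y);
        int n = 2000;
        double h = a / n;
        double sum = density(0.0) + density(a);
        for (int i = 1; i < n; i++) {
            sum += (i % 2 == 1 ? 4.0 : 2.0) * density(i * h);
        }
        double integral = sum * h / 3.0;
        return y >= 0 ? 0.5 + integral : 0.5 - integral;
    }

    // Largest absolute difference between the two CDFs on an evenly spaced grid.
    static double maxAbsError(double from, double to, int steps) {
        double max = 0.0;
        for (int k = 0; k <= steps; k++) {
            double y = from + (to - from) * k / steps;
            max = Math.max(max, Math.abs(logisticCdf(y) - normalCdf(y)));
        }
        return max;
    }

    public static void main(String[] args) {
        for (double y : new double[] {-3.16, -1.0, 0.0, 0.5, 1.0, 2.0, 3.16}) {
            System.out.printf("y=%6.2f  logistic=%.6f  normal=%.6f%n", y, logisticCdf(y), normalCdf(y));
        }
        System.out.printf("max |error| on [-6, 6]: %.6f%n", maxAbsError(-6.0, 6.0, 1200));
    }
}
```

Keep in mind that phi depends on 1 - CDF, so in the upper tail the relative error of the approximation matters more for φ than the absolute error printed here.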

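Let e = Math.exp(-y * (1.5976 + 0.070566 * y * y)), so that 1 - CDF(y) = 1 - 1/(1 + e) = e/(1 + e). When x > mean (so y > 0 and e < 1), phi is computed as -Math.log10(e / (1.0 + e)); when x <= mean (e >= 1), it is computed as -Math.log10(1.0 - 1.0 / (1.0 + e)). The two expressions are algebraically identical, so the split is most likely about floating-point robustness: for large x, e is tiny, 1.0 / (1.0 + e) rounds toward 1.0, and subtracting it from 1.0 loses precision (or yields 0.0 and an infinite φ), whereas e / (1.0 + e) does not; for x far below the mean, e can overflow to Infinity, which would make e / (1.0 + e) evaluate to NaN, while 1.0 - 1.0 / (1.0 + e) still evaluates to 1.0 and gives φ = 0.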
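The effect of this branch can be checked numerically. Since 1 - 1/(1 + e) = e/(1 + e), the two formulas differ only in floating-point behaviour. The sketch below copies the branching formula from the listing and compares it with two hypothetical single-formula variants (phiComplementOnly and phiRatioOnly are illustrative names, not Hazelcast methods). The inputs are illustrative too: a 1000 ms mean with a 100 ms standard deviation for a late heartbeat, and a 61000 ms mean, standing in for a 1000 ms mean plus a hypothetical 60000 ms acceptable pause, for a check made right after a heartbeat.

```java
// Compares the branching phi formula from the listing with two single-formula variants.
// Variant names and sample inputs are hypothetical.
class PhiBranchCheck {

    // Same logic as PhiAccrualFailureDetector.phi(long, double, double) in the listing.
    static double phiBranching(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double e = logisticTerm(timeDiffMillis, meanMillis, stdDeviationMillis);
        if (timeDiffMillis > meanMillis) {
            return -Math.log10(e / (1.0 + e));
        } else {
            return -Math.log10(1.0 - 1.0 / (1.0 + e));
        }
    }

    // Always computes 1 - CDF as 1.0 - 1.0 / (1.0 + e).
    static double phiComplementOnly(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double e = logisticTerm(timeDiffMillis, meanMillis, stdDeviationMillis);
        return -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    // Always computes 1 - CDF as e / (1.0 + e).
    static double phiRatioOnly(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double e = logisticTerm(timeDiffMillis, meanMillis, stdDeviationMillis);
        return -Math.log10(e / (1.0 + e));
    }

    // e = exp(-y * (1.5976 + 0.070566 * y^2)) with y = (x - mean) / stdDeviation.
    static double logisticTerm(long timeDiffMillis, double meanMillis, double stdDeviationMillis) {
        double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
        return Math.exp(-y * (1.5976 + 0.070566 * y * y));
    }

    public static void main(String[] args) {
        // Late heartbeat: y = (2000 - 1000) / 100 = 10, e is about 1e-38, and 1.0 / (1.0 + e) rounds to 1.0.
        System.out.println("late:  branching=" + phiBranching(2000, 1000.0, 100.0)
                + "  complementOnly=" + phiComplementOnly(2000, 1000.0, 100.0));
        // Fresh heartbeat with a large pause-shifted mean: y = -610, so e overflows to Infinity.
        System.out.println("fresh: branching=" + phiBranching(0, 61000.0, 100.0)
                + "  ratioOnly=" + phiRatioOnly(0, 61000.0, 100.0));
    }
}
```

The branching version stays finite in the late case (about 37.6) where the complement-only variant returns Infinity, and returns 0 (printed as -0.0) right after a heartbeat where the ratio-only variant returns NaN. A NaN φ would matter in practice: NaN < threshold is false, so isAlive would report the member as dead.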

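  • The isAlive method computes phi for timestampMillis and compares it with the threshold (Hazelcast's default is 10); the member is considered alive only if phi is below the threshold.
  • The heartbeat method first records timestampMillis as the new last-heartbeat timestamp (getAndSetLastHeartbeat) and returns if there was no previous heartbeat. Otherwise, if isAlive(timestampMillis) holds, it adds the interval timestampMillis - lastTimestampMillis to heartbeatHistory. Because lastHeartbeatMillis has already been updated at that point, this isAlive check sees a time difference of 0, so in practice it passes.
  • This implementation also applies acceptableHeartbeatPauseMillis: when phi is finally computed, the mean passed in is meanMillis + acceptableHeartbeatPauseMillis, which shifts the distribution so that occasional pauses (for example from garbage collection or network drops) are tolerated.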
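The behaviour described in these bullets (isAlive, heartbeat, and the acceptableHeartbeatPauseMillis shift) can be watched end to end with the condensed sketch below. PhiSketch is a hypothetical, stripped-down copy of the listing's logic (no argument checks, no synchronization) written only for experimentation, not Hazelcast's class; the parameter values (threshold 10, 200 samples, 100 ms minimum standard deviation, 1000 ms first-heartbeat estimate, pauses of 0 and 3000 ms) are illustrative choices.

```java
import java.util.ArrayDeque;

// Hypothetical, condensed copy of the detector logic from the listing (no argument checks,
// no synchronization), written only to watch phi evolve. Not Hazelcast's class or API.
class PhiSketch {
    private static final long NO_HEARTBEAT = -1;

    private final double threshold;
    private final int maxSampleSize;
    private final double minStdDeviationMillis;
    private final long acceptableHeartbeatPauseMillis;
    private final ArrayDeque<Long> intervals = new ArrayDeque<>();
    private long intervalSum;
    private long squaredIntervalSum;
    private long lastHeartbeatMillis = NO_HEARTBEAT;

    PhiSketch(double threshold, int maxSampleSize, double minStdDeviationMillis,
              long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
        this.threshold = threshold;
        this.maxSampleSize = maxSampleSize;
        this.minStdDeviationMillis = minStdDeviationMillis;
        this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
        // Bootstrap with estimate -/+ estimate/4, as firstHeartbeat() does.
        long std = firstHeartbeatEstimateMillis / 4;
        add(firstHeartbeatEstimateMillis - std);
        add(firstHeartbeatEstimateMillis + std);
    }

    // Sliding window of intervals with running sums, like HeartbeatHistory.add().
    private void add(long interval) {
        if (intervals.size() >= maxSampleSize) {
            long dropped = intervals.pollFirst();
            intervalSum -= dropped;
            squaredIntervalSum -= dropped * dropped;
        }
        intervals.addLast(interval);
        intervalSum += interval;
        squaredIntervalSum += interval * interval;
    }

    void heartbeat(long nowMillis) {
        long last = lastHeartbeatMillis;
        lastHeartbeatMillis = nowMillis;          // updated first, as getAndSetLastHeartbeat() does
        if (last != NO_HEARTBEAT && isAlive(nowMillis)) {
            add(nowMillis - last);
        }
    }

    double phi(long nowMillis) {
        if (lastHeartbeatMillis == NO_HEARTBEAT) {
            return 0.0;                           // no heartbeat yet: considered healthy
        }
        int n = intervals.size();
        double mean = (double) intervalSum / n;
        // Clamp at 0 to guard against a tiny negative rounding error before sqrt.
        double variance = Math.max(0.0, (double) squaredIntervalSum / n - mean * mean);
        double std = Math.max(Math.sqrt(variance), minStdDeviationMillis);
        long timeDiff = nowMillis - lastHeartbeatMillis;
        double shiftedMean = mean + acceptableHeartbeatPauseMillis;
        double y = (timeDiff - shiftedMean) / std;
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        return timeDiff > shiftedMean
                ? -Math.log10(e / (1.0 + e))
                : -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    boolean isAlive(long nowMillis) {
        return phi(nowMillis) < threshold;
    }

    public static void main(String[] args) {
        PhiSketch strict = new PhiSketch(10.0, 200, 100.0, 0, 1000);
        PhiSketch tolerant = new PhiSketch(10.0, 200, 100.0, 3000, 1000);
        for (long t = 0; t <= 10_000; t += 1000) { // a heartbeat every 1000 ms
            strict.heartbeat(t);
            tolerant.heartbeat(t);
        }
        for (long now : new long[] {11_000, 11_200, 13_000, 20_000}) {
            System.out.printf("t=%d  phi(pause=0)=%.3f  phi(pause=3000)=%.3f%n",
                    now, strict.phi(now), tolerant.phi(now));
        }
    }
}
```

With heartbeats every 1000 ms, the detector without a pause margin reports a φ far above 10 once a heartbeat is 2000 ms overdue, while the 3000 ms margin keeps φ at zero at that point; only a much longer silence pushes it over the threshold.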

Summary

  • The FailureDetector interface defines the heartbeat, isAlive, lastHeartbeat, and suspicionLevel methods. PhiAccrualFailureDetector implements it as a Java port of Akka's PhiAccrualFailureDetector.scala.
  • Phi (φ) is the suspicion level; once it reaches the configured threshold, the member is considered failed. It is computed as φ = -log10(1 - CDF), where CDF is the cumulative distribution function of a normal distribution whose mean and standard deviation are estimated from recent heartbeat inter-arrival times.
  • Like Akka's implementation, the phi method approximates CDF(y) with the logistic approximation from the β Mathematics Handbook (error 0.00014 at ±3.16), i.e. CDF(y) ≈ 1.0 / (1.0 + Math.exp(-y * (1.5976 + 0.070566 * y * y))), where y = (x - mean) / standardDeviation. In addition, acceptableHeartbeatPauseMillis is added to the mean, so the value passed in as meanMillis is meanMillis + acceptableHeartbeatPauseMillis when phi is finally computed.
  • The isAlive method computes phi for timestampMillis and compares it with the threshold (Hazelcast's default is 10); the member is considered alive if phi is below the threshold.
  • The heartbeat method updates the last-heartbeat timestamp and, if there was a previous heartbeat and isAlive(timestampMillis) holds, adds the interval timestampMillis - lastTimestampMillis to heartbeatHistory.
