A look at Ewma in the RSocket load balancer

Preface

This article looks at how the RSocket load balancer uses Ewma.

Moving Average

SMA

SMA (Simple Moving Average) is the unweighted mean of the latest n data points:

SMA_t = (P_t + P_(t-1) + P_(t-2) + ... + P_(t-n+1)) / n

Here P_t through P_(t-n+1) are the n most recent data points.
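
As an illustration of the SMA formula, here is a minimal sketch that keeps a sliding window of the latest n values. The class name SimpleMovingAverage and its add method are hypothetical and are not part of rsocket. Until n values have arrived, the sketch averages whatever it has, which is an assumption on top of the formula.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of rsocket: a sliding-window simple moving average.
class SimpleMovingAverage {
  private final int n;                                  // window size
  private final Deque<Double> window = new ArrayDeque<>();
  private double sum = 0.0;                             // running sum of the window

  SimpleMovingAverage(int n) {
    if (n <= 0) {
      throw new IllegalArgumentException("n must be positive");
    }
    this.n = n;
  }

  // Adds a data point and returns the mean of the latest (at most) n points.
  double add(double p) {
    window.addLast(p);
    sum += p;
    if (window.size() > n) {
      sum -= window.removeFirst();                      // drop the oldest point
    }
    return sum / window.size();
  }

  public static void main(String[] args) {
    SimpleMovingAverage sma = new SimpleMovingAverage(3);
    for (double p : new double[] {1, 2, 3, 4, 5}) {
      System.out.println(sma.add(p));                   // prints 1.0, 1.5, 2.0, 3.0, 4.0
    }
  }
}
```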

WMA

WMA (Weighted Moving Average) multiplies each of the latest n data points by its own weight:

WMA_t = (P_t * W_t) + (P_(t-1) * W_(t-1)) + ... + (P_(t-n+1) * W_(t-n+1))

The weights sum to 1, and more recent data points generally get larger weights.
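
The WMA formula can be sketched as a single weighted sum. The class WeightedMovingAverage and the method wma are hypothetical names chosen for this example, and the weights 0.5, 0.3, 0.2 are arbitrary sample values that sum to 1.

```java
// Hypothetical helper, not part of rsocket: a weighted moving average over n points.
class WeightedMovingAverage {

  // points[0] is the most recent value P_t and weights[0] is its weight W_t.
  static double wma(double[] points, double[] weights) {
    if (points.length != weights.length) {
      throw new IllegalArgumentException("points and weights must have the same length");
    }
    double weightSum = 0.0;
    double result = 0.0;
    for (int i = 0; i < points.length; i++) {
      weightSum += weights[i];
      result += points[i] * weights[i];
    }
    // The formula assumes the weights sum to 1 (checked with a small tolerance).
    if (Math.abs(weightSum - 1.0) > 1e-9) {
      throw new IllegalArgumentException("weights must sum to 1");
    }
    return result;
  }

  public static void main(String[] args) {
    double[] points = {5, 4, 3};          // newest first
    double[] weights = {0.5, 0.3, 0.2};   // newest value gets the largest weight
    System.out.println(wma(points, weights)); // approximately 4.3
  }
}
```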

EMA or EWMA

EMA (Exponential Moving Average), also called EWMA (Exponentially Weighted Moving Average), is defined recursively:

EMA_t = (P_t * S) + (1 - S) * EMA_(t-1)

S is the smoothing factor, with 0 < S <= 1; a common choice is S = 2 / (N + 1), where N is the number of periods.
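
The recursion can be sketched in a few lines. The class ExponentialMovingAverage is a hypothetical name, and seeding the average with the first data point is an assumption, since the formula does not say how to start.

```java
// Hypothetical helper, not part of rsocket: a fixed-step exponential moving average.
class ExponentialMovingAverage {
  private final double s;          // smoothing factor S = 2 / (N + 1)
  private double ema;
  private boolean seeded = false;

  ExponentialMovingAverage(int n) {
    if (n <= 0) {
      throw new IllegalArgumentException("n must be positive");
    }
    this.s = 2.0 / (n + 1);
  }

  // Applies EMA_t = P_t * S + (1 - S) * EMA_(t-1) and returns the new average.
  double add(double p) {
    if (!seeded) {
      ema = p;                     // assumption: seed with the first data point
      seeded = true;
    } else {
      ema = p * s + (1 - s) * ema;
    }
    return ema;
  }

  public static void main(String[] args) {
    ExponentialMovingAverage ema = new ExponentialMovingAverage(3); // S = 0.5
    for (double p : new double[] {10, 20, 30}) {
      System.out.println(ema.add(p));                               // prints 10.0, 15.0, 22.5
    }
  }
}
```

Unlike SMA and WMA, this needs no window of past values: the previous average carries all the history.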

Ewma

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java

public class Ewma {
  private final long tau;
  private volatile long stamp;
  private volatile double ewma;

  public Ewma(long halfLife, TimeUnit unit, double initialValue) {
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    stamp = 0L;
    ewma = initialValue;
  }

  public synchronized void insert(double x) {
    long now = Clock.now();
    double elapsed = Math.max(0, now - stamp);
    stamp = now;

    double w = Math.exp(-elapsed / tau);
    ewma = w * ewma + (1.0 - w) * x;
  }

  public synchronized void reset(double value) {
    stamp = 0L;
    ewma = value;
  }

  public double value() {
    return ewma;
  }

  @Override
  public String toString() {
    return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
  }
}
  • Ewma’s constructor takes halfLife, unit (a TimeUnit) and initialValue (the initial value of the average). The insert method updates the average with ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w is the weight kept by the previous average.
  • The weight is w = Math.exp(-elapsed / tau), that is, e raised to the power -elapsed/tau. Here elapsed is the time since the previous update, and tau (the Greek letter τ) is the time constant of the EWMA.
  • tau = halfLife / Math.log(2), converted from the given unit to the unit of Clock. The halfLife parameter controls the speed of convergence: when elapsed equals halfLife, w = exp(-ln 2) = 0.5, so the previous average and the new sample contribute equally.
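
To make the half-life behaviour concrete, here is a minimal sketch of the same update rule with caller-supplied timestamps, so the result is deterministic. TimeDecayedEwma is a hypothetical class, not the library’s Ewma: it works in nanoseconds rather than Clock’s unit, keeps tau as a double, and starts its stamp at a given start time instead of 0.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified variant of Ewma that takes explicit timestamps.
class TimeDecayedEwma {
  private final double tauNanos;   // time constant: halfLife / ln(2), in nanoseconds
  private long stamp;              // time of the previous update, in nanoseconds
  private double ewma;

  TimeDecayedEwma(long halfLife, TimeUnit unit, double initialValue, long startNanos) {
    this.tauNanos = unit.toNanos(halfLife) / Math.log(2);
    this.stamp = startNanos;
    this.ewma = initialValue;
  }

  // Same rule as Ewma.insert: w = exp(-elapsed / tau), ewma = w * ewma + (1 - w) * x.
  void insert(double x, long nowNanos) {
    double elapsed = Math.max(0, nowNanos - stamp);
    stamp = nowNanos;
    double w = Math.exp(-elapsed / tauNanos);
    ewma = w * ewma + (1.0 - w) * x;
  }

  double value() {
    return ewma;
  }

  public static void main(String[] args) {
    long fiveSeconds = TimeUnit.SECONDS.toNanos(5);
    TimeDecayedEwma e = new TimeDecayedEwma(5, TimeUnit.SECONDS, 1.0, 0L);
    e.insert(0.0, fiveSeconds);        // one half-life later: w is about 0.5
    System.out.println(e.value());     // approximately 0.5
    e.insert(0.0, 2 * fiveSeconds);    // another half-life later
    System.out.println(e.value());     // approximately 0.25
  }
}
```

Each half-life that passes halves the weight of the previous average, regardless of how many samples arrive in between.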

RSocketSupplier

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java

public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

  private static final double EPSILON = 1e-4;

  private Supplier<Mono<RSocket>> rSocketSupplier;

  private final MonoProcessor<Void> onClose;

  private final long tau;
  private long stamp;
  private final Ewma errorPercentage;

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
    this.rSocketSupplier = rSocketSupplier;
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    this.stamp = Clock.now();
    this.errorPercentage = new Ewma(halfLife, unit, 1.0);
    this.onClose = MonoProcessor.create();
  }

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
    this(rSocketSupplier, 5, TimeUnit.SECONDS);
  }

  @Override
  public double availability() {
    double e = errorPercentage.value();
    if (Clock.now() - stamp > tau) {
      // If the window is expired artificially increase the availability
      double a = Math.min(1.0, e + 0.5);
      errorPercentage.reset(a);
    }
    if (e < EPSILON) {
      e = 0.0;
    } else if (1.0 - EPSILON < e) {
      e = 1.0;
    }

    return e;
  }

  private synchronized void updateErrorPercentage(double value) {
    errorPercentage.insert(value);
    stamp = Clock.now();
  }

  @Override
  public Mono<RSocket> get() {
    return rSocketSupplier
        .get()
        .doOnNext(o -> updateErrorPercentage(1.0))
        .doOnError(t -> updateErrorPercentage(0.0))
        .map(AvailabilityAwareRSocketProxy::new);
  }

  @Override
  public void dispose() {
    onClose.onComplete();
  }

  @Override
  public boolean isDisposed() {
    return onClose.isDisposed();
  }

  @Override
  public Mono<Void> onClose() {
    return onClose;
  }

  private class AvailabilityAwareRSocketProxy extends RSocketProxy {
    public AvailabilityAwareRSocketProxy(RSocket source) {
      super(source);

      onClose.doFinally(signalType -> source.dispose()).subscribe();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
      return source
          .fireAndForget(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
      return source
          .requestResponse(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(p -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
      return source
          .requestStream(payload)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
      return source
          .requestChannel(payloads)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
      return source
          .metadataPush(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public double availability() {
      // If the window is expired set success and failure to zero and return
      // the child availability
      if (Clock.now() - stamp > tau) {
        updateErrorPercentage(1.0);
      }
      return source.availability() * errorPercentage.value();
    }
  }
}
  • RSocketSupplier implements the Availability, Supplier and Closeable interfaces and holds an errorPercentage field of type Ewma. If halfLife is not specified, it defaults to 5 seconds; the initial value of the Ewma is 1.0. Despite its name, errorPercentage is fed 1.0 on success and 0.0 on error, so higher values mean better availability.
  • RSocketSupplier defines the constant EPSILON = 1e-4. Its availability method first reads the current value e of errorPercentage; if more than tau has passed since stamp (the last update), it resets errorPercentage to Math.min(1.0, e + 0.5). It then returns 0.0 when e is below EPSILON, 1.0 when e is above 1.0 - EPSILON, and e itself otherwise.
  • The updateErrorPercentage method inserts a new value into the Ewma and updates stamp. In get, doOnNext calls updateErrorPercentage(1.0) and doOnError calls updateErrorPercentage(0.0); map then wraps the RSocket in an AvailabilityAwareRSocketProxy. The proxy delegates to the target RSocket and records statistics in the callbacks of each interaction method: doOnError inserts 0.0 directly into errorPercentage, while doOnSuccess (doOnComplete for the Flux methods) calls updateErrorPercentage(1.0).
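
The two numeric rules in RSocketSupplier.availability() can be isolated as pure functions, which makes them easy to check by hand. This is only a sketch: AvailabilitySketch, clamp and bumpAfterExpiry are hypothetical names, and the real method also reads the Ewma and compares timestamps.

```java
// Hypothetical extraction of the arithmetic in RSocketSupplier.availability().
class AvailabilitySketch {
  static final double EPSILON = 1e-4;

  // Snaps values very close to 0 or 1 to exactly 0.0 or 1.0.
  static double clamp(double e) {
    if (e < EPSILON) {
      return 0.0;
    } else if (1.0 - EPSILON < e) {
      return 1.0;
    }
    return e;
  }

  // Value the Ewma is reset to once the window (tau) has expired without updates.
  static double bumpAfterExpiry(double e) {
    return Math.min(1.0, e + 0.5);
  }

  public static void main(String[] args) {
    System.out.println(clamp(0.00005));       // prints 0.0
    System.out.println(clamp(0.99995));       // prints 1.0
    System.out.println(clamp(0.5));           // prints 0.5
    System.out.println(bumpAfterExpiry(0.8)); // prints 1.0
  }
}
```

The bump gives a supplier that has been idle for longer than tau another chance to be selected, rather than leaving it stuck at a low value from old failures.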

Summary

  • There are several moving average algorithms, including SMA (Simple Moving Average), WMA (Weighted Moving Average) and EMA (Exponential Moving Average), also called EWMA (Exponentially Weighted Moving Average).
  • Ewma’s constructor takes halfLife, unit and initialValue. It updates the average with ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w = Math.exp(-elapsed / tau) is the weight. Here elapsed is the time since the previous update and tau (the Greek letter τ) is the time constant, tau = halfLife / Math.log(2), converted to the clock’s unit. The halfLife parameter controls the speed of convergence.
  • The RSocket load balancer uses Ewma to track the availability of services. RSocketSupplier implements the Availability, Supplier and Closeable interfaces and holds an errorPercentage field of type Ewma; if halfLife is not specified it defaults to 5 seconds, and the initial value of the Ewma is 1.0. Its get method wraps the RSocket in an AvailabilityAwareRSocketProxy, which delegates to the target RSocket and records errorPercentage statistics in the doOnError and doOnSuccess callbacks of the related methods.
