A look at Flink’s FencedAkkaInvocationHandler

Preface

This article looks at Flink’s FencedAkkaInvocationHandler, based on the source of Flink release 1.7.2.

FencedRpcGateway

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java

public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {

    /**
     * Get the current fencing token.
     *
     * @return current fencing token
     */
    F getFencingToken();
}
  • The FencedRpcGateway interface extends RpcGateway and declares a type parameter F, the type of the fencing token, which must be Serializable. Its only method, getFencingToken, returns the current fencing token.

FencedMainThreadExecutable

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java

public interface FencedMainThreadExecutable extends MainThreadExecutable {

    /**
     * Run the given runnable in the main thread without attaching a fencing token.
     *
     * @param runnable to run in the main thread without validating the fencing token.
     */
    void runAsyncWithoutFencing(Runnable runnable);

    /**
     * Run the given callable in the main thread without attaching a fencing token.
     *
     * @param callable to run in the main thread without validating the fencing token.
     * @param timeout for the operation
     * @param <V> type of the callable result
     * @return Future containing the callable result
     */
    <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout);
}
  • The FencedMainThreadExecutable interface extends MainThreadExecutable and adds the runAsyncWithoutFencing and callAsyncWithoutFencing methods, which run a Runnable or a Callable in the main thread without validating the fencing token. These methods are needed because, once an executable is a FencedMainThreadExecutable, the runAsync, callAsync, and scheduleRunAsync methods inherited from MainThreadExecutable take on fenced semantics.

FencedAkkaInvocationHandler

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java

public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler implements FencedMainThreadExecutable, FencedRpcGateway<F> {

    private final Supplier<F> fencingTokenSupplier;

    public FencedAkkaInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Time timeout,
            long maximumFramesize,
            @Nullable CompletableFuture<Void> terminationFuture,
            Supplier<F> fencingTokenSupplier) {
        super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);

        this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        if (declaringClass.equals(FencedMainThreadExecutable.class) ||
            declaringClass.equals(FencedRpcGateway.class)) {
            return method.invoke(this, args);
        } else {
            return super.invoke(proxy, method, args);
        }
    }

    @Override
    public void runAsyncWithoutFencing(Runnable runnable) {
        checkNotNull(runnable, "runnable");

        if (isLocal) {
            getActorRef().tell(
                new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender());
        } else {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
                getActorRef().path() + ". This is not supported.");
        }
    }

    @Override
    public <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout) {
        checkNotNull(callable, "callable");
        checkNotNull(timeout, "timeout");

        if (isLocal) {
            @SuppressWarnings("unchecked")
            CompletableFuture<V> resultFuture = (CompletableFuture<V>) FutureUtils.toJava(
                Patterns.ask(
                    getActorRef(),
                    new UnfencedMessage<>(new CallAsync(callable)),
                    timeout.toMilliseconds()));

            return resultFuture;
        } else {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
                getActorRef().path() + ". This is not supported.");
        }
    }

    @Override
    public void tell(Object message) {
        super.tell(fenceMessage(message));
    }

    @Override
    public CompletableFuture<?> ask(Object message, Time timeout) {
        return super.ask(fenceMessage(message), timeout);
    }

    @Override
    public F getFencingToken() {
        return fencingTokenSupplier.get();
    }

    private <P> FencedMessage<F, P> fenceMessage(P message) {
        if (isLocal) {
            return new LocalFencedMessage<>(fencingTokenSupplier.get(), message);
        } else {
            if (message instanceof Serializable) {
                @SuppressWarnings("unchecked")
                FencedMessage<F, P> result = (FencedMessage<F, P>) new RemoteFencedMessage<>(fencingTokenSupplier.get(), (Serializable) message);

                return result;
            } else {
                throw new RuntimeException("Trying to send a non-serializable message " + message + " to a remote " +
                    "RpcEndpoint. Please make sure that the message implements java.io.Serializable.");
            }
        }
    }
}
  • FencedAkkaInvocationHandler extends AkkaInvocationHandler and implements the FencedMainThreadExecutable and FencedRpcGateway interfaces. Both runAsyncWithoutFencing and callAsyncWithoutFencing wrap their payload in an UnfencedMessage, and both work only for a local actor; for a remote actor they throw a RuntimeException.
  • The invoke method dispatches on the declaring class of the called method: methods declared in FencedMainThreadExecutable or FencedRpcGateway are invoked reflectively on the handler itself, and all other methods are delegated to the parent class’s invoke method.
  • The parent class’s runAsync, scheduleRunAsync, and callAsync ultimately call tell or ask. FencedAkkaInvocationHandler overrides tell and ask, which gives runAsync, scheduleRunAsync, and callAsync fenced semantics. The overridden tell and ask wrap the message through fenceMessage, which reads the fencing token from fencingTokenSupplier.get() (the same supplier that backs getFencingToken) and builds a LocalFencedMessage for a local actor or a RemoteFencedMessage for a remote one. The supplier is passed in through the constructor.
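
The dispatch and wrapping described in the bullets above can be illustrated without Flink or Akka. The following is a minimal sketch built on java.lang.reflect.Proxy. TokenGateway, GreeterGateway, BaseHandler, and FencedHandler are hypothetical stand-ins, not Flink classes, and messages are plain strings collected in a list rather than sent to an actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class FencedProxySketch {

    /** Hypothetical stand-in for FencedRpcGateway. */
    public interface TokenGateway {
        String getFencingToken();
    }

    /** Hypothetical RPC gateway; calls to greet() become messages. */
    public interface GreeterGateway extends TokenGateway {
        void greet(String name);
    }

    /** Stand-in for the parent handler: every call ends up in tell(). */
    public static class BaseHandler implements InvocationHandler {
        public final List<String> sent = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String arg = (args == null || args.length == 0) ? "" : String.valueOf(args[0]);
            tell(method.getName() + "(" + arg + ")");
            return null;
        }

        public void tell(String message) {
            sent.add(message);
        }
    }

    /** Stand-in for the fenced handler: local dispatch plus token wrapping. */
    public static class FencedHandler extends BaseHandler implements TokenGateway {
        private final Supplier<String> tokenSupplier;

        public FencedHandler(Supplier<String> tokenSupplier) {
            this.tokenSupplier = tokenSupplier;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared in TokenGateway are answered by the handler itself.
            if (method.getDeclaringClass().equals(TokenGateway.class)) {
                return method.invoke(this, args);
            }
            // Everything else goes through the parent and therefore through tell().
            return super.invoke(proxy, method, args);
        }

        @Override
        public void tell(String message) {
            // The token is read at send time, so a changed token is picked up.
            super.tell("[token=" + tokenSupplier.get() + "] " + message);
        }

        @Override
        public String getFencingToken() {
            return tokenSupplier.get();
        }
    }

    public static GreeterGateway newProxy(FencedHandler handler) {
        return (GreeterGateway) Proxy.newProxyInstance(
            GreeterGateway.class.getClassLoader(),
            new Class<?>[] {GreeterGateway.class},
            handler);
    }

    public static void main(String[] args) {
        AtomicReference<String> token = new AtomicReference<>("t1");
        FencedHandler handler = new FencedHandler(token::get);
        GreeterGateway gateway = newProxy(handler);

        gateway.greet("a");
        token.set("t2");
        gateway.greet("b");

        System.out.println(handler.sent);              // [[token=t1] greet(a), [token=t2] greet(b)]
        System.out.println(gateway.getFencingToken()); // t2
    }
}
```

Because the token comes from a supplier rather than a fixed value, each message carries whatever token is current when it is sent, and getFencingToken is answered locally without producing a message.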

UnfencedMessage

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java

public class UnfencedMessage<P> {
    private final P payload;

    public UnfencedMessage(P payload) {
        this.payload = Preconditions.checkNotNull(payload);
    }

    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "UnfencedMessage(" + payload + ')';
    }
}
  • UnfencedMessage is a wrapper for a message that does not require a fencing token; it carries only a non-null payload.

FencedMessage

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java

public interface FencedMessage<F extends Serializable, P> {

    F getFencingToken();

    P getPayload();
}
  • The FencedMessage interface defines the getFencingToken and getPayload methods. It has two implementations: LocalFencedMessage and RemoteFencedMessage.
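
To see why the two kinds of wrapper matter, it helps to picture the receiving side, which the listings in this article do not show. The following is a minimal sketch of the general fencing idea, under the assumption that a receiver compares the token carried by a fenced message with its own current token and lets unfenced messages through unchecked. Fenced, Unfenced, and Receiver are hypothetical names, and this is not Flink’s actual actor code.

```java
import java.util.Objects;

public class FencingCheckSketch {

    /** Hypothetical message that carries a fencing token. */
    public static final class Fenced {
        public final String token;
        public final Runnable payload;

        public Fenced(String token, Runnable payload) {
            this.token = token;
            this.payload = Objects.requireNonNull(payload);
        }
    }

    /** Hypothetical message that bypasses the token check. */
    public static final class Unfenced {
        public final Runnable payload;

        public Unfenced(Runnable payload) {
            this.payload = Objects.requireNonNull(payload);
        }
    }

    /** Hypothetical receiver that owns the current fencing token. */
    public static final class Receiver {
        private String currentToken;

        public Receiver(String currentToken) {
            this.currentToken = currentToken;
        }

        public void setToken(String newToken) {
            this.currentToken = newToken;
        }

        /** Returns true if the payload was run, false if the message was dropped. */
        public boolean handle(Object message) {
            if (message instanceof Fenced) {
                Fenced fenced = (Fenced) message;
                if (Objects.equals(currentToken, fenced.token)) {
                    fenced.payload.run();
                    return true;
                }
                return false; // stale or foreign token: drop the message
            } else if (message instanceof Unfenced) {
                ((Unfenced) message).payload.run(); // no token check
                return true;
            }
            return false; // unknown message type
        }
    }

    public static void main(String[] args) {
        int[] runs = {0};
        Receiver receiver = new Receiver("t1");

        System.out.println(receiver.handle(new Fenced("t1", () -> runs[0]++))); // true
        receiver.setToken("t2");
        System.out.println(receiver.handle(new Fenced("t1", () -> runs[0]++))); // false
        System.out.println(receiver.handle(new Unfenced(() -> runs[0]++)));     // true
        System.out.println(runs[0]);                                            // 2
    }
}
```

In this sketch a message stamped with an outdated token is dropped after the token changes, while an unfenced message still runs.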

LocalFencedMessage

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java

public class LocalFencedMessage<F extends Serializable, P> implements FencedMessage<F, P> {

    private final F fencingToken;
    private final P payload;

    public LocalFencedMessage(@Nullable F fencingToken, P payload) {
        this.fencingToken = fencingToken;
        this.payload = Preconditions.checkNotNull(payload);
    }

    @Override
    public F getFencingToken() {
        return fencingToken;
    }

    @Override
    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "LocalFencedMessage(" + fencingToken + ", " + payload + ')';
    }
}
  • LocalFencedMessage implements the FencedMessage interface and has two fields, fencingToken and payload. The fencing token type F must be Serializable, while the payload type P is unconstrained. The token may be null, but the payload must not be.

RemoteFencedMessage

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java

public class RemoteFencedMessage<F extends Serializable, P extends Serializable> implements FencedMessage<F, P>, Serializable {
    private static final long serialVersionUID = 4043136067468477742L;

    private final F fencingToken;
    private final P payload;

    public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
        this.fencingToken = fencingToken;
        this.payload = Preconditions.checkNotNull(payload);
    }

    @Override
    public F getFencingToken() {
        return fencingToken;
    }

    @Override
    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "RemoteFencedMessage(" + fencingToken + ", " + payload + ')';
    }
}
  • RemoteFencedMessage implements both the FencedMessage and Serializable interfaces and likewise has two fields, fencingToken and payload. In addition to the token type, the payload type P must also be Serializable, so the whole message can be serialized.
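
The choice between the two implementations, as made in fenceMessage, can be sketched in isolation. The following is a simplified illustration with hypothetical names (LocalEnvelope, RemoteEnvelope, fence, roundTrip) and a String token; it uses plain Java serialization to show why the remote variant needs a Serializable payload, which is an assumption about the transport made only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public class FenceMessageSketch {

    /** Hypothetical local wrapper: the payload can be any object. */
    public static final class LocalEnvelope {
        public final String token;
        public final Object payload;

        public LocalEnvelope(String token, Object payload) {
            this.token = token;
            this.payload = Objects.requireNonNull(payload);
        }
    }

    /** Hypothetical remote wrapper: the payload must be Serializable. */
    public static final class RemoteEnvelope implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String token;
        public final Serializable payload;

        public RemoteEnvelope(String token, Serializable payload) {
            this.token = token;
            this.payload = Objects.requireNonNull(payload);
        }
    }

    /** Picks the wrapper type the same way the text describes for fenceMessage. */
    public static Object fence(boolean isLocal, String token, Object payload) {
        if (isLocal) {
            return new LocalEnvelope(token, payload);
        }
        if (payload instanceof Serializable) {
            return new RemoteEnvelope(token, (Serializable) payload);
        }
        throw new IllegalArgumentException("Payload is not serializable: " + payload);
    }

    /** Serializes and deserializes the envelope, as a remote hop would require. */
    public static RemoteEnvelope roundTrip(RemoteEnvelope envelope) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(envelope);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RemoteEnvelope) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Object local = fence(true, "t1", new Object());
        System.out.println(local instanceof LocalEnvelope); // true

        RemoteEnvelope copy = roundTrip((RemoteEnvelope) fence(false, "t1", "hello"));
        System.out.println(copy.token + " " + copy.payload); // t1 hello
    }
}
```

A local message never leaves the JVM, so it can wrap an arbitrary object such as a Runnable; a remote message has to survive serialization, so the non-serializable case is rejected up front.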

Summary

  • The FencedRpcGateway interface extends RpcGateway and declares a type parameter F, the type of the fencing token. The FencedMainThreadExecutable interface extends MainThreadExecutable and adds the runAsyncWithoutFencing and callAsyncWithoutFencing methods, which run a Runnable or a Callable without fencing.
  • FencedAkkaInvocationHandler extends AkkaInvocationHandler and implements the FencedMainThreadExecutable and FencedRpcGateway interfaces. Both runAsyncWithoutFencing and callAsyncWithoutFencing send an UnfencedMessage. The parent class’s runAsync, scheduleRunAsync, and callAsync ultimately call tell or ask; FencedAkkaInvocationHandler overrides tell and ask to wrap each message in a FencedMessage through fenceMessage, which gives those three methods fenced semantics.
  • UnfencedMessage is a message that does not require a fencing token. The FencedMessage interface defines the getFencingToken and getPayload methods and has two implementations, LocalFencedMessage and RemoteFencedMessage. They differ in that RemoteFencedMessage implements Serializable and also requires its payload type to be Serializable.
