Talk about flink’s RpcServer

  flink

Order

This article mainly studies flink’s RpcServer.

RpcGateway

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java

public interface RpcGateway {

    /**
     * Returns the fully qualified address under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
     */
    String getAddress();

    /**
     * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified hostname under which the associated rpc endpoint is reachable
     */
    String getHostname();
}
  • RpcGateway defines two methods: getAddress and getHostname

MainThreadExecutable

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java

public interface MainThreadExecutable {

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint.
     *
     * @param runnable Runnable to be executed
     */
    void runAsync(Runnable runnable);

    /**
     * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
     * the callable result. If the future is not completed within the given timeout, the returned
     * future will throw a {@link TimeoutException}.
     *
     * @param callable Callable to be executed
     * @param callTimeout Timeout for the future to complete
     * @param <V> Return value of the callable
     * @return Future of the callable result
     */
    <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout);

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay    The delay, in milliseconds, after which the runnable will be executed
     */
    void scheduleRunAsync(Runnable runnable, long delay);
}
  • MainThreadExecutable defines three methods: runAsync, callAsync, and scheduleRunAsync.

StartStoppable

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java

public interface StartStoppable {

    /**
     * Starts the processing of remote procedure calls.
     */
    void start();

    /**
     * Stops the processing of remote procedure calls.
     */
    void stop();
}
  • StartStoppable defines the start and stop methods.

RpcServer

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java

public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGateway {

    /**
     * Return a future which is completed when the rpc endpoint has been terminated.
     *
     * @return Future indicating when the rpc endpoint has been terminated
     */
    CompletableFuture<Void> getTerminationFuture();
}
  • RpcServer interface inherits RpcGateway, MainThreadExecutable and StartStoppable, and defines getTerminationFuture method. It has two implementation classes, AkkaInvocationHandler and FencedAkkaInvocationHandler; . Among them, FencedAkkaInvocationHandler inherited AkkaInvocationHandler.

AkkaBasedEndpoint

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java

interface AkkaBasedEndpoint extends RpcGateway {

    /**
     * Returns the {@link ActorRef} of the underlying RPC actor.
     *
     * @return the {@link ActorRef} of the underlying RPC actor
     */
    ActorRef getActorRef();
}
  • Akkabasedendpointinterface inherits RpcGateway interface, which additionally defines getActorRef () method to obtain ActorRef

AkkaInvocationHandler

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java

class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);

    /**
     * The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in
     * which the actor is running.
     */
    private final String address;

    /**
     * Hostname of the host, {@link #rpcEndpoint} is running on.
     */
    private final String hostname;

    private final ActorRef rpcEndpoint;

    // whether the actor ref is local and thus no message serialization is needed
    protected final boolean isLocal;

    // default timeout for asks
    private final Time timeout;

    private final long maximumFramesize;

    // null if gateway; otherwise non-null
    @Nullable
    private final CompletableFuture<Void> terminationFuture;

    AkkaInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Time timeout,
            long maximumFramesize,
            @Nullable CompletableFuture<Void> terminationFuture) {

        this.address = Preconditions.checkNotNull(address);
        this.hostname = Preconditions.checkNotNull(hostname);
        this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maximumFramesize = maximumFramesize;
        this.terminationFuture = terminationFuture;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

        if (declaringClass.equals(AkkaBasedEndpoint.class) ||
            declaringClass.equals(Object.class) ||
            declaringClass.equals(RpcGateway.class) ||
            declaringClass.equals(StartStoppable.class) ||
            declaringClass.equals(MainThreadExecutable.class) ||
            declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
                "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
                "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }

        return result;
    }

    @Override
    public ActorRef getActorRef() {
        return rpcEndpoint;
    }

    @Override
    public void runAsync(Runnable runnable) {
        scheduleRunAsync(runnable, 0L);
    }

    @Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {
        checkNotNull(runnable, "runnable");
        checkArgument(delayMillis >= 0, "delay must be zero or greater");

        if (isLocal) {
            long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
            tell(new RunAsync(runnable, atTimeNanos));
        } else {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
                rpcEndpoint.path() + ". This is not supported.");
        }
    }

    @Override
    public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
        if (isLocal) {
            @SuppressWarnings("unchecked")
            CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);

            return resultFuture;
        } else {
            throw new RuntimeException("Trying to send a Callable to a remote actor at " +
                rpcEndpoint.path() + ". This is not supported.");
        }
    }

    @Override
    public void start() {
        rpcEndpoint.tell(Processing.START, ActorRef.noSender());
    }

    @Override
    public void stop() {
        rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
    }

    // ------------------------------------------------------------------------
    //  Private methods
    // ------------------------------------------------------------------------

    private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);

        final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);

        Class<?> returnType = method.getReturnType();

        final Object result;

        if (Objects.equals(returnType, Void.TYPE)) {
            tell(rpcInvocation);

            result = null;
        } else if (Objects.equals(returnType, CompletableFuture.class)) {
            // execute an asynchronous call
            result = ask(rpcInvocation, futureTimeout);
        } else {
            // execute a synchronous call
            CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);

            result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
        }

        return result;
    }

    protected RpcInvocation createRpcInvocationMessage(
            final String methodName,
            final Class<?>[] parameterTypes,
            final Object[] args) throws IOException {
        final RpcInvocation rpcInvocation;

        if (isLocal) {
            rpcInvocation = new LocalRpcInvocation(
                methodName,
                parameterTypes,
                args);
        } else {
            try {
                RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
                    methodName,
                    parameterTypes,
                    args);

                if (remoteRpcInvocation.getSize() > maximumFramesize) {
                    throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
                } else {
                    rpcInvocation = remoteRpcInvocation;
                }
            } catch (IOException e) {
                LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
                throw e;
            }
        }

        return rpcInvocation;
    }

    // ------------------------------------------------------------------------
    //  Helper methods
    // ------------------------------------------------------------------------

    private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
        if (args != null) {
            Preconditions.checkArgument(parameterAnnotations.length == args.length);

            for (int i = 0; i < parameterAnnotations.length; i++) {
                if (isRpcTimeout(parameterAnnotations[i])) {
                    if (args[i] instanceof Time) {
                        return (Time) args[i];
                    } else {
                        throw new RuntimeException("The rpc timeout parameter must be of type " +
                            Time.class.getName() + ". The type " + args[i].getClass().getName() +
                            " is not supported.");
                    }
                }
            }
        }

        return defaultTimeout;
    }

    private static boolean isRpcTimeout(Annotation[] annotations) {
        for (Annotation annotation : annotations) {
            if (annotation.annotationType().equals(RpcTimeout.class)) {
                return true;
            }
        }

        return false;
    }

    protected void tell(Object message) {
        rpcEndpoint.tell(message, ActorRef.noSender());
    }

    protected CompletableFuture<?> ask(Object message, Time timeout) {
        return FutureUtils.toJava(
            Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
    }

    @Override
    public String getAddress() {
        return address;
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}
  • AkkaInvocationHandler implements the InvocationHandler interfaces of RpcServer, AkkaBasedEndpoint jdk. Its constructor requires input of address, hostname, rpcEndpoint (ActorRef)、terminationFuture; GetAddress, getHostname, getTerminationFuture all directly return the corresponding attributes
  • The runAsync method internally calls scheduleRunAsync; ; The scheduleRunAsync method uses the tell method, calling rpcEndpoint.tell to pass RunAsync messages; The callAsync method uses the ask method, calling Patterns.ask to pass CallAsync messages.
  • The start method executes rpcendpoint.tell (processing.start, actorref.nosend ()); The stop method executes rpcendpoint.tell (processing.stop, actorref.nosend ()); The invoke method makes corresponding method calls to the current Object for the methods of Object, RpcGateway, MainThreadExecutable, StartStoppable, AkkaBasedEndpoint RpcServer. An UnsupportedOperationException is thrown for the method of FencedRpcGateway, while the other methods call the invokeRpc method internally to construct an RpcInvocation message for calling.

Summary

  • RpcServer interface inherits RpcGateway, MainThreadExecutable and StartStoppable, and defines getTerminationFuture method. It has two implementation classes, AkkaInvocationHandler and FencedAkkaInvocationHandler; . Among them, FencedAkkaInvocationHandler inherited AkkaInvocationHandler.
  • AkkaInvocationHandler implements the InvocationHandler interfaces of RpcServer, AkkaBasedEndpoint jdk. Its constructor requires input of address, hostname, rpcEndpoint (ActorRef)、terminationFuture; GetAddress, getHostname, getTerminationFuture all directly return the corresponding attributes; The runAsync method internally calls scheduleRunAsync;; The scheduleRunAsync method uses the tell method, calling rpcEndpoint.tell to pass RunAsync messages; The callAsync method uses the ask method, calling Patterns.ask to pass CallAsync messages.
  • AkkaInvocationHandler’s start method executes rpcendpoint.tell (processing.start, actorref.nosend ()); The stop method executes rpcendpoint.tell (processing.stop, actorref.nosend ()); The invoke method makes corresponding method calls to the current Object for the methods of Object, RpcGateway, MainThreadExecutable, StartStoppable, AkkaBasedEndpoint RpcServer. An UnsupportedOperationException is thrown for the method of FencedRpcGateway, while the other methods call the invokeRpc method internally to construct an RpcInvocation message for calling.

doc