A look at Flink's RpcService


Preface

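This article takes a look at Flink's RpcService interface and its Akka-based implementation, AkkaRpcService, based on the flink-release-1.7.2 sources.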

RpcService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java

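package org.apache.flink.runtime.rpc;

import org.apache.flink.runtime.concurrent.ScheduledExecutor;

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public interface RpcService {

    // Hostname or address under which the service is reachable; empty if it is local only.
    String getAddress();

    // Port under which the service is reachable; -1 if it is local only.
    int getPort();

    // Connects to the rpc server at the given address and returns a gateway proxy of type C;
    // the future fails (e.g. with an RpcConnectionException) if the connection attempt fails.
    <C extends RpcGateway> CompletableFuture<C> connect(
        String address,
        Class<C> clazz);

    // Same as above, but calls sent through the returned gateway carry the fencing token.
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address,
        F fencingToken,
        Class<C> clazz);

    // Starts an rpc server that forwards remote calls to the given endpoint.
    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);

    // Returns a view of rpcServer whose calls always carry the given fencing token.
    <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);

    // Stops the rpc server behind the given self gateway.
    void stopServer(RpcServer selfGateway);

    // Asynchronously stops the whole service.
    CompletableFuture<Void> stopService();

    // Completes once the service has shut down.
    CompletableFuture<Void> getTerminationFuture();

    // Executor of the service; not meant for code that mutates an RpcEndpoint's state.
    Executor getExecutor();

    ScheduledExecutor getScheduledExecutor();

    // Runs the runnable on the service's executor after the given delay.
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

    void execute(Runnable runnable);

    <T> CompletableFuture<T> execute(Callable<T> callable);
}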
  • RpcService defines how Flink connects to a remote RPC server and how it starts an RPC server that forwards remote calls to an RpcEndpoint. It provides methods such as connect, startServer, fenceRpcServer, stopServer, stopService, getTerminationFuture, scheduleRunnable, and execute.
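The execution-related methods (getExecutor, scheduleRunnable, execute) only describe where work runs, not how. As a rough illustration, the sketch below backs them with a plain JDK ScheduledExecutorService instead of an Akka dispatcher. The class name ExecutorBackedService is hypothetical, it does not implement Flink's interface, and getScheduledExecutor is left out because Flink's ScheduledExecutor type is not part of the JDK.

```java
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not Flink code): the execution-related part of the
// RpcService contract backed by a JDK ScheduledExecutorService.
class ExecutorBackedService {

    // Daemon threads, so a pending schedule never keeps the JVM alive.
    private final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "rpc-service-sketch");
            thread.setDaemon(true);
            return thread;
        });

    Executor getExecutor() {
        return scheduler;
    }

    // Same argument checks as AkkaRpcService.scheduleRunnable, then delegate.
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        Objects.requireNonNull(runnable, "runnable");
        Objects.requireNonNull(unit, "unit");
        if (delay < 0L) {
            throw new IllegalArgumentException("delay must be zero or larger");
        }
        return scheduler.schedule(runnable, delay, unit);
    }

    void execute(Runnable runnable) {
        scheduler.execute(runnable);
    }

    // Turns a Callable into a CompletableFuture; an exception from call() fails the future.
    <T> CompletableFuture<T> execute(Callable<T> callable) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, scheduler);
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```

AkkaRpcService plays the same role with actorSystem.dispatcher() for execute and an ActorSystemScheduledExecutorAdapter for scheduleRunnable; the JDK pool here is only a substitute that makes the contract easy to run in isolation.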

AkkaRpcService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java

@ThreadSafe
public class AkkaRpcService implements RpcService {

    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);

    static final int VERSION = 1;

    static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";

    private final Object lock = new Object();

    private final ActorSystem actorSystem;
    private final Time timeout;

    @GuardedBy("lock")
    private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);

    private final long maximumFramesize;

    private final String address;
    private final int port;

    private final ScheduledExecutor internalScheduledExecutor;

    private final CompletableFuture<Void> terminationFuture;

    private volatile boolean stopped;

    public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
        this.actorSystem = checkNotNull(actorSystem, "actor system");
        this.timeout = checkNotNull(timeout, "timeout");

        if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
            maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
        } else {
            // only local communication
            maximumFramesize = Long.MAX_VALUE;
        }

        Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);

        if (actorSystemAddress.host().isDefined()) {
            address = actorSystemAddress.host().get();
        } else {
            address = "";
        }

        if (actorSystemAddress.port().isDefined()) {
            port = (Integer) actorSystemAddress.port().get();
        } else {
            port = -1;
        }

        internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

        terminationFuture = new CompletableFuture<>();

        stopped = false;
    }

    public ActorSystem getActorSystem() {
        return actorSystem;
    }

    protected int getVersion() {
        return VERSION;
    }

    @Override
    public String getAddress() {
        return address;
    }

    @Override
    public int getPort() {
        return port;
    }

    // this method does not mutate state and is thus thread-safe
    @Override
    public <C extends RpcGateway> CompletableFuture<C> connect(
            final String address,
            final Class<C> clazz) {

        return connectInternal(
            address,
            clazz,
            (ActorRef actorRef) -> {
                Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

                return new AkkaInvocationHandler(
                    addressHostname.f0,
                    addressHostname.f1,
                    actorRef,
                    timeout,
                    maximumFramesize,
                    null);
            });
    }

    // this method does not mutate state and is thus thread-safe
    @Override
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        return connectInternal(
            address,
            clazz,
            (ActorRef actorRef) -> {
                Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

                return new FencedAkkaInvocationHandler<>(
                    addressHostname.f0,
                    addressHostname.f1,
                    actorRef,
                    timeout,
                    maximumFramesize,
                    null,
                    () -> fencingToken);
            });
    }

    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final Props akkaRpcActorProps;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
        } else {
            akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
        }

        ActorRef actorRef;

        synchronized (lock) {
            checkState(!stopped, "RpcService is stopped");
            actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            actors.put(actorRef, rpcEndpoint);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);

        final InvocationHandler akkaInvocationHandler;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
            akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                timeout,
                maximumFramesize,
                terminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
            akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                timeout,
                maximumFramesize,
                terminationFuture);
        }

        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);

        return server;
    }

    @Override
    public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
        if (rpcServer instanceof AkkaBasedEndpoint) {

            InvocationHandler fencedInvocationHandler = new FencedAkkaInvocationHandler<>(
                rpcServer.getAddress(),
                rpcServer.getHostname(),
                ((AkkaBasedEndpoint) rpcServer).getActorRef(),
                timeout,
                maximumFramesize,
                null,
                () -> fencingToken);

            // Rather than using the System ClassLoader directly, we derive the ClassLoader
            // from this class . That works better in cases where Flink runs embedded and all Flink
            // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
            ClassLoader classLoader = getClass().getClassLoader();

            return (RpcServer) Proxy.newProxyInstance(
                classLoader,
                new Class<?>[]{RpcServer.class, AkkaBasedEndpoint.class},
                fencedInvocationHandler);
        } else {
            throw new RuntimeException("The given RpcServer must implement the AkkaGateway in order to fence it.");
        }
    }

    @Override
    public void stopServer(RpcServer selfGateway) {
        if (selfGateway instanceof AkkaBasedEndpoint) {
            final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
            final RpcEndpoint rpcEndpoint;

            synchronized (lock) {
                if (stopped) {
                    return;
                } else {
                    rpcEndpoint = actors.remove(akkaClient.getActorRef());
                }
            }

            if (rpcEndpoint != null) {
                akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
            } else {
                LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
            }
        }
    }

    @Override
    public CompletableFuture<Void> stopService() {
        synchronized (lock) {
            if (stopped) {
                return terminationFuture;
            }

            stopped = true;
        }

        LOG.info("Stopping Akka RPC service.");

        final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

        actorSystemTerminationFuture.whenComplete(
            (Terminated ignored, Throwable throwable) -> {
                synchronized (lock) {
                    actors.clear();
                }

                if (throwable != null) {
                    terminationFuture.completeExceptionally(throwable);
                } else {
                    terminationFuture.complete(null);
                }

                LOG.info("Stopped Akka RPC service.");
            });

        return terminationFuture;
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    @Override
    public Executor getExecutor() {
        return actorSystem.dispatcher();
    }

    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return internalScheduledExecutor;
    }

    @Override
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        checkNotNull(runnable, "runnable");
        checkNotNull(unit, "unit");
        checkArgument(delay >= 0L, "delay must be zero or larger");

        return internalScheduledExecutor.schedule(runnable, delay, unit);
    }

    @Override
    public void execute(Runnable runnable) {
        actorSystem.dispatcher().execute(runnable);
    }

    @Override
    public <T> CompletableFuture<T> execute(Callable<T> callable) {
        Future<T> scalaFuture = Futures.<T>future(callable, actorSystem.dispatcher());

        return FutureUtils.toJava(scalaFuture);
    }

    private <C extends RpcGateway> CompletableFuture<C> connectInternal(
            final String address,
            final Class<C> clazz,
            Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        checkState(!stopped, "RpcService is stopped");

        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
            address, clazz.getName());

        final ActorSelection actorSel = actorSystem.actorSelection(address);

        final Future<ActorIdentity> identify = Patterns
            .ask(actorSel, new Identify(42), timeout.toMilliseconds())
            .<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));

        final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);

        final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
            (ActorIdentity actorIdentity) -> {
                if (actorIdentity.getRef() == null) {
                    throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
                } else {
                    return actorIdentity.getRef();
                }
            });

        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
            (ActorRef actorRef) -> FutureUtils.toJava(
                Patterns
                    .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
                    .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));

        return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

                // Rather than using the System ClassLoader directly, we derive the ClassLoader
                // from this class . That works better in cases where Flink runs embedded and all Flink
                // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
                ClassLoader classLoader = getClass().getClassLoader();

                @SuppressWarnings("unchecked")
                C proxy = (C) Proxy.newProxyInstance(
                    classLoader,
                    new Class<?>[]{clazz},
                    invocationHandler);

                return proxy;
            },
            actorSystem.dispatcher());
    }

    //......
}
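  • AkkaRpcService implements the RpcService interface, and its constructor takes an ActorSystem and a timeout. It reads the maximum frame size from akka.remote.netty.tcp.maximum-frame-size (falling back to Long.MAX_VALUE when only local communication is configured) and derives the address and port from the actor system's address. Both connect methods delegate to connectInternal and pass it a factory that creates an AkkaInvocationHandler or, for the fenced variant, a FencedAkkaInvocationHandler. connectInternal resolves the address with an Identify request, performs a handshake by sending a RemoteHandshakeMessage to the resolved ActorRef, and, once both steps succeed, wraps the handler in a dynamic proxy that implements the requested gateway class.
  • The startServer method creates an AkkaRpcActor (or a FencedAkkaRpcActor for a FencedRpcEndpoint) through actorSystem.actorOf, registers it in the actors map, builds an AkkaInvocationHandler or FencedAkkaInvocationHandler, and finally calls Proxy.newProxyInstance to create an RpcServer that implements the endpoint's RpcGateway interfaces together with RpcServer and AkkaBasedEndpoint (plus FencedMainThreadExecutable for fenced endpoints). The stopServer method removes the endpoint from the actors map and terminates its actor with a PoisonPill. stopService shuts down the whole RpcService: it marks the service as stopped, calls actorSystem.terminate(), and completes terminationFuture once the actor system has terminated.
  • The fenceRpcServer method builds a new RpcServer proxy around the existing server's ActorRef, using a FencedAkkaInvocationHandler that always supplies the given fencingToken; it throws a RuntimeException unless the server implements AkkaBasedEndpoint. Both execute methods run their tasks on actorSystem.dispatcher(), while scheduleRunnable delegates to an ActorSystemScheduledExecutorAdapter.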
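connectInternal is essentially a chain of three asynchronous steps: resolve the address, perform the handshake, then build the proxy. The following self-contained sketch mimics that chain with plain CompletableFutures. A ConcurrentHashMap stands in for the ActorSystem, reflective calls stand in for Akka messages, and the names ToyRpcClient, GreeterGateway and ConnectionFailedException are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical gateway interface (stands in for an RpcGateway subtype).
interface GreeterGateway {
    String greet(String name);
}

// Hypothetical stand-in for Flink's RpcConnectionException.
class ConnectionFailedException extends Exception {
    ConnectionFailedException(String message) {
        super(message);
    }
}

// Toy model of AkkaRpcService.connectInternal (not Flink code).
class ToyRpcClient {
    private final Map<String, Object> registry = new ConcurrentHashMap<>();
    private final Executor executor = ForkJoinPool.commonPool();

    void register(String address, Object endpoint) {
        registry.put(address, endpoint);
    }

    <C> CompletableFuture<C> connect(String address, Class<C> clazz) {
        // Step 1: resolve the address (Flink: ask(actorSelection, Identify) -> ActorIdentity).
        CompletableFuture<Object> refFuture = CompletableFuture.supplyAsync(() -> {
            Object ref = registry.get(address);
            if (ref == null) {
                throw new CompletionException(new ConnectionFailedException(
                    "Could not connect to rpc endpoint under address " + address + '.'));
            }
            return ref;
        }, executor);

        // Step 2: "handshake" (Flink: ask(actorRef, RemoteHandshakeMessage)); here it only
        // checks that the target implements the requested gateway.
        CompletableFuture<Boolean> handshakeFuture = refFuture.thenCompose(ref -> {
            CompletableFuture<Boolean> result = new CompletableFuture<>();
            if (clazz.isInstance(ref)) {
                result.complete(Boolean.TRUE);
            } else {
                result.completeExceptionally(new IllegalArgumentException(
                    "Endpoint at " + address + " does not implement " + clazz.getName()));
            }
            return result;
        });

        // Step 3: once both futures succeed, wrap the target in a dynamic proxy.
        return refFuture.thenCombineAsync(handshakeFuture, (ref, ignored) -> {
            InvocationHandler handler = (proxy, method, args) -> {
                try {
                    return method.invoke(ref, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause();
                }
            };
            return clazz.cast(Proxy.newProxyInstance(
                clazz.getClassLoader(), new Class<?>[]{clazz}, handler));
        }, executor);
    }
}
```

Combining refFuture with handshakeFuture, rather than chaining only off the handshake, mirrors AkkaRpcService: the ActorRef itself is needed to build the invocation handler once the handshake has been acknowledged. Unlike Flink, which derives the ClassLoader from AkkaRpcService's own class, the sketch simply uses the gateway interface's loader.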
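startServer and fenceRpcServer both rely on JDK dynamic proxies: one InvocationHandler backs a proxy that implements several interfaces at once, and fencing only changes which token the handler supplies. The sketch below illustrates that idea with hypothetical types: ServerHandle stands in for RpcServer plus AkkaBasedEndpoint, and CounterEndpoint for a FencedRpcEndpoint. In Flink the receiving FencedAkkaRpcActor validates the token; the toy handler performs that check itself for brevity.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for RpcServer + AkkaBasedEndpoint.
interface ServerHandle {
    String getAddress();

    Object getTarget(); // plays the role of AkkaBasedEndpoint.getActorRef()
}

// Hypothetical gateway exposed by the endpoint.
interface CounterGateway {
    int increment();
}

// Hypothetical endpoint whose fencing token can change (e.g. on a leader change).
class CounterEndpoint implements CounterGateway {
    private int count;
    private volatile String fencingToken;

    CounterEndpoint(String fencingToken) {
        this.fencingToken = fencingToken;
    }

    String getFencingToken() {
        return fencingToken;
    }

    void setFencingToken(String fencingToken) {
        this.fencingToken = fencingToken;
    }

    @Override
    public synchronized int increment() {
        return ++count;
    }
}

final class ToyServers {
    private ToyServers() { }

    // Like startServer: the endpoint's interfaces plus the server handle, and the
    // self gateway always uses the endpoint's current token.
    static ServerHandle startServer(CounterEndpoint endpoint, String address) {
        Set<Class<?>> interfaces = new LinkedHashSet<>(Arrays.asList(endpoint.getClass().getInterfaces()));
        interfaces.add(ServerHandle.class);
        return newProxy(endpoint, address, endpoint::getFencingToken, interfaces);
    }

    // Like fenceRpcServer: same target, but the token is fixed to the given value.
    static ServerHandle fenceRpcServer(ServerHandle server, String fencingToken) {
        CounterEndpoint endpoint = (CounterEndpoint) server.getTarget();
        Set<Class<?>> interfaces = new LinkedHashSet<>(Arrays.asList(server.getClass().getInterfaces()));
        return newProxy(endpoint, server.getAddress(), () -> fencingToken, interfaces);
    }

    private static ServerHandle newProxy(CounterEndpoint endpoint, String address,
                                         Supplier<String> token, Set<Class<?>> interfaces) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == ServerHandle.class) {
                return "getAddress".equals(method.getName()) ? address : endpoint;
            }
            if (method.getDeclaringClass() != Object.class
                    && !Objects.equals(token.get(), endpoint.getFencingToken())) {
                // In Flink the receiving FencedAkkaRpcActor performs this check.
                throw new IllegalStateException("Fencing token mismatch: " + token.get());
            }
            try {
                return method.invoke(endpoint, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (ServerHandle) Proxy.newProxyInstance(
            ToyServers.class.getClassLoader(), interfaces.toArray(new Class<?>[0]), handler);
    }
}
```

The self gateway keeps working after a token change because it reads the endpoint's current token, while a fenced view pinned to an old token is rejected; this is the distinction between the getFencingToken method reference in startServer and the fixed () -> fencingToken supplier in fenceRpcServer.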
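The lifecycle methods share one pattern: a lock guards the stopped flag and the actors map, stopService is idempotent, and terminationFuture completes only after the actor system has terminated. A minimal sketch of that bookkeeping, assuming a hypothetical ToyLifecycle class in which an asynchronous no-op task stands in for actorSystem.terminate() and a String id stands in for the ActorRef key:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of AkkaRpcService's lifecycle bookkeeping (not Flink code).
class ToyLifecycle {
    private final Object lock = new Object();
    private final Map<String, Object> actors = new HashMap<>(); // guarded by lock
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private volatile boolean stopped;

    void startServer(String endpointId, Object endpoint) {
        synchronized (lock) {
            // Same guard as checkState(!stopped, ...) in startServer.
            if (stopped) {
                throw new IllegalStateException("RpcService is stopped");
            }
            actors.put(endpointId, endpoint);
        }
    }

    // True if the endpoint was registered here; Flink would then send it a PoisonPill.
    boolean stopServer(String endpointId) {
        synchronized (lock) {
            if (stopped) {
                return false;
            }
            return actors.remove(endpointId) != null;
        }
    }

    CompletableFuture<Void> stopService() {
        synchronized (lock) {
            if (stopped) {
                return terminationFuture; // repeated calls share one future
            }
            stopped = true;
        }
        terminateSystem().whenComplete((ignored, throwable) -> {
            synchronized (lock) {
                actors.clear();
            }
            if (throwable != null) {
                terminationFuture.completeExceptionally(throwable);
            } else {
                terminationFuture.complete(null);
            }
        });
        return terminationFuture;
    }

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    // Stand-in for FutureUtils.toJava(actorSystem.terminate()).
    private CompletableFuture<Void> terminateSystem() {
        return CompletableFuture.runAsync(() -> { });
    }
}
```

Setting stopped under the lock but calling terminate outside it keeps the critical section short, which is the same split AkkaRpcService makes.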

Summary

  • RpcService defines how Flink connects to a remote RPC server and how it starts an RPC server that forwards remote calls to an RpcEndpoint. It provides methods such as connect, startServer, fenceRpcServer, stopServer, stopService, getTerminationFuture, scheduleRunnable, and execute.
  • AkkaRpcService implements the RpcService interface. Its connect methods delegate to connectInternal, which resolves the remote actor, performs a handshake, and then wraps an AkkaInvocationHandler or FencedAkkaInvocationHandler in a dynamic proxy.
  • AkkaRpcService's startServer method uses the actorSystem to create an ActorRef, builds an AkkaInvocationHandler or FencedAkkaInvocationHandler, and finally calls Proxy.newProxyInstance to create the RpcServer. The stopServer method terminates the actor with a PoisonPill, and stopService shuts down the whole RpcService by calling actorSystem.terminate().
