NettyConnector's start and shutdown

  reactor

Preface

This article looks at how NettyConnector starts and shuts down.

NettyConnector

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/NettyConnector.java

/**
 * A Netty connector is an inbound/outbound factory sharing configuration but usually no
 * runtime
 * (connection...) state at the exception of shared connection pool setups. Subscribing
 * to the returned {@link Mono} will effectively
 * create a new stateful "client" or "server" socket depending on the implementation.
 * It might also be working on top of a socket pool or connection pool as well, but the
 * state should be safely handled by the pool itself.
 * <p>
 * <p>Clients or Receivers will onSubscribe when their connection is established. They
 * will complete when the unique returned closing {@link Publisher} completes itself or if
 * the connection is remotely terminated. Calling the returned {@link
 * Disposable#dispose()} from {@link Mono#subscribe()} will terminate the subscription
 * and underlying connection from the local peer.
 * <p>
 * <p>Servers or Producers will onSubscribe when their socket is bound locally. They will
 * never complete as many {@link Publisher} close selectors will be expected. Disposing
 * the returned {@link Mono} will safely call shutdown.
 *
 * @param <INBOUND> incoming traffic API such as server request or client response
 * @param <OUTBOUND> outgoing traffic API such as server response or client request
 * @author Stephane Maldini
 * @since 0.6
 */
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {

    /**
     * Prepare a {@link BiFunction} IO handler that will react on a new connected state
     * each
     * time
     * the returned  {@link Mono} is subscribed. This {@link NettyConnector} shouldn't assume
     * any state related to the individual created/cleaned resources.
     * <p>
     * The IO handler will return {@link Publisher} to signal when to terminate the
     * underlying resource channel.
     *
     * @param ioHandler the in/out callback returning a closing publisher
     *
     * @return a {@link Mono} completing with a {@link Disposable} token to dispose
     * the active handler (server, client connection...) or failing with the connection
     * error.
     */
    Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);

    /**
     * Start a Client or Server in a blocking fashion, and wait for it to finish initializing.
     * The returned {@link BlockingNettyContext} class offers a simplified API around operating
     * the client/server in a blocking fashion, including to {@link BlockingNettyContext#shutdown() shut it down}.
     *
     * @param handler the handler to start the client or server with.
     * @param <T>
     * @return a {@link BlockingNettyContext}
     */
    default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
    BlockingNettyContext start(T handler) {
        return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
    }

    /**
     * Start a Client or Server in a blocking fashion, and wait for it to finish initializing.
     * The returned {@link BlockingNettyContext} class offers a simplified API around operating
     * the client/server in a blocking fashion, including to {@link BlockingNettyContext#shutdown() shut it down}.
     *
     * @param handler the handler to start the client or server with.
     * @param timeout wait for Client/Server to start for the specified timeout.
     * @param <T>
     * @return a {@link BlockingNettyContext}
     */
    default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
    BlockingNettyContext start(T handler, Duration timeout) {
        return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName(), timeout);
    }

    /**
     * Start a Client or Server in a fully blocking fashion, not only waiting for it to
     * initialize but also blocking during the full lifecycle of the client/server.
     * Since most servers will be long-lived, this is more adapted to running a server
     * out of a main method, only allowing shutdown of the servers through sigkill.
     * <p>
     * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added
     * by this method in order to properly disconnect the client/server upon receiving
     * a sigkill signal.
     *
     * @param handler the handler to execute.
     */
    default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
    void startAndAwait(T handler) {
        startAndAwait(handler, null);
    }

    /**
     * Start a Client or Server in a fully blocking fashion, not only waiting for it to
     * initialize but also blocking during the full lifecycle of the client/server.
     * Since most servers will be long-lived, this is more adapted to running a server
     * out of a main method, only allowing shutdown of the servers through sigkill.
     * <p>
     * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added
     * by this method in order to properly disconnect the client/server upon receiving
     * a sigkill signal.
     *
     * @param handler the handler to execute.
     * @param onStart an optional callback to be invoked once the client/server has finished
     * initializing.
     */
    default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
    void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
        BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());

        facade.installShutdownHook();

        if (onStart != null) {
            onStart.accept(facade);
        }

        facade.getContext()
              .onClose()
              .block();
    }
}

The interface declares five methods. newHandler is non-blocking. The other four (two start overloads and two startAndAwait overloads) are blocking and are built on BlockingNettyContext. The Duration parameter of start sets how long to wait for initialization to complete.
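The idea of "block until initialization finishes, but only up to a timeout" can be illustrated without Reactor. The sketch below is a rough analogy only: it swaps the Mono for a JDK CompletableFuture, and the class name BlockingStart and its await method are hypothetical names chosen for this example, not part of Reactor Netty.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: turns an asynchronous start into a blocking one. */
final class BlockingStart {

    /** Blocks until the startup future completes or the timeout elapses. */
    static <T> T await(CompletableFuture<T> startup, Duration timeout) {
        try {
            return startup.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Initialization took longer than the caller was willing to wait.
            throw new IllegalStateException(
                    "not started within " + timeout.toMillis() + "ms", e);
        } catch (ExecutionException e) {
            // The asynchronous start itself failed; surface the real cause.
            throw new IllegalStateException("start failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while starting", e);
        }
    }
}
```

A caller would pass the future that represents the pending connection or bind, plus the timeout it can tolerate, and receive either the ready resource or an exception.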

newHandler

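newHandler returns a Mono<? extends NettyContext>. A new connection is created each time this Mono is subscribed, and once the publisher returned by the IO handler completes, the underlying connection is disposed without an explicit call.

An example: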

    @Test
    public void testNewHandler() throws InterruptedException {
        TcpClient client = TcpClient.create("localhost", 9090);
        Mono<? extends NettyContext> mono = client.newHandler((inbound,outbound) -> {
            return outbound.sendString(Mono.just("Hello World!")).then();
        });
        
        CountDownLatch latch = new CountDownLatch(1);

        Disposable disposable = mono
                .doFinally(e -> {
                    System.out.println("finish:"+e);
                    latch.countDown();
                })
                .subscribe();

        latch.await();
        System.out.println(disposable.isDisposed());
    }

start

The start method returns a BlockingNettyContext. Callers can invoke its shutdown method to dispose the NettyContext, for example:

    @Test
    public void testShutdown(){
        TcpClient client = TcpClient.create("localhost", 9090);
        CountDownLatch latch = new CountDownLatch(1);
        BlockingNettyContext context = client.start((inbound, outbound) -> {
            latch.countDown();
            return outbound.sendString(Mono.just("hello world"))
                    .then();
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            context.shutdown();
        }

    }

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.java

    /**
     * Shut down the {@link NettyContext} and wait for its termination, up to the
     * {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
     */
    public void shutdown() {
        if (context.isDisposed()) {
            return;
        }

        removeShutdownHook(); //only applies if not called from the hook's thread

        context.dispose();
        context.onClose()
               .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
               .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
               .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
               .block();
    }

    /**
     * Remove a {@link Runtime#removeShutdownHook(Thread) JVM shutdown hook} if one was
     * {@link #installShutdownHook() installed} by this {@link BlockingNettyContext}.
     *
     * @return true if there was a hook and it was removed, false otherwise.
     */
    public boolean removeShutdownHook() {
        if (this.shutdownHook != null && Thread.currentThread() != this.shutdownHook) {
            Thread sdh = this.shutdownHook;
            this.shutdownHook = null;
            return Runtime.getRuntime().removeShutdownHook(sdh);
        }
        return false;
    }

shutdown first removes the registered shutdown hook, if there is one, then disposes the NettyContext and blocks until it has closed or the lifecycle timeout expires.
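The hook bookkeeping can be tried in isolation with nothing but the JDK. The following is a minimal sketch, assuming a hypothetical HookedResource class that stands in for BlockingNettyContext; only Runtime.addShutdownHook and Runtime.removeShutdownHook are real APIs here. It shows why the hook is kept in a field (so it can be unregistered later) and why removal is skipped on the hook thread itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a resource guarded by a JVM shutdown hook. */
final class HookedResource {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private Thread shutdownHook; // null while no hook is installed

    /** Registers the hook once; later calls are no-ops. */
    synchronized void installShutdownHook() {
        if (shutdownHook != null) {
            return;
        }
        shutdownHook = new Thread(this::dispose);
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Unregisters the hook unless called from the hook thread, because
     * Runtime.removeShutdownHook throws IllegalStateException once the
     * JVM shutdown sequence has begun.
     */
    synchronized boolean removeShutdownHook() {
        if (shutdownHook != null && Thread.currentThread() != shutdownHook) {
            Thread hook = shutdownHook;
            shutdownHook = null;
            return Runtime.getRuntime().removeShutdownHook(hook);
        }
        return false;
    }

    /** Manual shutdown: drop the hook first, then release the resource. */
    void shutdown() {
        if (disposed.get()) {
            return;
        }
        removeShutdownHook();
        dispose();
    }

    boolean isDisposed() {
        return disposed.get();
    }

    private void dispose() {
        disposed.set(true);
    }
}
```

Removing the hook on manual shutdown keeps the JVM from holding a reference to an already released resource until process exit.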

startAndAwait

The startAndAwait method calls installShutdownHook on the BlockingNettyContext so that the context is closed when the JVM shuts down.

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.java

    /**
     * Install a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} that will
     * shutdown this {@link BlockingNettyContext} if the JVM is terminated externally.
     * <p>
     * The hook is removed if shutdown manually, and subsequent calls to this method are
     * no-op.
     */
    public void installShutdownHook() {
        //don't return the hook to discourage uninstalling it externally
        if (this.shutdownHook != null) {
            return;
        }
        this.shutdownHook = new Thread(this::shutdownFromJVM);
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    protected void shutdownFromJVM() {
        if (context.isDisposed()) {
            return;
        }

        final String hookDesc = Thread.currentThread().toString();

        context.dispose();
        context.onClose()
               .doOnError(e -> LOG.error("Stopped {} on {} with an error {} from JVM hook {}",
                       description, context.address(), e, hookDesc))
               .doOnTerminate(() -> LOG.info("Stopped {} on {} from JVM hook {}",
                       description, context.address(), hookDesc))
               .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description +
                       " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
               .block();
    }    

The shutdown hook runs the shutdownFromJVM method, which disposes the NettyContext and waits for it to close.

Example

    @Test
    public void testStartAndAwait(){
        TcpClient client = TcpClient.create("localhost", 9090);
        client.startAndAwait((inbound, outbound) -> {
            return outbound.sendString(Mono.just("hello world"))
                    .then();
        });
    }
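The order of operations inside startAndAwait (install the hook, invoke the optional onStart callback, then block until close) can be mimicked with plain JDK types. This is only an analogy under stated assumptions: AwaitableService is a hypothetical class, and a CountDownLatch stands in for the onClose() Mono of NettyContext.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/** Hypothetical stand-in for a running client/server; not a Reactor Netty type. */
final class AwaitableService {
    private final CountDownLatch closed = new CountDownLatch(1);

    /** Signals termination; safe to call more than once. */
    void dispose() {
        closed.countDown();
    }

    boolean isDisposed() {
        return closed.getCount() == 0;
    }

    /** Follows the same order as startAndAwait: hook, callback, then block. */
    static void startAndAwait(Consumer<AwaitableService> onStart) {
        AwaitableService service = new AwaitableService();

        // Dispose the service if the JVM is terminated externally.
        Runtime.getRuntime().addShutdownHook(new Thread(service::dispose));

        // Give the caller a handle to the running service, if requested.
        if (onStart != null) {
            onStart.accept(service);
        }

        // Block the calling thread for the full lifecycle of the service.
        try {
            service.closed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

Because the call does not return until the service is disposed, the onStart callback is the only place where the caller can obtain a handle to it.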

Summary

NettyConnector can be used in a non-blocking or a blocking way. In non-blocking mode, newHandler returns a Mono<? extends NettyContext>, and the NettyContext is disposed on its own once the handler completes. In blocking mode, startAndAwait registers a shutdown hook that disposes the NettyContext, while start returns a BlockingNettyContext whose shutdown method the caller invokes to dispose it.
