The newHandler procedure of TcpClient in reactor-netty

Preface

This article mainly walks through the newHandler process of TcpClient in reactor-netty.

Maven

        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.7.3.RELEASE</version>
        </dependency>

TcpClient.newHandler

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/tcp/TcpClient.java

    /**
     * @param handler
     * @param address
     * @param secure
     * @param onSetup
     *
     * @return a new Mono to connect on subscribe
     */
    protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            InetSocketAddress address,
            boolean secure,
            Consumer<? super Channel> onSetup) {

        final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>
                targetHandler =
                null == handler ? ChannelOperations.noopHandler() : handler;

        return Mono.create(sink -> {
            SocketAddress remote = address != null ? address : options.getAddress();

            ChannelPool pool = null;

            PoolResources poolResources = options.getPoolResources();
            if (poolResources != null) {
                pool = poolResources.selectOrCreate(remote, options,
                        doHandler(null, sink, secure, remote, null, null),
                        options.getLoopResources().onClient(options.preferNative()));
            }

            ContextHandler<SocketChannel> contextHandler =
                    doHandler(targetHandler, sink, secure, remote, pool, onSetup);
            sink.onCancel(contextHandler);

            if (pool == null) {
                Bootstrap b = options.get();
                b.remoteAddress(remote);
                b.handler(contextHandler);
                contextHandler.setFuture(b.connect());
            }
            else {
                contextHandler.setFuture(pool.acquire());
            }
        });
    }
  • Here Mono.create's sink is used to build the returned Mono<NettyContext>.
  • selectOrCreate is called to obtain a ChannelPool (when pool resources are configured).
  • Then a ContextHandler is created and registered as the sink's cancel callback.
  • Finally, contextHandler.setFuture is called to set the channel future: either bootstrap.connect() when there is no pool, or pool.acquire() when there is one.
  • Note that doHandler is called twice: the first time both the handler and pool parameters are null (the result only serves as the pool's onChannelCreate callback), and the second time the newly created pool is passed in. A usage sketch of newHandler follows this list.
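
To make the walkthrough concrete, here is a minimal usage sketch of newHandler (the host, port and payload are assumptions; it expects a TCP server listening on localhost:8080):

    import reactor.core.publisher.Mono;
    import reactor.ipc.netty.NettyContext;
    import reactor.ipc.netty.tcp.TcpClient;

    public class NewHandlerExample {

        public static void main(String[] args) {
            Mono<? extends NettyContext> connection =
                    TcpClient.create("localhost", 8080)   // assumed host/port
                             .newHandler((in, out) -> {
                                 in.receive().asString().subscribe(System.out::println); // consume replies
                                 return out.sendString(Mono.just("hello"));              // write once connected
                             });

            // Nothing connects yet; block() subscribes and runs the Mono.create lambda above
            NettyContext ctx = connection.block();

            ctx.dispose();             // close the channel (or release it when pooled)
            ctx.onClose().block();     // wait until the channel is actually closed
        }
    }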

TcpResources.selectOrCreate

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/tcp/TcpResources.java

    public ChannelPool selectOrCreate(SocketAddress address,
            Supplier<? extends Bootstrap> bootstrap,
            Consumer<? super Channel> onChannelCreate,
            EventLoopGroup group) {
        return defaultPools.selectOrCreate(address, bootstrap, onChannelCreate, group);
    }

This simply delegates to DefaultPoolResources.

DefaultPoolResources.selectOrCreate

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/resources/DefaultPoolResources.java

    public ChannelPool selectOrCreate(SocketAddress remote,
            Supplier<? extends Bootstrap> bootstrap,
            Consumer<? super Channel> onChannelCreate,
            EventLoopGroup group) {
        SocketAddress address = remote;
        for (; ; ) {
            Pool pool = channelPools.get(remote);
            if (pool != null) {
                return pool;
            }
            Bootstrap b = bootstrap.get();
            if (remote != null) {
                b = b.remoteAddress(remote);
            }
            else {
                address = b.config()
                          .remoteAddress();
            }
            if (log.isDebugEnabled()) {
                log.debug("New {} client pool for {}", name, address);
            }
            pool = new Pool(b, provider, onChannelCreate, group);
            if (channelPools.putIfAbsent(address, pool) == null) {
                return pool;
            }
            pool.close();
        }
    }

You can see that it first tries to get an existing pool from channelPools; if none is found, a new Pool is created and registered with putIfAbsent. If another thread registered a pool for the same address concurrently, the freshly created pool is closed and the loop retries. A distilled sketch of this pattern follows.
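
The loop around putIfAbsent is there to cope with concurrent callers; a distilled sketch of the same pattern (Pool and createPool are hypothetical stand-ins, not the real reactor-netty types):

    ConcurrentMap<SocketAddress, Pool> channelPools = new ConcurrentHashMap<>();

    Pool selectOrCreate(SocketAddress address) {
        for (;;) {
            Pool existing = channelPools.get(address);
            if (existing != null) {
                return existing;                              // fast path: reuse the cached pool
            }
            Pool candidate = createPool(address);             // hypothetical factory
            if (channelPools.putIfAbsent(address, candidate) == null) {
                return candidate;                             // this thread won the race
            }
            candidate.close();                                // another thread won; discard ours and retry
        }
    }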

DefaultPoolResources#Pool

    final static class Pool extends AtomicBoolean
            implements ChannelPoolHandler, ChannelPool, ChannelHealthChecker {

        final ChannelPool               pool;
        final Consumer<? super Channel> onChannelCreate;
        final EventLoopGroup            defaultGroup;

        final AtomicInteger activeConnections = new AtomicInteger();

        final Future<Boolean> HEALTHY;
        final Future<Boolean> UNHEALTHY;

        @SuppressWarnings("unchecked")
        Pool(Bootstrap bootstrap,
                PoolFactory provider,
                Consumer<? super Channel> onChannelCreate,
                EventLoopGroup group) {
            this.pool = provider.newPool(bootstrap, this, this);
            this.onChannelCreate = onChannelCreate;
            this.defaultGroup = group;
            HEALTHY = group.next()
                           .newSucceededFuture(true);
            UNHEALTHY = group.next()
                             .newSucceededFuture(false);
        }

        @Override
        public Future<Boolean> isHealthy(Channel channel) {
            return channel.isActive() ? HEALTHY : UNHEALTHY;
        }

        @Override
        public Future<Channel> acquire() {
            return pool.acquire();
        }

        @Override
        public Future<Channel> acquire(Promise<Channel> promise) {
            return pool.acquire(promise);
        }

        @Override
        public Future<Void> release(Channel channel) {
            return pool.release(channel);
        }

        @Override
        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return pool.release(channel, promise);
        }

        @Override
        public void close() {
            if(compareAndSet(false, true)) {
                pool.close();
            }
        }

        @Override
        public void channelReleased(Channel ch) throws Exception {
            activeConnections.decrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Released {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
        }

        @Override
        public void channelAcquired(Channel ch) throws Exception {
            activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Acquired {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
        }

        @Override
        public void channelCreated(Channel ch) throws Exception {
            activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Created {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
            if (onChannelCreate != null) {
                onChannelCreate.accept(ch);
            }
        }

        @Override
        public String toString() {
            return pool.getClass()
                       .getSimpleName() + "{" + "activeConnections=" + activeConnections + '}';
        }
    }

As you can see, the underlying ChannelPool is created via provider.newPool.
The provider here is a constructor reference, SimpleChannelPool::new.

    interface PoolFactory {

        ChannelPool newPool(Bootstrap b,
                ChannelPoolHandler handler,
                ChannelHealthChecker checker);
    }

This resolves to SimpleChannelPool's three-argument constructor (Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck).
Pool itself implements both the ChannelPoolHandler and ChannelHealthChecker interfaces, so it passes itself for both arguments, as the sketch below shows.
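
Since PoolFactory has a single abstract method whose shape matches SimpleChannelPool's three-argument constructor, the constructor reference can satisfy the interface directly; a small sketch (variable names are illustrative):

    // the constructor reference implements the functional interface
    PoolFactory provider = SimpleChannelPool::new;

    // inside Pool's constructor this becomes provider.newPool(bootstrap, this, this):
    // the Pool instance plays both the ChannelPoolHandler and ChannelHealthChecker roles
    ChannelPool delegate = provider.newPool(bootstrap, handlerAndChecker, handlerAndChecker);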

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/SimpleChannelPool.java

    /**
     * Creates a new instance.
     *
     * @param bootstrap         the {@link Bootstrap} that is used for connections
     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
     * @param healthCheck       the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
     *                          still healthy when obtain from the {@link ChannelPool}
     */
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
        this(bootstrap, handler, healthCheck, true);
    }

ChannelPoolHandler

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/ChannelPoolHandler.java

/**
 * Handler which is called for various actions done by the {@link ChannelPool}.
 */
public interface ChannelPoolHandler {
    /**
     * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
     * {@link ChannelPool#release(Channel, Promise)}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelReleased(Channel ch) throws Exception;

    /**
     * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
     * {@link ChannelPool#acquire(Promise)}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelAcquired(Channel ch) throws Exception;

    /**
     * Called once a new {@link Channel} is created in the {@link ChannelPool}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelCreated(Channel ch) throws Exception;
}

ChannelHealthChecker

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/ChannelHealthChecker.java

/**
 * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
 * {@link ChannelPool#acquire(Promise)}.
 */
public interface ChannelHealthChecker {

    /**
     * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
     */
    ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
        @Override
        public Future<Boolean> isHealthy(Channel channel) {
            EventLoop loop = channel.eventLoop();
            return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
        }
    };

    /**
     * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once
     * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    Future<Boolean> isHealthy(Channel channel);
}

SimpleChannelPool

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/SimpleChannelPool.java

/**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
 *
 * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
 *
 */
public class SimpleChannelPool implements ChannelPool {
    private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");

    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    private final ChannelPoolHandler handler;
    private final ChannelHealthChecker healthCheck;
    private final Bootstrap bootstrap;
    private final boolean releaseHealthCheck;
    private final boolean lastRecentUsed;

    //......
    /**
     * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
     * {@link Channel} is ready to be reused.
     *
     * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
     * implementations of these methods needs to be thread-safe!
     */
    protected Channel pollChannel() {
        return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
    }

    /**
     * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
     * could be added, {@code false} otherwise.
     *
     * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
     * implementations of these methods needs to be thread-safe!
     */
    protected boolean offerChannel(Channel channel) {
        return deque.offer(channel);
    }
}

SimpleChannelPool maintains its Channels in a Deque; with lastRecentUsed true (the default for the constructors used here), pollChannel takes from the tail, so the pool hands out Channels in LIFO order, as illustrated below.
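
offerChannel appends to the tail of the deque, and with lastRecentUsed true pollChannel also takes from the tail, so the most recently released Channel is handed out first; a tiny illustration with plain strings standing in for Channels:

    Deque<String> deque = new ArrayDeque<>();
    deque.offer("ch1");   // released earlier
    deque.offer("ch2");   // released most recently

    deque.pollLast();     // LIFO (lastRecentUsed = true): returns "ch2"
    deque.pollFirst();    // FIFO would have returned "ch1" instead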

SimpleChannelPool.acquire

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/SimpleChannelPool.java

    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        checkNotNull(promise, "promise");
        return acquireHealthyFromPoolOrNew(promise);
    }

    /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

Note that pollChannel is called to take a Channel from the deque and run the health check on it; if the deque is empty (pollChannel returns null), a new Channel is bootstrapped and connected.

SimpleChannelPool.release

    @Override
    public final Future<Void> release(Channel channel) {
        return release(channel, channel.eventLoop().<Void>newPromise());
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doReleaseChannel(Channel channel, Promise<Void> promise) {
        assert channel.eventLoop().inEventLoop();
        // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
        if (channel.attr(POOL_KEY).getAndSet(null) != this) {
            closeAndFail(channel,
                         // Better include a stacktrace here as this is an user error.
                         new IllegalArgumentException(
                                 "Channel " + channel + " was not acquired from this ChannelPool"),
                         promise);
        } else {
            try {
                if (releaseHealthCheck) {
                    doHealthCheckOnRelease(channel, promise);
                } else {
                    releaseAndOffer(channel, promise);
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
        }
    }

    private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
        final Future<Boolean> f = healthCheck.isHealthy(channel);
        if (f.isDone()) {
            releaseAndOfferIfHealthy(channel, promise, f);
        } else {
            f.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    releaseAndOfferIfHealthy(channel, promise, f);
                }
            });
        }
    }

    /**
     * Adds the channel back to the pool only if the channel is healthy.
     * @param channel the channel to put back to the pool
     * @param promise offer operation promise.
     * @param future the future that contains information fif channel is healthy or not.
     * @throws Exception in case when failed to notify handler about release operation.
     */
    private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
            throws Exception {
        if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
            releaseAndOffer(channel, promise);
        } else { //channel not healthy, just releasing it.
            handler.channelReleased(channel);
            promise.setSuccess(null);
        }
    }

    private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
        if (offerChannel(channel)) {
            handler.channelReleased(channel);
            promise.setSuccess(null);
        } else {
            closeAndFail(channel, FULL_EXCEPTION, promise);
        }
    }

On release, offerChannel is called to put the Channel back into the deque.
A SimpleChannelPool created through the three-argument constructor has releaseHealthCheck set to true, i.e. a health check is performed before the Channel is returned to the pool; see the sketch below.
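
The three-argument constructor delegates to this(bootstrap, handler, healthCheck, true), so the check is on by default; a pool that skips it on release could be built directly with the four-argument constructor (bootstrap and handler are assumed to be configured elsewhere):

    ChannelPool noCheckOnRelease =
            new SimpleChannelPool(bootstrap, handler, ChannelHealthChecker.ACTIVE, false);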

TcpClient.doHandler

    /**
     * Create a {@link ContextHandler} for {@link Bootstrap#handler()}
     *
     * @param handler user provided in/out handler
     * @param sink user provided bind handler
     * @param secure if operation should be secured
     * @param pool if channel pool
     * @param onSetup if operation has local setup callback
     *
     * @return a new {@link ContextHandler}
     */
    protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            MonoSink<NettyContext> sink,
            boolean secure,
            SocketAddress providedAddress,
            ChannelPool pool,
            Consumer<? super Channel> onSetup) {
        return ContextHandler.newClientContext(sink,
                options,
                loggingHandler,
                secure,
                providedAddress,
                pool,
                handler == null ? EMPTY :
                        (ch, c, msg) -> ChannelOperations.bind(ch, handler, c));
    }

ContextHandler.newClientContext is called here to create a ContextHandler<SocketChannel>.

ContextHandler.newClientContext

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/channel/ContextHandler.java

    /**
     * Create a new client context with optional pool support
     *
     * @param sink
     * @param options
     * @param loggingHandler
     * @param secure
     * @param providedAddress
     * @param channelOpFactory
     * @param pool
     * @param <CHANNEL>
     *
     * @return a new {@link ContextHandler} for clients
     */
    public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext(
            MonoSink<NettyContext> sink,
            ClientOptions options,
            LoggingHandler loggingHandler,
            boolean secure,
            SocketAddress providedAddress,
            ChannelPool pool, ChannelOperations.OnNew<CHANNEL> channelOpFactory) {
        if (pool != null) {
            return new PooledClientContextHandler<>(channelOpFactory,
                    options,
                    sink,
                    loggingHandler,
                    secure,
                    providedAddress,
                    pool);
        }
        return new ClientContextHandler<>(channelOpFactory,
                options,
                sink,
                loggingHandler,
                secure,
                providedAddress);
    }

Note that the handler lambda from newHandler arrives here as the ChannelOperations.OnNew<CHANNEL> channelOpFactory.
The pool was null the first time doHandler was called, which yields a ClientContextHandler; on the second call, after the pool has been created, the pool is non-null and a PooledClientContextHandler is created.

PooledClientContextHandler

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/channel/PooledClientContextHandler.java

    @Override
    public void fireContextActive(NettyContext context) {
        if (!fired) {
            fired = true;
            if (context != null) {
                sink.success(context);
            }
            else {
                sink.success();
            }
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public void setFuture(Future<?> future) {
        Objects.requireNonNull(future, "future");

        Future<CHANNEL> f;
        for (; ; ) {
            f = this.future;

            if (f == DISPOSED) {
                if (log.isDebugEnabled()) {
                    log.debug("Cancelled existing channel from pool: {}",
                            pool.toString());
                }
                sink.success();
                return;
            }

            if (FUTURE.compareAndSet(this, f, future)) {
                break;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Acquiring existing channel from pool: {} {}",
                    future,
                    pool.toString());
        }
        ((Future<CHANNEL>) future).addListener(this);
    }

    final void connectOrAcquire(CHANNEL c) {
        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping acquisition {} because of {}",
                        "asynchronous user cancellation");
            }
            disposeOperationThenRelease(c);
            sink.success();
            return;
        }

        if (!c.isActive()) {
            log.debug("Immediately aborted pooled channel, re-acquiring new " + "channel: {}",
                    c.toString());
            release(c);
            setFuture(pool.acquire());
            return;
        }

        ChannelOperationsHandler op = c.pipeline()
                                       .get(ChannelOperationsHandler.class);

        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Created new pooled channel: " + c.toString());
            }
            c.closeFuture()
             .addListener(ff -> release(c));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Acquired active channel: " + c.toString());
        }
        if (createOperations(c, null) == null) {
            setFuture(pool.acquire());
        }
    }

    public void operationComplete(Future<CHANNEL> future) throws Exception {
        if (future.isCancelled()) {
            if (log.isDebugEnabled()) {
                log.debug("Cancelled {}", future.toString());
            }
            return;
        }

        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping acquisition {} because of {}",
                        future,
                        "asynchronous user cancellation");
            }
            if (future.isSuccess()) {
                disposeOperationThenRelease(future.get());
            }
            sink.success();
            return;
        }

        if (!future.isSuccess()) {
            if (future.cause() != null) {
                fireContextError(future.cause());
            }
            else {
                fireContextError(new AbortedException("error while acquiring connection"));
            }
            return;
        }

        CHANNEL c = future.get();

        if (c.eventLoop()
             .inEventLoop()) {
            connectOrAcquire(c);
        }
        else {
            c.eventLoop()
             .execute(() -> connectOrAcquire(c));
        }
    }    

fireContextActive, setFuture, connectOrAcquire and operationComplete all have paths that call the MonoSink's success method, which is what emits the value of the Mono created by Mono.create.

Mono.subscribe

reactor-core-3.1.3.RELEASE-sources.jar! /reactor/core/publisher/Mono.java

    /**
     * Subscribe to this {@link Mono} and request unbounded demand.
     * <p>
     * This version doesn't specify any consumption behavior for the events from the
     * chain, especially no error handling, so other variants should usually be preferred.
     *
     * <p>
     * <img width="500" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/unbounded1.png" alt="">
     * <p>
     *
     * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription}
     */
    public final Disposable subscribe() {
        if(this instanceof MonoProcessor){
            MonoProcessor<T> s = (MonoProcessor<T>)this;
            s.connect();
            return s;
        }
        else{
            return subscribeWith(new LambdaMonoSubscriber<>(null, null, null, null));
        }
    }

What is created here is a LambdaMonoSubscriber, and the call eventually reaches MonoCreate's subscribe(actual) method.

reactor-core-3.1.3.RELEASE-sources.jar! /reactor/core/publisher/MonoCreate.java

    public void subscribe(CoreSubscriber<? super T> actual) {
        DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);

        actual.onSubscribe(emitter);

        try {
            callback.accept(emitter);
        }
        catch (Throwable ex) {
            emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

The actual here is the LambdaMonoSubscriber.
callback.accept(emitter) invokes the lambda that was passed to Mono.create in newHandler, handing it the sink and thereby triggering connection establishment and the sending of the request. A minimal sketch of this mechanism follows.
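
The deferred nature of Mono.create is the key point here; a minimal sketch with a plain String standing in for the NettyContext:

    import reactor.core.publisher.Mono;

    public class MonoCreateSketch {

        public static void main(String[] args) {
            Mono<String> deferred = Mono.create(sink -> {
                // corresponds to the connect/acquire logic inside newHandler
                System.out.println("callback.accept(sink) invoked");
                sink.success("connected"); // what fireContextActive does with the NettyContext
            });

            System.out.println("Mono created, nothing executed yet");
            deferred.subscribe(System.out::println); // subscribing runs the callback, then prints "connected"
        }
    }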

Summary

TcpClient.newHandler returns a Mono<NettyContext>; the lambda passed to Mono.create only executes when that Mono is subscribed to.

  • Inside the lambda, a ChannelPool is fetched from channelPools, or created if absent.
  • The handler lambda passed to newHandler is registered as the ChannelOperations.OnNew channelOpFactory and is executed once the connection is established, which is when data is sent.
  • Then the pool's acquire method is called (establishing a connection if none is available in the pool).
  • When the connection is finally released, the channel is returned to the ChannelPool for the corresponding address, as sketched below.
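
Put differently, the pooled path boils down to an acquire/release round trip on the ChannelPool; a rough sketch (pool is assumed to be one of the SimpleChannelPool instances built above):

    Future<Channel> acquired = pool.acquire();   // connects if the deque is empty
    acquired.addListener(f -> {
        if (f.isSuccess()) {
            Channel ch = (Channel) f.getNow();
            // ... use the channel: write the request, read the response ...
            pool.release(ch);                    // health check, then offerChannel back into the deque
        }
    });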