How does the TcpClient of reactor-netty submit a task to eventLoop?

  reactor

Order

This article mainly studies how the TcpClient of reactor-netty submits task to eventLoop.

Example

        TcpClient client = TcpClient.create("localhost", 8888);
        LOGGER.info("client:{}",client.getClass());
        Mono<? extends NettyContext> handler = client.newHandler((inbound,outbound) -> {
            return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                    .asString().next().log().then());
        });
        LOGGER.info("handler:{}",handler.getClass()); //NOTE reactor.core.publisher.MonoCreate
        handler.subscribe();

TcpClient.newHandler

reactor-netty-0.7.3.RELEASE-sources.jar! /reactor/ipc/netty/tcp/TcpClient.java

    protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            InetSocketAddress address,
            boolean secure,
            Consumer<? super Channel> onSetup) {

        final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>
                targetHandler =
                null == handler ? ChannelOperations.noopHandler() : handler;

        return Mono.create(sink -> {
            SocketAddress remote = address != null ? address : options.getAddress();

            ChannelPool pool = null;

            PoolResources poolResources = options.getPoolResources();
            if (poolResources != null) {
                pool = poolResources.selectOrCreate(remote, options,
                        doHandler(null, sink, secure, remote, null, null),
                        options.getLoopResources().onClient(options.preferNative()));
            }

            ContextHandler<SocketChannel> contextHandler =
                    doHandler(targetHandler, sink, secure, remote, pool, onSetup);
            sink.onCancel(contextHandler);

            if (pool == null) {
                Bootstrap b = options.get();
                b.remoteAddress(remote);
                b.handler(contextHandler);
                contextHandler.setFuture(b.connect());
            }
            else {
                contextHandler.setFuture(pool.acquire());
            }
        });
    }

Note pool.acquire () or b.connect () here

SimpleChannelPool.acquireHealthyFromPoolOrNew

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/pool/SimpleChannelPool.java

    /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

    /**
     * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
     * override this.
     * <p>
     * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
     */
    protected ChannelFuture connectChannel(Bootstrap bs) {
        return bs.connect();
    }

Pool.acquire () finally calls simplechannelpool.acquire healthfromolornew, and it finally calls connectChannel and Bootstrap.connect

Bootstrap.connect

netty-transport-4.1.20.Final-sources.jar! /io/netty/bootstrap/Bootstrap.java

    /**
     * Connect a {@link Channel} to the remote peer.
     */
    public ChannelFuture connect() {
        validate();
        SocketAddress remoteAddress = this.remoteAddress;
        if (remoteAddress == null) {
            throw new IllegalStateException("remoteAddress not set");
        }

        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

    /**
     * @see #connect()
     */
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }    

Note that initAndRegister is called here
Then call doResolveAndConnect0

initAndRegister

netty-transport-4.1.20.Final-sources.jar! /io/netty/bootstrap/AbstractBootstrap.java

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

New channel () is called here to create a channel and then initialized. socket exception (“too many open files”) may be thrown here. in case of exception, fail is directly set and DefaultChannelPromise is returned.
Note that config (). group (). register (channel) is called here. In reactor-netty, this group is MultithreadEventLoopGroup.java

io.netty.channel.ReflectiveChannelFactory.newChannel()

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/ReflectiveChannelFactory.java

    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

New here is NioSocketChannel.class
netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/socket/nio/NioSocketChannel.java

    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }
    /**
     * Create a new instance using the given {@link SelectorProvider}.
     */
    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

The provider here in mac operating system is sun.nio.ch.kqueueselectorprovider, and openSocketChannel calls the SelectorProviderImpl method.

jre/lib/rt.jar! /sun/nio/ch/SelectorProviderImpl.class

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }

jre/lib/rt.jar! /sun/nio/ch/SocketChannelImpl.class

    SocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.fd = Net.socket(true);
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
    }

Note that Net.socket(true) is called here to create FileDescriptor, which may throw socket exception (“too many openfiles”)

Bootstrap.init(channel)

netty-transport-4.1.20.Final-sources.jar! /io/netty/bootstrap/Bootstrap.java

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

Here is mainly to set some options and properties

MultithreadEventLoopGroup.register(channel)

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/MultithreadEventLoopGroup.java

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

Next here returns SingleThreadEventLoop.

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/SingleThreadEventLoop.java

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }    

Unsafe here is AbstractChannel$AbstractUnsafe.

AbstractChannel$AbstractUnsafe

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/AbstractChannel.java

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

You can see here that the eventLoop.execute is called, this eventLoop is NioEventLoop, and the parent class SingleThreadEventLoop.execute is called.
Note that the ChannelPromise is repackaged here, calling the register0 operation.

SingleThreadEventLoop.execute

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    /**
     * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
     * before.
     */
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }    

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }    

It can be seen here that execute calls addTask, while addTask calls offerTask and finally goes to taskQueue to offer the task.

register0

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/AbstractChannel.java

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

TaskQueue takes out the register0 operation, mainly fireChannelregistered and fireChannelActive. This method sets the Registered field to true

netty-transport-4.1.20.Final-sources.jar! /io/netty/channel/socket/nio/NioSocketChannel.java

    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

Whether active mainly determines whether open and connected

Bootstrap.doResolveAndConnect0

netty-transport-4.1.20.Final-sources.jar! /io/netty/bootstrap/Bootstrap.java

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

Here DoresolvandeConnect 0 will be triggered after initAndRegister executes successfully, and this is the real connect operation.

Main steps of Bootstrap.connect

How many connect can be executed and there are so many levels:

  • Creation and initialization of channel: the ChannelFuture returned by newChannel and init () in initAndRegister method of abstract bootstrap is not failed, possibly because socket exception (“too many open files”) cannot create FileDescriptor.
  • Submit the task of registering channel: register this register task with eventLoop. here, the taskQueue queue is required to be able to accommodate it. the default is Integer.MAX_VALUE. there is no problem. This task will be reject if it cannot accommodate it, throwing a RejectedExecutionException (Force-closing a channel whose registration task was not accepted by an event loop), the promise is set to failure, and if initAndRegister fails, the channel is directly close.

Register0(promise) in taskQueue changes its status to REGISTERED, triggers the following task to connect, then determines whether to open and connect, and if so, changes its status to ACTIVE

  • Register the operationComplete callback for this register ChannelFuture: call doresolveandannect 0, which performs channel connect.

The state change of channel is created-> registered-> connect-> active

21:53:50.934 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Created [id: 0x1ebe331c], now 1 active connections
21:53:50.941 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ContextHandler - After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
21:53:50.942 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c] REGISTERED
21:54:49.561 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c] CONNECT: localhost/127.0.0.1:8888
21:54:49.571 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] ACTIVE

Send and receive data and close channel

After becoming active, it will trigger Lambda expression in newHandler to send data to channel for writing.

22:13:12.174 [main] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Acquiring existing channel from pool: DefaultPromise@97e93f1(success: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]) SimpleChannelPool{activeConnections=1}
22:13:19.773 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Acquired active channel: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]
22:13:25.291 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperations - [Channel] [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] handler is being applied: com.example.demo.TcpTest$$Lambda$7/1541049864@41d1fa89
22:15:17.748 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Writing object FluxMapFuseable
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
22:15:21.742 [reactor-tcp-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
22:15:21.756 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@5c2a00d6
22:15:23.010 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] WRITE: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
+--------+-------------------------------------------------+----------------+
22:15:25.042 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] FLUSH
22:15:27.861 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.FluxReceive - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
22:15:27.864 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onSubscribe(MonoNext.NextSubscriber)
22:15:27.869 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - request(unbounded)
22:15:32.557 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] READ: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
+--------+-------------------------------------------------+----------------+
22:15:34.292 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onNext(Hello World!)
22:15:34.292 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onComplete()
22:15:34.293 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperations - [Channel] [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] User Handler requesting close connection
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] USER_EVENT: [Handler Terminated]
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@28add41a
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Releasing channel: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]
22:15:34.297 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] CLOSE
22:15:35.967 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Released [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888], now 0 active connections
22:15:35.968 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] READ COMPLETE
22:15:35.969 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] INACTIVE
22:15:35.969 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] UNREGISTERED

Note that the state/operation transition of channel here is active-> write-> flush-> read-> close-> released-> readcomplete-> inactive-> unregistered

Summary

  • It can be seen that TcpClient.newHandler triggers Lambda expression to trigger connection when subscribe, and the last call is Bootstrap.connect
  • However, Bootstrap.connect called the MultiThreadEventLoopgroup. register (Channel) method, which finally turned into DefaultChannelPromise to register through AbstractChannel$AbstractUnsafe.
  • AbstractChannel$AbstractUnsafe calls taskQueue.offer(task) to put this register0(promise) into taskQueue of eventLoop.

TaskQueue is LinkedBlockingQueue, and its size is specified by the parameter DEFAULT_MAX_PENDING_TASKS: Math.MAX (16, SystemPropertyUtil.GetInt (“IO.Netty.EventLoop.MaxPENDING TASKS”, Integer.MAX_VALUE)), which is equivalent to unbounded by default.

doc