On the two modes of reactor-netty PoolResources


Preface

This article looks at the two modes of reactor-netty PoolResources: elastic and fixed.

LoopResources and PoolResources

TcpResources is a utility class that can be used to create LoopResources and PoolResources.

loopResources

Its main job is to create a NioEventLoopGroup and the workerCount NioEventLoops under that group. Two parameters are involved: the worker thread count and the selector thread count.

  • DEFAULT_IO_WORKER_COUNT: if the system property reactor.ipc.netty.workerCount is set, that value is used; otherwise it is Math.max(Runtime.getRuntime().availableProcessors(), 4).
  • DEFAULT_IO_SELECT_COUNT: if the system property reactor.ipc.netty.selectCount is set, that value is used; otherwise it is -1, which means there is no dedicated selector thread.
  • DEFAULT_MAX_PENDING_TASKS: the size of the taskQueue of a NioEventLoop, Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)).
  • NioEventLoop extends SingleThreadEventLoop, which in turn extends SingleThreadEventExecutor; the Executor it delegates to is ThreadPerTaskExecutor. The rejectedExecutionHandler is RejectedExecutionHandlers.reject(), and the default taskQueue is a LinkedBlockingQueue whose capacity defaults to Integer.MAX_VALUE.
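The defaults listed above can be reproduced with plain JDK calls. The following is a minimal sketch rather than reactor-netty code: the class LoopDefaultsSketch and its method names are made up for illustration, and it assumes the values are read from JVM system properties with the fallbacks described above.

```java
// Illustrative only: LoopDefaultsSketch is a hypothetical class, not part of reactor-netty or Netty.
final class LoopDefaultsSketch {

    // Worker threads: the system property wins, otherwise max(availableProcessors, 4).
    static int workerCount() {
        return Integer.parseInt(System.getProperty(
                "reactor.ipc.netty.workerCount",
                "" + Math.max(Runtime.getRuntime().availableProcessors(), 4)));
    }

    // Selector threads: -1 means there is no dedicated selector thread.
    static int selectCount() {
        return Integer.parseInt(System.getProperty("reactor.ipc.netty.selectCount", "-1"));
    }

    // Task queue bound of an event loop: never below 16, Integer.MAX_VALUE when unset.
    static int maxPendingTasks() {
        return Math.max(16,
                Integer.getInteger("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        System.out.println("workerCount     = " + workerCount());
        System.out.println("selectCount     = " + selectCount());
        System.out.println("maxPendingTasks = " + maxPendingTasks());
    }
}
```

Running it with, for example, -Dreactor.ipc.netty.workerCount=2 shows the property taking precedence over the processor-based fallback.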

poolResources

Its main job is to create channelPools, whose type is ConcurrentMap<SocketAddress, Pool>. Here we mainly study its two modes, elastic and fixed.
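The idea of one pool per remote address can be sketched with a ConcurrentMap. This is only an illustration under assumptions: PoolRegistrySketch and selectOrCreate are hypothetical names, the type parameter P stands in for the Pool type, and computeIfAbsent is just one way to create an entry atomically on first use, not necessarily what reactor-netty does.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Illustrative only: a stand-in for a per-address pool map; P stands in for the Pool type.
final class PoolRegistrySketch<P> {
    private final ConcurrentMap<SocketAddress, P> channelPools = new ConcurrentHashMap<>();
    private final Function<SocketAddress, P> poolFactory;

    PoolRegistrySketch(Function<SocketAddress, P> poolFactory) {
        this.poolFactory = poolFactory;
    }

    // Returns the pool for the address, creating it atomically on first use.
    P selectOrCreate(SocketAddress remote) {
        return channelPools.computeIfAbsent(remote, poolFactory);
    }

    int size() {
        return channelPools.size();
    }

    public static void main(String[] args) {
        PoolRegistrySketch<String> registry = new PoolRegistrySketch<>(addr -> "pool for " + addr);
        SocketAddress http = InetSocketAddress.createUnresolved("example.org", 80);
        SocketAddress https = InetSocketAddress.createUnresolved("example.org", 443);
        System.out.println(registry.selectOrCreate(http));
        System.out.println(registry.selectOrCreate(https));
        System.out.println("pools: " + registry.size());
    }
}
```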

DefaultPoolResources

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

Its pool class implements the ChannelPool interface from netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/ChannelPool.java. The methods to focus on are:

        @Override
        public Future<Channel> acquire() {
            return acquire(defaultGroup.next().newPromise());
        }

        @Override
        public Future<Channel> acquire(Promise<Channel> promise) {
            return pool.acquire(promise).addListener(this);
        }

        @Override
        public Future<Void> release(Channel channel) {
            return pool.release(channel);
        }

        @Override
        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return pool.release(channel, promise);
        }

        @Override
        public void close() {
            if(compareAndSet(false, true)) {
                pool.close();
            }
        }

These methods essentially delegate to a concrete pool, whose implementations are mainly SimpleChannelPool and FixedChannelPool.
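The delegation shown above, together with the compareAndSet guard that makes close() take effect only once, can be modelled without Netty. In this sketch MiniPool, CountingPool and CloseOnceSketch are hypothetical types invented for illustration, and strings stand in for channels.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: none of these types exist in Netty or reactor-netty.
interface MiniPool {
    String acquire();
    void release(String channel);
    void close();
}

// A trivial delegate that counts calls so the behaviour can be observed.
final class CountingPool implements MiniPool {
    int acquired;
    int released;
    int closeCalls;

    @Override public String acquire() { return "channel-" + (++acquired); }
    @Override public void release(String channel) { released++; }
    @Override public void close() { closeCalls++; }
}

// Wrapper that forwards every call and closes the delegate at most once.
final class CloseOnceSketch implements MiniPool {
    private final MiniPool pool;
    private final AtomicBoolean closed = new AtomicBoolean();

    CloseOnceSketch(MiniPool pool) { this.pool = pool; }

    @Override public String acquire() { return pool.acquire(); }
    @Override public void release(String channel) { pool.release(channel); }

    @Override public void close() {
        // Only the first caller flips false -> true and closes the delegate.
        if (closed.compareAndSet(false, true)) {
            pool.close();
        }
    }

    public static void main(String[] args) {
        CountingPool delegate = new CountingPool();
        CloseOnceSketch wrapper = new CloseOnceSketch(delegate);
        wrapper.release(wrapper.acquire());
        wrapper.close();
        wrapper.close(); // second call is a no-op
        System.out.println("delegate closed " + delegate.closeCalls + " time(s)");
    }
}
```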

PoolResources.elastic(SimpleChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

    /**
     * Create an uncapped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>An elastic {@link PoolResources} will never wait before opening a new
     * connection. The reuse window is limited but it cannot starve an undetermined volume
     * of clients using it.
     *
     * @param name the channel pool map name
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources elastic(String name) {
        return new DefaultPoolResources(name, SimpleChannelPool::new);
    }

This is the method that TcpClient.create uses by default: it creates a DefaultPoolResources backed by SimpleChannelPool.

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

    static <T extends TcpResources> T create(T previous,
            LoopResources loops,
            PoolResources pools,
            String name,
            BiFunction<LoopResources, PoolResources, T> onNew) {
        if (previous == null) {
            loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
            pools = pools == null ? PoolResources.elastic(name) : pools;
        }
        else {
            loops = loops == null ? previous.defaultLoops : loops;
            pools = pools == null ? previous.defaultPools : pools;
        }
        return onNew.apply(loops, pools);
    }

SimpleChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

/**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
 *
 * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
 *
 */
public class SimpleChannelPool implements ChannelPool {

    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        checkNotNull(promise, "promise");
        return acquireHealthyFromPoolOrNew(promise);
    }

    /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

    @Override
    public final Future<Void> release(Channel channel) {
        return release(channel, channel.eventLoop().<Void>newPromise());
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    @Override
    public void close() {
        for (;;) {
            Channel channel = pollChannel();
            if (channel == null) {
                break;
            }
            channel.close();
        }
    }

    //......
}    

This pool implementation creates a new connection whenever none is available in the pool, with no upper limit. When a connection is taken from the pool (which maintains its Channels in a LIFO Deque), its health is checked before it is handed out.
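The behaviour described above (LIFO reuse, a health check on pooled connections, and an uncapped fallback to a new connection) can be modelled synchronously with a Deque. This is a simplified sketch, not Netty's implementation: ElasticPoolSketch, connector and healthy are hypothetical names, and dropping an unhealthy connection and then trying the next one is an assumption of this sketch.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustrative only: a simplified, synchronous model of an unbounded LIFO pool.
final class ElasticPoolSketch<C> {
    private final Deque<C> idle = new ConcurrentLinkedDeque<>();
    private final Supplier<C> connector;  // opens a new connection
    private final Predicate<C> healthy;   // health check for pooled connections

    ElasticPoolSketch(Supplier<C> connector, Predicate<C> healthy) {
        this.connector = connector;
        this.healthy = healthy;
    }

    C acquire() {
        for (;;) {
            C pooled = idle.pollLast();       // most recently released first (LIFO)
            if (pooled == null) {
                return connector.get();       // pool empty: open a new one, no cap
            }
            if (healthy.test(pooled)) {
                return pooled;
            }
            // Unhealthy: drop it and look at the next pooled connection.
        }
    }

    void release(C connection) {
        idle.offerLast(connection);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        AtomicInteger ids = new AtomicInteger();
        ElasticPoolSketch<Integer> pool =
                new ElasticPoolSketch<>(ids::incrementAndGet, c -> true);
        Integer first = pool.acquire();
        Integer second = pool.acquire();
        pool.release(first);
        pool.release(second);
        System.out.println("reused: " + pool.acquire() + ", idle left: " + pool.idleCount());
    }
}
```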

PoolResources.fixed(FixedChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

    /**
     * Default max connection, if -1 will never wait to acquire before opening new
     * connection in an unbounded fashion. Fallback to
     * available number of processors.
     */
    int DEFAULT_POOL_MAX_CONNECTION =
            Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections",
            "" + Math.max(Runtime.getRuntime()
                        .availableProcessors(), 8) * 2));

    /**
     * Default acquisition timeout before error. If -1 will never wait to
     * acquire before opening new
     * connection in an unbounded fashion. Fallback to
     * available number of processors.
     */
    long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
            "reactor.ipc.netty.pool.acquireTimeout",
            "" + 45000));

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max number of
     * processors observed by this jvm (minimum 4).
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name) {
        return fixed(name, DEFAULT_POOL_MAX_CONNECTION);
    }

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     * @param maxConnections the maximum number of connections before starting pending
     * acquisition on existing ones
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name, int maxConnections) {
        return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT);
    }

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     * @param maxConnections the maximum number of connections before starting pending
     * @param acquireTimeout the maximum time in millis to wait for aquiring
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name, int maxConnections, long acquireTimeout) {
        if (maxConnections == -1) {
            return elastic(name);
        }
        if (maxConnections <= 0) {
            throw new IllegalArgumentException("Max Connections value must be strictly " + "positive");
        }
        if (acquireTimeout != -1L && acquireTimeout < 0) {
            throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive");
        }
        return new DefaultPoolResources(name,
                (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap,
                        handler,
                        checker,
                        FixedChannelPool.AcquireTimeoutAction.FAIL,
                        acquireTimeout,
                        maxConnections,
                        Integer.MAX_VALUE
                        ));
    }

The fixed overload that is ultimately called takes three parameters: name, maxConnections and acquireTimeout. As you can see, what it creates is a FixedChannelPool.

FixedChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/FixedChannelPool.java

/**
 * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
 * number of concurrent connections.
 */
public class FixedChannelPool extends SimpleChannelPool {

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        acquire0(promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
        return promise;
    }

    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
            return;
        }
        if (acquiredChannelCount < maxConnections) {
            assert acquiredChannelCount >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            super.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                promise.setFailure(FULL_EXCEPTION);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    promise.setFailure(FULL_EXCEPTION);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        final Promise<Void> p = executor.newPromise();
        super.release(channel, p.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                assert executor.inEventLoop();

                if (closed) {
                    // Since the pool is closed, we have no choice but to close the channel
                    channel.close();
                    promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
                    return;
                }

                if (future.isSuccess()) {
                    decrementAndRunTaskQueue();
                    promise.setSuccess(null);
                } else {
                    Throwable cause = future.cause();
                    // Check if the exception was not because of we passed the Channel to the wrong pool.
                    if (!(cause instanceof IllegalArgumentException)) {
                        decrementAndRunTaskQueue();
                    }
                    promise.setFailure(future.cause());
                }
            }
        }));
        return promise;
    }

    @Override
    public void close() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                if (!closed) {
                    closed = true;
                    for (;;) {
                        AcquireTask task = pendingAcquireQueue.poll();
                        if (task == null) {
                            break;
                        }
                        ScheduledFuture<?> f = task.timeoutFuture;
                        if (f != null) {
                            f.cancel(false);
                        }
                        task.promise.setFailure(new ClosedChannelException());
                    }
                    acquiredChannelCount = 0;
                    pendingAcquireCount = 0;
                    FixedChannelPool.super.close();
                }
            }
        });
    }
    //......
}

In acquire, if the current thread is not the pool's event loop thread, acquire0 is submitted to the event loop's task queue and executed later. In principle this could overflow the taskQueue of the eventLoop, but the queue size is Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)), which defaults to Integer.MAX_VALUE.
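The "run inline if already on the loop thread, otherwise enqueue" pattern can be illustrated with a single-thread executor standing in for the event loop. This is a sketch under assumptions: LoopHopSketch and runOnLoop are hypothetical names, and the JDK executor used here (which also has an unbounded task queue) only approximates a Netty EventLoop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: a one-thread executor standing in for a Netty EventLoop.
final class LoopHopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise enqueue on the loop's task queue.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LoopHopSketch loop = new LoopHopSketch();
        CountDownLatch done = new CountDownLatch(1);
        loop.runOnLoop(() -> {
            System.out.println("outer task on " + Thread.currentThread().getName());
            // Already on the loop thread, so this nested task runs immediately.
            loop.runOnLoop(() -> System.out.println("inner task runs inline"));
            done.countDown();
        });
        done.await();
    }
}
```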

FixedChannelPool extends SimpleChannelPool and overrides the acquire, release and close methods. It limits how many connections can be acquired and has the following parameters:

  • maxConnections

This value is first taken from the system property reactor.ipc.netty.pool.maxConnections (if it is set to -1, there is no limit and the pool falls back to elastic mode). If the property is not set, the default is Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2, i.e. twice the larger of the processor count and 8.

  • acquireTimeout

This value is first taken from the system property reactor.ipc.netty.pool.acquireTimeout (per the Javadoc, -1 means never wait to acquire before opening a new connection). If the property is not set, the default is 45,000 milliseconds.

  • maxPendingAcquires

Here it is hard-coded to Integer.MAX_VALUE.

  • AcquireTimeoutAction

This is set to FixedChannelPool.AcquireTimeoutAction.FAIL, i.e. the timeoutTask is:

                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(TIMEOUT_EXCEPTION);
                    }
                };

If the number of acquired connections has reached maxConnections, the acquire is placed in the pending-acquire queue to wait for a connection. Before queuing, the pool checks whether the number of pending acquires has reached maxPendingAcquires and, if so, fails with FULL_EXCEPTION (Too many outstanding acquire operations). Since maxPendingAcquires is Integer.MAX_VALUE here, this exception will not occur in practice. Once queued, the acquireTimeout parameter applies: if no connection becomes available within acquireTimeout, the acquire fails with TIMEOUT_EXCEPTION (Acquire operation took longer then configured maximum time).
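The interplay of maxConnections, maxPendingAcquires and acquireTimeout described above can be modelled with JDK classes only. The following is a simplified sketch, not Netty's FixedChannelPool: BoundedPoolSketch is a hypothetical class, integers stand in for channels, and handing a released connection directly to the oldest waiter is a simplification chosen for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: a simplified model of a capped pool, not Netty's FixedChannelPool.
final class BoundedPoolSketch {
    private final int maxConnections;
    private final int maxPendingAcquires;
    private final long acquireTimeoutMillis;
    private final Queue<CompletableFuture<Integer>> pending = new ArrayDeque<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sketch-timer");
                t.setDaemon(true);
                return t;
            });
    private int acquired; // connections currently handed out
    private int nextId;   // integers stand in for channels

    BoundedPoolSketch(int maxConnections, int maxPendingAcquires, long acquireTimeoutMillis) {
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
        this.acquireTimeoutMillis = acquireTimeoutMillis;
    }

    synchronized CompletableFuture<Integer> acquire() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        if (acquired < maxConnections) {
            acquired++;
            result.complete(nextId++);              // below the cap: serve immediately
        } else if (pending.size() >= maxPendingAcquires) {
            result.completeExceptionally(           // pending queue is full
                    new IllegalStateException("Too many outstanding acquire operations"));
        } else {
            pending.add(result);                    // wait, but only up to the timeout
            timer.schedule(() -> onTimeout(result), acquireTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        return result;
    }

    private synchronized void onTimeout(CompletableFuture<Integer> waiter) {
        // Fail the waiter only if it is still queued (the FAIL action).
        if (pending.remove(waiter)) {
            waiter.completeExceptionally(new TimeoutException(
                    "Acquire operation took longer then configured maximum time"));
        }
    }

    synchronized void release(int channel) {
        CompletableFuture<Integer> next = pending.poll();
        if (next != null) {
            next.complete(channel);                 // hand over to the oldest waiter
        } else {
            acquired--;
        }
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(1, 1, 200);
        CompletableFuture<Integer> first = pool.acquire();   // served immediately
        CompletableFuture<Integer> second = pool.acquire();  // queued
        CompletableFuture<Integer> third = pool.acquire();   // rejected, queue is full
        System.out.println("third rejected: " + third.isCompletedExceptionally());
        pool.release(first.join());
        System.out.println("second got channel " + second.join());
    }
}
```

With maxPendingAcquires set to Integer.MAX_VALUE, as in PoolResources.fixed, the "queue is full" branch is effectively unreachable and only the timeout bounds how long a caller waits.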

Summary

The PoolResources that TcpClient creates by default uses elastic mode, i.e. the connection pool is a SimpleChannelPool, which maintains its Channels in a LIFO Deque. If no connection can be obtained from the pool, a new one is created. The effective upper limit is therefore the number of open files allowed by the operating system; once that is exceeded, a SocketException: Too many open files is raised. PoolResources also offers a fixed mode backed by FixedChannelPool, which caps the number of connections in the pool and the maximum time to wait for one, so that unbounded connection creation cannot exhaust memory or trigger SocketException: Too many open files.

Note that in fixed mode, if reactor.ipc.netty.pool.maxConnections is set to -1, the pool falls back to elastic mode.

doc