Order
This paper mainly studies two modes of reactor-netty PoolResources, elastic and fixed.
LoopResources and PoolResources
TcpResources is a tool class that can be used to create loopResources and poolResources.
loopResources
The main thing is to create a NioEventLoop and the workerCount NiovementLoops under the group (two parameters are involved here, one is worker thread count and the other is selector thread count)
- DEFAULT_IO_WORKER_COUNT: this value is used if the environment variable has reactor.ipc.netty.workerCount set; If there is no setting, take Math.max (runtime.getruntime (). availableprocessors (), 4)))
- DEFAULT_IO_SELECT_COUNT: this value is used if the environment variable has reactor.ipc.netty.selectCount set; If there is no setting, take -1, which means there is no selector thread.
- DEFAULT_MAX_PENDING_TASKS: specifies the size of taskQueue for NioEventLoop, math.max (16, systempropertytil.getint (“io.nety.eventloop.maxpendingtasks”, integer.max _ value))
- NioEventLoop inherits SingleThreadEventLoop, while SingleThreadEventLoop inherits SingleThreadEventexecutor, and its agent’s Executor is ThreadPerTaskExecutor. RejectHandler is rejectedexecutionhandlers.reject (), the default taskQueue is LinkedBlockingQueue, which is Integer.MAX_VALUE
poolResources
The main purpose is to create channelPools, the type of which is concurrent map < socket address, pool >. here, we will mainly study its two modes, elastic and fixed.
DefaultPoolResources
reactor-netty-0.7.5.RELEASE-sources.jar! /reactor/ipc/netty/resources/DefaultPoolResources.java
It implements netty-transport-4.1.22.final-sources.jar! The interface of/io/netty/channel/pool/channelpool.java focuses on the following methods:
@Override
public Future<Channel> acquire() {
return acquire(defaultGroup.next().newPromise());
}
@Override
public Future<Channel> acquire(Promise<Channel> promise) {
return pool.acquire(promise).addListener(this);
}
@Override
public Future<Void> release(Channel channel) {
return pool.release(channel);
}
@Override
public Future<Void> release(Channel channel, Promise<Void> promise) {
return pool.release(channel, promise);
}
@Override
public void close() {
if(compareAndSet(false, true)) {
pool.close();
}
}
Several interfaces here are basically commissioned to operate as specific pool, and their implementations are mainly SimpleChannelPool and FixedChannelPool.
PoolResources.elastic(SimpleChannelPool
)
reactor-netty-0.7.5.RELEASE-sources.jar! /reactor/ipc/netty/resources/PoolResources.java
/**
* Create an uncapped {@link PoolResources} to provide automatically for {@link
* ChannelPool}.
* <p>An elastic {@link PoolResources} will never wait before opening a new
* connection. The reuse window is limited but it cannot starve an undetermined volume
* of clients using it.
*
* @param name the channel pool map name
*
* @return a new {@link PoolResources} to provide automatically for {@link
* ChannelPool}
*/
static PoolResources elastic(String name) {
return new DefaultPoolResources(name, SimpleChannelPool::new);
}
This is the method used by default in the process of TcpClient.create. SimpleChannelPool is used by default and DefaultPoolResources is created.
reactor-netty-0.7.5.RELEASE-sources.jar! /reactor/ipc/netty/tcp/TcpResources.java
static <T extends TcpResources> T create(T previous,
LoopResources loops,
PoolResources pools,
String name,
BiFunction<LoopResources, PoolResources, T> onNew) {
if (previous == null) {
loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
pools = pools == null ? PoolResources.elastic(name) : pools;
}
else {
loops = loops == null ? previous.defaultLoops : loops;
pools = pools == null ? previous.defaultPools : pools;
}
return onNew.apply(loops, pools);
}
SimpleChannelPool
netty-transport-4.1.22.Final-sources.jar! /io/netty/channel/pool/SimpleChannelPool.java
/**
* Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
* a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
*
* This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
*
*/
public class SimpleChannelPool implements ChannelPool {
@Override
public final Future<Channel> acquire() {
return acquire(bootstrap.config().group().next().<Channel>newPromise());
}
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
checkNotNull(promise, "promise");
return acquireHealthyFromPoolOrNew(promise);
}
/**
* Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
@Override
public final Future<Void> release(Channel channel) {
return release(channel, channel.eventLoop().<Void>newPromise());
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
closeAndFail(channel, cause, promise);
}
return promise;
}
@Override
public void close() {
for (;;) {
Channel channel = pollChannel();
if (channel == null) {
break;
}
channel.close();
}
}
//......
}
This connection pool implementation creates a (
No restrictions
), remove the connection (Connection pooling uses a Deque of LIFO to maintain the Channel
) will check the validity of the connection.
PoolResources.fixed(FixedChannelPool
)
reactor-netty-0.7.5.RELEASE-sources.jar! /reactor/ipc/netty/resources/PoolResources.java
/**
* Default max connection, if -1 will never wait to acquire before opening new
* connection in an unbounded fashion. Fallback to
* available number of processors.
*/
int DEFAULT_POOL_MAX_CONNECTION =
Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections",
"" + Math.max(Runtime.getRuntime()
.availableProcessors(), 8) * 2));
/**
* Default acquisition timeout before error. If -1 will never wait to
* acquire before opening new
* connection in an unbounded fashion. Fallback to
* available number of processors.
*/
long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
"reactor.ipc.netty.pool.acquireTimeout",
"" + 45000));
/**
* Create a capped {@link PoolResources} to provide automatically for {@link
* ChannelPool}.
* <p>A Fixed {@link PoolResources} will open up to the given max number of
* processors observed by this jvm (minimum 4).
* Further connections will be pending acquisition indefinitely.
*
* @param name the channel pool map name
*
* @return a new {@link PoolResources} to provide automatically for {@link
* ChannelPool}
*/
static PoolResources fixed(String name) {
return fixed(name, DEFAULT_POOL_MAX_CONNECTION);
}
/**
* Create a capped {@link PoolResources} to provide automatically for {@link
* ChannelPool}.
* <p>A Fixed {@link PoolResources} will open up to the given max connection value.
* Further connections will be pending acquisition indefinitely.
*
* @param name the channel pool map name
* @param maxConnections the maximum number of connections before starting pending
* acquisition on existing ones
*
* @return a new {@link PoolResources} to provide automatically for {@link
* ChannelPool}
*/
static PoolResources fixed(String name, int maxConnections) {
return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT);
}
/**
* Create a capped {@link PoolResources} to provide automatically for {@link
* ChannelPool}.
* <p>A Fixed {@link PoolResources} will open up to the given max connection value.
* Further connections will be pending acquisition indefinitely.
*
* @param name the channel pool map name
* @param maxConnections the maximum number of connections before starting pending
* @param acquireTimeout the maximum time in millis to wait for aquiring
*
* @return a new {@link PoolResources} to provide automatically for {@link
* ChannelPool}
*/
static PoolResources fixed(String name, int maxConnections, long acquireTimeout) {
if (maxConnections == -1) {
return elastic(name);
}
if (maxConnections <= 0) {
throw new IllegalArgumentException("Max Connections value must be strictly " + "positive");
}
if (acquireTimeout != -1L && acquireTimeout < 0) {
throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive");
}
return new DefaultPoolResources(name,
(bootstrap, handler, checker) -> new FixedChannelPool(bootstrap,
handler,
checker,
FixedChannelPool.AcquireTimeoutAction.FAIL,
acquireTimeout,
maxConnections,
Integer.MAX_VALUE
));
}
The last called fixed method has three parameters, one is name, one is maxConnections, and one is acquireTimeout. As you can see, what was created here is FixedChannelPool.
FixedChannelPool
netty-transport-4.1.22.Final-sources.jar! /io/netty/channel/pool/FixedChannelPool.java
/**
* {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
* number of concurrent connections.
*/
public class FixedChannelPool extends SimpleChannelPool {
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
try {
if (executor.inEventLoop()) {
acquire0(promise);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
acquire0(promise);
}
});
}
} catch (Throwable cause) {
promise.setFailure(cause);
}
return promise;
}
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return;
}
if (acquiredChannelCount < maxConnections) {
assert acquiredChannelCount >= 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop
Promise<Channel> p = executor.newPromise();
AcquireListener l = new AcquireListener(promise);
l.acquired();
p.addListener(l);
super.acquire(p);
} else {
if (pendingAcquireCount >= maxPendingAcquires) {
promise.setFailure(FULL_EXCEPTION);
} else {
AcquireTask task = new AcquireTask(promise);
if (pendingAcquireQueue.offer(task)) {
++pendingAcquireCount;
if (timeoutTask != null) {
task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
}
} else {
promise.setFailure(FULL_EXCEPTION);
}
}
assert pendingAcquireCount > 0;
}
}
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
ObjectUtil.checkNotNull(promise, "promise");
final Promise<Void> p = executor.newPromise();
super.release(channel, p.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop();
if (closed) {
// Since the pool is closed, we have no choice but to close the channel
channel.close();
promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
return;
}
if (future.isSuccess()) {
decrementAndRunTaskQueue();
promise.setSuccess(null);
} else {
Throwable cause = future.cause();
// Check if the exception was not because of we passed the Channel to the wrong pool.
if (!(cause instanceof IllegalArgumentException)) {
decrementAndRunTaskQueue();
}
promise.setFailure(future.cause());
}
}
}));
return promise;
}
@Override
public void close() {
executor.execute(new Runnable() {
@Override
public void run() {
if (!closed) {
closed = true;
for (;;) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
}
acquiredChannelCount = 0;
pendingAcquireCount = 0;
FixedChannelPool.super.close();
}
}
});
}
//......
}
Acquire here, if the current thread is not in the eventLoop, it is put into the queue to wait for the execution of acquire0. it may explode taskQueue of eventLoop, but the value of queue size is Math.max(16, Getint (“io.netty.eventloop.maxpendingtasks”, Integer.MAX_VALUE “), default is integer.max _ value.
FixedChannelPool inherits SimpleChannelPool and rewrites acquire, release, close methods. It restricts access to connections and has the following parameters:
- maxConnections
This value is first taken from the system variable reactor.ipc.netty.pool.maxconnections (If it is set to -1, it means there is no limit and return to elastic mode.
), if it is not set, take Math.max (runtime.getruntime (). available processors (), 8) * 2, i.e. twice the maximum value of the kernel and 8.
- acquireTimeout
This value is first taken from the system variable reactor.ipc.netty.pool.acquiretimeout (If set to -1, it means immediate execution without waiting.
), 45,000 milliseconds if not set.
- maxPendingAcquires
The setting here is Integer.MAX_VALUE
- AcquireTimeoutAction
This is set to fixedchannelbook.acquiretimeoutaction.fail, i.e. timeoutTask is
timeoutTask = new TimeoutTask() {
@Override
public void onTimeout(AcquireTask task) {
// Fail the promise as we timed out.
task.promise.setFailure(TIMEOUT_EXCEPTION);
}
};
If the current connection exceeds maxConnections, enter pendingacquire to wait for the connection, and before entering pendingacquire, return FULL_EXCEPTION (
Too many outstanding acquire operations
), the setting here is Integer.MAX_VALUE, so there will be no exception. After entering PendingAcquire, there is another acquireTimeout parameter, that is, enter PendingAcquire to wait for acquireTimeout time, and return TIMEOUT_EXCEPTION (Acquire operation took longer then configured maximum time
)。
Summary
The PoolResources created by the default TcpClient uses the elastic mode, that is, the connection pool is implemented as SimpleChannelPool. By default, a LIFO Deque is used to maintain the Channel. If a connection cannot be obtained from the connection pool, a new connection will be created. The upper limit should be the number of file resources set by the system that can be opened. If it exceeds the limit, a SocketException: TOOMANY OPEN FILES will be reported. PoolResources also provides a FixedChannelPool implementation, which uses a fixed mode, i.e. limits the maximum number of connection pool connections and the maximum waiting timeout to avoid excessive connection creation, bursting memory or reporting socket exception: too many openfiles exception.
Note that for fixed mode, if reactor.ipc.netty.pool.maxconnections is set to -1, then fallback to elastic mode.
doc
- Usage Posture of Netty Connection Pool
- Netty connection pool ChannelPool,FixedChannelPool application
- Teach you to use Netty to set up connection pool correctly.
- Create procedure for TcpClient in reactor-netty
- NewHandler procedure for TcpClient in reactor-netty
- Too many files open
- The solution of linux server reporting Too many open files