The TcpClient creation procedure in reactor-netty

This article mainly studies the process of creating a TcpClient in reactor-netty.

maven

        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.7.3.RELEASE</version>
        </dependency>

TcpClient

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java

    protected TcpClient(TcpClient.Builder builder) {
        ClientOptions.Builder<?> clientOptionsBuilder = ClientOptions.builder();
        if (Objects.nonNull(builder.options)) {
            builder.options.accept(clientOptionsBuilder);
        }
        if (!clientOptionsBuilder.isLoopAvailable()) {
            clientOptionsBuilder.loopResources(TcpResources.get());
        }
        if (!clientOptionsBuilder.isPoolAvailable() && !clientOptionsBuilder.isPoolDisabled()) {
            clientOptionsBuilder.poolResources(TcpResources.get());
        }
        this.options = clientOptionsBuilder.build();
    }

Both loopResources and poolResources are actually created through TcpResources.
Once the loopResources call above has created the shared TcpResources, the poolResources call that follows simply returns the same instance.

clientOptionsBuilder.isLoopAvailable()

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/options/NettyOptions.java

        public final boolean isLoopAvailable() {
            return this.loopResources != null;
        }

loopResources is null at first, so TcpResources.get() is called to create it.

TcpResources.get()

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

    /**
     * Return the global HTTP resources for event loops and pooling
     *
     * @return the global HTTP resources for event loops and pooling
     */
    public static TcpResources get() {
        return getOrCreate(tcpResources, null, null, ON_TCP_NEW,  "tcp");
    }
    /**
     * Safely check if existing resource exist and proceed to update/cleanup if new
     * resources references are passed.
     *
     * @param ref the resources atomic reference
     * @param loops the eventual new {@link LoopResources}
     * @param pools the eventual new {@link PoolResources}
     * @param onNew a {@link TcpResources} factory
     * @param name a name for resources
     * @param <T> the reified type of {@link TcpResources}
     *
     * @return an existing or new {@link TcpResources}
     */
    protected static <T extends TcpResources> T getOrCreate(AtomicReference<T> ref,
            LoopResources loops,
            PoolResources pools,
            BiFunction<LoopResources, PoolResources, T> onNew,
            String name) {
        T update;
        for (; ; ) {
            T resources = ref.get();
            if (resources == null || loops != null || pools != null) {
                update = create(resources, loops, pools, name, onNew);
                if (ref.compareAndSet(resources, update)) {
                    if(resources != null){
                        if(loops != null){
                            resources.defaultLoops.dispose();
                        }
                        if(pools != null){
                            resources.defaultPools.dispose();
                        }
                    }
                    return update;
                }
                else {
                    update._dispose();
                }
            }
            else {
                return resources;
            }
        }
    }

It then enters create, which creates the loops and pools.

    static final AtomicReference<TcpResources>                          tcpResources;
    static final BiFunction<LoopResources, PoolResources, TcpResources> ON_TCP_NEW;

    static {
        ON_TCP_NEW = TcpResources::new;
        tcpResources  = new AtomicReference<>();
    }

    final PoolResources defaultPools;
    final LoopResources defaultLoops;

    protected TcpResources(LoopResources defaultLoops, PoolResources defaultPools) {
        this.defaultLoops = defaultLoops;
        this.defaultPools = defaultPools;
    }

    static <T extends TcpResources> T create(T previous,
            LoopResources loops,
            PoolResources pools,
            String name,
            BiFunction<LoopResources, PoolResources, T> onNew) {
        if (previous == null) {
            loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
            pools = pools == null ? PoolResources.elastic(name) : pools;
        }
        else {
            loops = loops == null ? previous.defaultLoops : loops;
            pools = pools == null ? previous.defaultPools : pools;
        }
        return onNew.apply(loops, pools);
    }

onNew here creates the TcpResources, using the constructor TcpResources(LoopResources defaultLoops, PoolResources defaultPools).
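The compare-and-set retry loop in getOrCreate can be sketched with plain JDK types. Everything below is illustrative (String stands in for the resource types, and the names are made up); the point is that a second call with no new resources returns the cached instance, while a lost CAS race means discarding the speculative instance and retrying:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

// Illustrative sketch of TcpResources.getOrCreate: lazily build a shared
// holder inside a CAS retry loop over an AtomicReference.
public class GetOrCreateSketch {
    static final AtomicReference<String> REF = new AtomicReference<>();

    static String getOrCreate(String loops, String pools,
                              BiFunction<String, String, String> onNew) {
        for (;;) {
            String existing = REF.get();
            // Reuse the existing holder unless new resources were passed in.
            if (existing != null && loops == null && pools == null) {
                return existing;
            }
            String update = onNew.apply(
                    loops == null ? "default-loops" : loops,
                    pools == null ? "default-pools" : pools);
            // Only one racing caller wins the CAS; a loser would dispose its
            // speculative instance (as _dispose() does above) and retry.
            if (REF.compareAndSet(existing, update)) {
                return update;
            }
        }
    }

    public static void main(String[] args) {
        String first = getOrCreate(null, null, (l, p) -> l + "/" + p);
        String second = getOrCreate(null, null, (l, p) -> l + "/" + p);
        if (first != second) throw new AssertionError("expected the cached instance");
        System.out.println(first);
    }
}
```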

LoopResources.create

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/LoopResources.java

    /**
     * Default worker thread count, fallback to available processor
     */
    int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
            "reactor.ipc.netty.workerCount",
            "" + Math.max(Runtime.getRuntime()
                        .availableProcessors(), 4)));
    /**
     * Default selector thread count, fallback to -1 (no selector thread)
     */
    int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
            "reactor.ipc.netty.selectCount",
            "" + -1));
    /**
     * Create a simple {@link LoopResources} to provide automatically for {@link
     * EventLoopGroup} and {@link Channel} factories
     *
     * @param prefix the event loop thread name prefix
     *
     * @return a new {@link LoopResources} to provide automatically for {@link
     * EventLoopGroup} and {@link Channel} factories
     */
    static LoopResources create(String prefix) {
        return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
                DEFAULT_IO_WORKER_COUNT,
                true);
    }

There are two parameters here: one is the worker thread count and the other is the selector thread count.

  • DEFAULT_IO_WORKER_COUNT

If the system property reactor.ipc.netty.workerCount is set, that value is used; otherwise it falls back to Math.max(Runtime.getRuntime().availableProcessors(), 4).

  • DEFAULT_IO_SELECT_COUNT

If the system property reactor.ipc.netty.selectCount is set, that value is used; otherwise it falls back to -1, which means there is no separate selector thread.
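Both defaults can be reproduced with the JDK alone. The property names are the real ones from reactor-netty 0.7.x; the class name below is made up for this sketch:

```java
// Runnable reproduction of the two LoopResources defaults.
public class LoopDefaults {

    // Worker count: the reactor.ipc.netty.workerCount system property, or
    // max(availableProcessors, 4) when the property is not set.
    static int workerCount() {
        return Integer.parseInt(System.getProperty(
                "reactor.ipc.netty.workerCount",
                "" + Math.max(Runtime.getRuntime().availableProcessors(), 4)));
    }

    // Selector count: the reactor.ipc.netty.selectCount system property, or
    // -1 (no separate selector thread) when the property is not set.
    static int selectCount() {
        return Integer.parseInt(System.getProperty(
                "reactor.ipc.netty.selectCount", "-1"));
    }

    public static void main(String[] args) {
        System.out.println("workers=" + workerCount() + ", selectors=" + selectCount());
    }
}
```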

DefaultLoopResources

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultLoopResources.java

    DefaultLoopResources(String prefix,
            int selectCount,
            int workerCount,
            boolean daemon) {
        this.running = new AtomicBoolean(true);
        this.daemon = daemon;
        this.workerCount = workerCount;
        this.prefix = prefix;

        this.serverLoops = new NioEventLoopGroup(workerCount,
                threadFactory(this, "nio"));

        this.clientLoops = LoopResources.colocate(serverLoops);

        this.cacheNativeClientLoops = new AtomicReference<>();
        this.cacheNativeServerLoops = new AtomicReference<>();

        if (selectCount == -1) {
            this.selectCount = workerCount;
            this.serverSelectLoops = this.serverLoops;
            this.cacheNativeSelectLoops = this.cacheNativeServerLoops;
        }
        else {
            this.selectCount = selectCount;
            this.serverSelectLoops =
                    new NioEventLoopGroup(selectCount, threadFactory(this, "select-nio"));
            this.cacheNativeSelectLoops = new AtomicReference<>();
        }
    }

Here prefix is reactor-tcp, selectCount is -1, workerCount is 4 (from Math.max(availableProcessors(), 4), on a machine with four or fewer processors), and daemon is true.
You can see that a NioEventLoopGroup is created here with workerCount threads; serverSelectLoops is not created separately because selectCount is -1.

NioEventLoopGroup

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/nio/NioEventLoopGroup.java

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

Note that the reject handler here is RejectedExecutionHandlers.reject().
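RejectedExecutionHandlers.reject() returns a handler that simply throws when invoked. A JDK-only sketch of that behavior, with a hypothetical Handler interface standing in for netty's own RejectedExecutionHandler:

```java
import java.util.concurrent.RejectedExecutionException;

// Sketch of netty's "reject" policy: a handler that throws instead of
// dropping or blocking. Handler is illustrative, not netty's interface.
public class RejectSketch {
    interface Handler { void rejected(Runnable task); }

    static final Handler REJECT = task -> {
        throw new RejectedExecutionException("task rejected: queue full");
    };

    public static void main(String[] args) {
        try {
            REJECT.rejected(() -> {});
        } catch (RejectedExecutionException e) {
            System.out.println("rejected as expected: " + e.getMessage());
        }
    }
}
```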

netty-common-4.1.20.Final-sources.jar!/io/netty/util/concurrent/MultithreadEventExecutorGroup.java

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

Constructing a new NioEventLoopGroup ends up calling the MultithreadEventExecutorGroup constructor.
The threadFactory here is reactor.ipc.netty.resources.DefaultLoopResources$EventLoopSelectorFactory.
The executor here is a ThreadPerTaskExecutor.

netty-common-4.1.20.Final-sources.jar!/io/netty/util/concurrent/ThreadPerTaskExecutor.java

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
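As a quick usage sketch of the thread-per-task idea, using the JDK's own Executor interface rather than netty's class: every execute() call starts a brand-new thread, so nothing is queued in the executor itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Mirrors ThreadPerTaskExecutor with JDK types: each task gets its own thread.
public class PerTaskDemo {
    static final Executor PER_TASK = command -> new Thread(command).start();

    // Runs two tasks and reports whether both completed in time.
    static boolean runTwo() {
        CountDownLatch done = new CountDownLatch(2);
        PER_TASK.execute(done::countDown);
        PER_TASK.execute(done::countDown);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("both tasks ran: " + runTwo());
    }
}
```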

MultithreadEventExecutorGroup

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

Note that a for loop is used here to call newChild for each of the nThreads children.

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/nio/NioEventLoopGroup.java

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

Every child is a NioEventLoop.

NioEventLoop

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/nio/NioEventLoop.java

    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

Note the DEFAULT_MAX_PENDING_TASKS parameter here, which specifies the size of the task queue.
If the system property io.netty.eventLoop.maxPendingTasks is set, the maximum of that value and 16 is used; if it is not set, the value is Integer.MAX_VALUE.
It is not specified here, so the default is Integer.MAX_VALUE.
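This computation can be checked with the JDK alone; Integer.getInteger plays the role of netty's SystemPropertyUtil.getInt, and the class name is made up for this sketch:

```java
// Reproduces DEFAULT_MAX_PENDING_TASKS: the configured value is floored at 16,
// and the fallback when the property is unset is Integer.MAX_VALUE.
public class MaxPendingTasksDemo {
    static int defaultMaxPendingTasks() {
        return Math.max(16, Integer.getInteger(
                "io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        System.setProperty("io.netty.eventLoop.maxPendingTasks", "4");
        System.out.println(defaultMaxPendingTasks()); // floored up to 16
        System.clearProperty("io.netty.eventLoop.maxPendingTasks");
        System.out.println(defaultMaxPendingTasks()); // Integer.MAX_VALUE
    }
}
```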

NioEventLoop extends SingleThreadEventLoop

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/SingleThreadEventLoop.java

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }

The parent here is the NioEventLoopGroup.
The executor here is the ThreadPerTaskExecutor.
The rejectedHandler here is RejectedExecutionHandlers.reject().

SingleThreadEventLoop extends SingleThreadEventExecutor

    /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

    /**
     * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
     * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
     * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
     * implementation that does not support blocking operations at all.
     */
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }    

maxPendingTasks here is Integer.MAX_VALUE, so the taskQueue is created with capacity Integer.MAX_VALUE.
addTaskWakesUp here is false.
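Why the queue capacity matters: once a LinkedBlockingQueue is full, offer() returns false, which is the point where netty hands the task to the reject handler. A sketch with a tiny capacity makes this visible (the real default here, Integer.MAX_VALUE, is effectively unbounded):

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Demonstrates the bounded-queue behavior behind the reject handler.
public class TaskQueueDemo {
    // Offers two tasks to a capacity-1 queue and returns both results.
    static boolean[] offerTwice() {
        Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(1);
        return new boolean[] {
                taskQueue.offer(() -> {}), // true: queued
                taskQueue.offer(() -> {})  // false: full, would be rejected
        };
    }

    public static void main(String[] args) {
        boolean[] r = offerTwice();
        System.out.println("first offer=" + r[0] + ", second offer=" + r[1]);
    }
}
```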

PoolResources.elastic(name)

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

    /**
     * Create an uncapped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>An elastic {@link PoolResources} will never wait before opening a new
     * connection. The reuse window is limited but it cannot starve an undetermined volume
     * of clients using it.
     *
     * @param name the channel pool map name
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources elastic(String name) {
        return new DefaultPoolResources(name, SimpleChannelPool::new);
    }

DefaultPoolResources

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

    final ConcurrentMap<SocketAddress, Pool> channelPools;
    final String                             name;
    final PoolFactory                        provider;

    DefaultPoolResources(String name, PoolFactory provider) {
        this.name = name;
        this.provider = provider;
        this.channelPools = PlatformDependent.newConcurrentHashMap();
    }

A channelPools map is created, keyed by SocketAddress with Pool values.
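The behavior of that map can be sketched with a plain ConcurrentHashMap: one pool per remote address, created on first use and reused afterwards. The Pool class below is a stand-in for reactor-netty's internal pool type, not the real one:

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative per-address pool map, in the spirit of DefaultPoolResources.
public class ChannelPoolsSketch {
    static final class Pool { }

    static final ConcurrentMap<SocketAddress, Pool> channelPools =
            new ConcurrentHashMap<>();

    // Returns the pool for a remote address, creating it on first use.
    static Pool poolFor(SocketAddress remote) {
        return channelPools.computeIfAbsent(remote, addr -> new Pool());
    }

    public static void main(String[] args) {
        SocketAddress a = InetSocketAddress.createUnresolved("example.com", 80);
        System.out.println("same pool reused: " + (poolFor(a) == poolFor(a)));
    }
}
```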

Summary

TcpClient’s create method mainly creates TcpResources, while TcpResources creates loopResources and poolResources.

loopResources

This loopResources is mainly used to create the NioEventLoopGroup and its workerCount NioEventLoop children (two parameters are involved here: the worker thread count and the selector thread count).

  • DEFAULT_IO_WORKER_COUNT: if the system property reactor.ipc.netty.workerCount is set, that value is used; otherwise it falls back to Math.max(Runtime.getRuntime().availableProcessors(), 4)
  • DEFAULT_IO_SELECT_COUNT: if the system property reactor.ipc.netty.selectCount is set, that value is used; otherwise it falls back to -1, which means there is no separate selector thread
  • DEFAULT_MAX_PENDING_TASKS: specifies the size of each NioEventLoop's taskQueue, computed as Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE))
  • NioEventLoop extends SingleThreadEventLoop, and SingleThreadEventLoop extends SingleThreadEventExecutor; the Executor it delegates to is ThreadPerTaskExecutor, the reject handler is RejectedExecutionHandlers.reject(), and the default taskQueue is a LinkedBlockingQueue with capacity Integer.MAX_VALUE

poolResources

This is mainly to create channelPools, whose type is ConcurrentMap&lt;SocketAddress, Pool&gt;.