[case39] A look at the executor of the JDK HttpClient

Preface

This article takes a look at the executor used by the JDK HttpClient.

HttpClientImpl

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
        id = CLIENT_IDS.incrementAndGet();
        dbgTag = "HttpClientImpl(" + id +")";
        if (builder.sslContext == null) {
            try {
                sslContext = SSLContext.getDefault();
            } catch (NoSuchAlgorithmException ex) {
                throw new InternalError(ex);
            }
        } else {
            sslContext = builder.sslContext;
        }
        Executor ex = builder.executor;
        if (ex == null) {
            ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
            isDefaultExecutor = true;
        } else {
            isDefaultExecutor = false;
        }
        delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
        facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
        client2 = new Http2ClientImpl(this);
        cookieHandler = builder.cookieHandler;
        connectTimeout = builder.connectTimeout;
        followRedirects = builder.followRedirects == null ?
                Redirect.NEVER : builder.followRedirects;
        this.userProxySelector = Optional.ofNullable(builder.proxy);
        this.proxySelector = userProxySelector
                .orElseGet(HttpClientImpl::getDefaultProxySelector);
        if (debug.on())
            debug.log("proxySelector is %s (user-supplied=%s)",
                      this.proxySelector, userProxySelector.isPresent());
        authenticator = builder.authenticator;
        if (builder.version == null) {
            version = HttpClient.Version.HTTP_2;
        } else {
            version = builder.version;
        }
        if (builder.sslParams == null) {
            sslParams = getDefaultParams(sslContext);
        } else {
            sslParams = builder.sslParams;
        }
        connections = new ConnectionPool(id);
        connections.start();
        timeouts = new TreeSet<>();
        try {
            selmgr = new SelectorManager(this);
        } catch (IOException e) {
            // unlikely
            throw new InternalError(e);
        }
        selmgr.setDaemon(true);
        filters = new FilterFactory();
        initFilters();
        assert facadeRef.get() != null;
    }
  • If the executor of HttpClientBuilderImpl is null, the constructor falls back to Executors.newCachedThreadPool(new DefaultThreadFactory(id)) and sets isDefaultExecutor to true.
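
The fallback can be avoided by handing an executor to the builder. The following is a minimal sketch, not part of the JDK source: the class name BoundedClientFactory, the pool size of 4 and the queue size of 100 are illustrative assumptions. HttpClient.newBuilder().executor(...) is the public API that populates builder.executor.

```java
import java.net.http.HttpClient;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used here for illustration only.
public class BoundedClientFactory {

    // Illustrative sizes, not recommendations; tune them for the workload.
    static ThreadPoolExecutor boundedExecutor() {
        return new ThreadPoolExecutor(
                4, 4,                            // fixed number of threads
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));  // bounded work queue
    }

    static HttpClient newClient(ThreadPoolExecutor executor) {
        // builder.executor is non-null here, so the constructor does not
        // create a cached thread pool of its own.
        return HttpClient.newBuilder()
                .executor(executor)
                .build();
    }
}
```

A client built this way reports its executor through client.executor(), while a client built without one returns an empty Optional.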

HttpClientImpl.sendAsync

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
    {
        return sendAsync(userRequest, responseHandler, null);
    }

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler) {
        return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
    }

    private <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              Executor exchangeExecutor)    {

        Objects.requireNonNull(userRequest);
        Objects.requireNonNull(responseHandler);

        AccessControlContext acc = null;
        if (System.getSecurityManager() != null)
            acc = AccessController.getContext();

        // Clone the, possibly untrusted, HttpRequest
        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
        if (requestImpl.method().equals("CONNECT"))
            throw new IllegalArgumentException("Unsupported method CONNECT");

        long start = DEBUGELAPSED ? System.nanoTime() : 0;
        reference();
        try {
            if (debugelapsed.on())
                debugelapsed.log("ClientImpl (async) send %s", userRequest);

            // When using sendAsync(...) we explicitly pass the
            // executor's delegate as exchange executor to force
            // asynchronous scheduling of the exchange.
            // When using send(...) we don't specify any executor
            // and default to using the client's delegating executor
            // which only spawns asynchronous tasks if it detects
            // that the current thread is the selector manager
            // thread. This will cause everything to execute inline
            // until we need to schedule some event with the selector.
            Executor executor = exchangeExecutor == null
                    ? this.delegatingExecutor : exchangeExecutor;

            MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                            requestImpl,
                                                            this,
                                                            responseHandler,
                                                            pushPromiseHandler,
                                                            acc);
            CompletableFuture<HttpResponse<T>> res =
                    mex.responseAsync(executor).whenComplete((b,t) -> unreference());
            if (DEBUGELAPSED) {
                res = res.whenComplete(
                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
            }

            // makes sure that any dependent actions happen in the CF default
            // executor. This is only needed for sendAsync(...), when
            // exchangeExecutor is non-null.
            if (exchangeExecutor != null) {
                res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
            }
            return res;
        } catch(Throwable t) {
            unreference();
            debugCompleted("ClientImpl (async)", start, userRequest);
            throw t;
        }
    }
  • The public sendAsync overloads pass delegatingExecutor.delegate as the exchangeExecutor argument. The synchronous send method passes null, in which case the delegatingExecutor itself is used.
  • A MultiExchange is then created and mex.responseAsync(executor).whenComplete((b,t) -> unreference()) is called, which is where the executor is used.
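
The comment in the listing describes how the delegating executor behaves: it hands a task to the real executor only when it is called on the selector manager thread, and otherwise runs the task inline. A simplified sketch of that idea is shown below. The class name InlineOrAsyncExecutor and its members are assumptions made for illustration; this is not the JDK's DelegatingExecutor source.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

// Simplified illustration of the behaviour described in the source comment.
public class InlineOrAsyncExecutor implements Executor {

    private final BooleanSupplier onSelectorThread; // hypothetical thread check
    private final Executor delegate;                // the real executor

    public InlineOrAsyncExecutor(BooleanSupplier onSelectorThread, Executor delegate) {
        this.onSelectorThread = onSelectorThread;
        this.delegate = delegate;
    }

    // Exposes the real executor, which always schedules asynchronously.
    public Executor delegate() {
        return delegate;
    }

    @Override
    public void execute(Runnable command) {
        if (onSelectorThread.getAsBoolean()) {
            delegate.execute(command); // keep the selector thread free
        } else {
            command.run();             // run inline on the calling thread
        }
    }
}
```

Passing delegate() instead of the wrapper, as sendAsync does, skips the inline branch, so the exchange is always scheduled on the underlying pool.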

MultiExchange.responseAsync

java.net.http/jdk/internal/net/http/MultiExchange.java

    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange<T> exch = getExchange();
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                this.response =
                                    new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                return this.response;
                            });
                    });
    }
  • responseAsync calls the completeAsync method of CompletableFuture (added in Java 9) and passes the executor to it, so the exchange is started on a thread of that executor.
  • Because the default executor is created with Executors.newCachedThreadPool, the number of concurrent tasks and their execution time need to be kept under control; otherwise the number of threads can grow without bound and consume excessive system resources.
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     *
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
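
The growth can be observed directly. The sketch below builds a pool with the same parameters as the factory method above and submits tasks that block on a latch. The class name CachedPoolGrowth and the method poolSizeAfter are illustrative assumptions. Because the SynchronousQueue holds no tasks and every existing thread is busy, each submission starts a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used here for illustration only.
public class CachedPoolGrowth {

    // Submits `tasks` blocking tasks and returns the resulting pool size.
    static int poolSizeAfter(int tasks) {
        // Same parameters as Executors.newCachedThreadPool.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a slow request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }
}
```

Calling poolSizeAfter(50) returns 50: one thread per task that is still running.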

RejectedExecutionException

  • Example code
    @Test
    public void testAsyncPool(){
        ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool()
                .setPoolSize(2)
                .setQueueSize(5)
                .setThreadNamePrefix("test-")
                .build();

        List<CompletableFuture<String>> futureList = IntStream.rangeClosed(1,100)
                .mapToObj(i -> new CompletableFuture<String>())
                .collect(Collectors.toList());
        futureList.stream()
                .forEach(future -> {
                    future.completeAsync(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                        return "message";
                    },executor);
                });
        CompletableFuture.allOf(futureList
                .toArray(new CompletableFuture<?>[futureList.size()]))
                .join();
    }

Here a fixed pool with 2 threads is created, and its queueSize is set to 5.

  • Log output
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]

    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)

With both threads busy and 5 tasks already queued, the next submission is rejected, so the bounded queue caps the amount of pending work.
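
The same effect can be reproduced with JDK classes only. The sketch below is an assumption-laden variant of the test above, not the author's code: the class BoundedPoolRejection and the method countRejections are hypothetical, and a latch replaces the 3-second sleep so that the tasks stay blocked while the submissions are counted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used here for illustration only.
public class BoundedPoolRejection {

    // Returns how many of the `tasks` submissions were rejected.
    static int countRejections(int poolSize, int queueSize, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    new CompletableFuture<String>().completeAsync(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return "message";
                    }, executor);
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
            return rejected;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```

With 2 threads and a queue of 5, only 7 of 100 submissions are accepted, so countRejections(2, 5, 100) returns 93.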

Summary

The executor of the JDK HttpClient is used for asynchronous operations. By default it is created with Executors.newCachedThreadPool, whose maximum pool size is Integer.MAX_VALUE. When using the client it is therefore advisable to supply an executor with a bounded pool and a bounded queue, and to monitor that thread pool.
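
For the monitoring part, ThreadPoolExecutor already exposes the relevant counters. A minimal sketch, assuming a hypothetical helper class PoolStats whose output would be logged or exported periodically:

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, used here for illustration only.
public class PoolStats {

    // Formats a one-line snapshot of the pool, suitable for periodic logging.
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format(
                "poolSize=%d active=%d queued=%d completed=%d largest=%d",
                pool.getPoolSize(),            // threads currently in the pool
                pool.getActiveCount(),         // threads running a task (approximate)
                pool.getQueue().size(),        // tasks waiting in the queue
                pool.getCompletedTaskCount(),  // tasks finished so far (approximate)
                pool.getLargestPoolSize());    // high-water mark of the pool size
    }
}
```

Calling snapshot from a ScheduledExecutorService at a fixed rate is one simple way to watch the pool size and queue depth over time.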
