A look at the JDK HttpClient’s ConnectionPool

Preface

This article examines the ConnectionPool used internally by the JDK HttpClient (the java.net.http module).

HttpConnection

HttpConnection.getConnection

java.net.http/jdk/internal/net/http/HttpConnection.java

    /**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainTunnelingConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose  address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }
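  • Plain (non-HTTPS) requests look up the pool with pool.getConnection(false, addr, proxy). HTTPS requests use pool.getConnection(true, addr, proxy), but only when the version is not HTTP_2, because only HTTP/1.1 connections are kept in the pool. If no open pooled connection is found, a new plain or SSL connection is created instead.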

HttpConnection.closeOrReturnToCache

java.net.http/jdk/internal/net/http/HttpConnection.java

    void closeOrReturnToCache(HttpHeaders hdrs) {
        if (hdrs == null) {
            // the connection was closed by server, eof
            close();
            return;
        }
        if (!isOpen()) {
            return;
        }
        HttpClientImpl client = client();
        if (client == null) {
            close();
            return;
        }
        ConnectionPool pool = client.connectionPool();
        boolean keepAlive = hdrs.firstValue("Connection")
                .map((s) -> !s.equalsIgnoreCase("close"))
                .orElse(true);

        if (keepAlive) {
            Log.logTrace("Returning connection to the pool: {0}", this);
            pool.returnToPool(this);
        } else {
            close();
        }
    }
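  • If the response headers do not carry Connection: close, the connection is handed back with pool.returnToPool(this); otherwise it is closed. A null header set (EOF from the server) or a missing client also closes the connection.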
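
The keep-alive decision can be tried in isolation with the public java.net.http.HttpHeaders type. The following is a minimal sketch: the class KeepAliveCheck and its helper headers(...) are hypothetical names introduced for illustration, and only the three-line check itself is taken from closeOrReturnToCache.

```java
import java.net.http.HttpHeaders;
import java.util.List;
import java.util.Map;

// Hypothetical helper class for illustration; not part of the JDK.
class KeepAliveCheck {

    // Same rule as in closeOrReturnToCache: a missing Connection header means
    // keep-alive; only the value "close" (in any letter case) forbids reuse.
    static boolean keepAlive(HttpHeaders hdrs) {
        return hdrs.firstValue("Connection")
                .map(s -> !s.equalsIgnoreCase("close"))
                .orElse(true);
    }

    // Builds an HttpHeaders instance from a plain map, accepting every header.
    static HttpHeaders headers(Map<String, List<String>> map) {
        return HttpHeaders.of(map, (name, value) -> true);
    }

    public static void main(String[] args) {
        System.out.println(keepAlive(headers(Map.of())));                                        // true
        System.out.println(keepAlive(headers(Map.of("Connection", List.of("keep-alive")))));     // true
        System.out.println(keepAlive(headers(Map.of("Connection", List.of("Close")))));          // false
    }
}
```

Note that any value other than "close" counts as keep-alive, so a response without a Connection header is pooled by default.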

ConnectionPool

java.net.http/jdk/internal/net/http/ConnectionPool.java

/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    boolean stopped;

    //......
    /**
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination only)
     * case 4: SSL over tunnel (destination and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;

        CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    synchronized HttpConnection getConnection(boolean secure,
                                              InetSocketAddress addr,
                                              InetSocketAddress proxy) {
        if (stopped) return null;
        CacheKey key = new CacheKey(addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        return c;
    }

    private HttpConnection
    findConnection(CacheKey key,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            HttpConnection c = l.removeFirst();
            expiryList.remove(c);
            return c;
        }
    }
    /**
     * Returns the connection to the pool.
     */
    void returnToPool(HttpConnection conn) {
        returnToPool(conn, Instant.now(), KEEP_ALIVE);
    }

    // Called also by whitebox tests
    void returnToPool(HttpConnection conn, Instant now, long keepAlive) {

        // Don't call registerCleanupTrigger while holding a lock,
        // but register it before the connection is added to the pool,
        // since we don't want to trigger the cleanup if the connection
        // is not in the pool.
        CleanupTrigger cleanup = registerCleanupTrigger(conn);

        // it's possible that cleanup may have been called.
        HttpConnection toClose = null;
        synchronized(this) {
            if (cleanup.isDone()) {
                return;
            } else if (stopped) {
                conn.close();
                return;
            }
            if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
                toClose = expiryList.removeOldest();
                if (toClose != null) removeFromPool(toClose);
            }
            if (conn instanceof PlainHttpConnection) {
                putConnection(conn, plainPool);
            } else {
                assert conn.isSecure();
                putConnection(conn, sslPool);
            }
            expiryList.add(conn, now, keepAlive);
        }
        if (toClose != null) {
            if (debug.on()) {
                debug.log("Maximum pool size reached: removing oldest connection %s",
                          toClose.dbgString());
            }
            close(toClose);
        }
        //System.out.println("Return to pool: " + conn);
    }

    private void removeFromPool(HttpConnection c) {
        assert Thread.holdsLock(this);
        if (c instanceof PlainHttpConnection) {
            removeFromPool(c, plainPool);
        } else {
            assert c.isSecure();
            removeFromPool(c, sslPool);
        }
    }

    private boolean
    removeFromPool(HttpConnection c,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        //System.out.println("cacheCleaner removing: " + c);
        assert Thread.holdsLock(this);
        CacheKey k = c.cacheKey();
        List<HttpConnection> l = pool.get(k);
        if (l == null || l.isEmpty()) {
            pool.remove(k);
            return false;
        }
        return l.remove(c);
    }

    private void
    putConnection(HttpConnection c,
                  HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        CacheKey key = c.cacheKey();
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null) {
            l = new LinkedList<>();
            pool.put(key, l);
        }
        l.add(c);
    }

    void stop() {
        List<HttpConnection> closelist = Collections.emptyList();
        try {
            synchronized (this) {
                stopped = true;
                closelist = expiryList.stream()
                    .map(e -> e.connection)
                    .collect(Collectors.toList());
                expiryList.clear();
                plainPool.clear();
                sslPool.clear();
            }
        } finally {
            closelist.forEach(this::close);
        }
    }
}
  • To borrow a connection, callers invoke getConnection, which delegates to findConnection: it removes the first connection from the LinkedList<HttpConnection> stored under the cache key and then removes that connection from expiryList.
  • To return a connection, callers invoke returnToPool. If MAX_POOL_SIZE is greater than 0 and expiryList has already reached MAX_POOL_SIZE, the oldest connection is removed from expiryList and from the HashMap<CacheKey,LinkedList<HttpConnection>>, and it is closed once the lock has been released. putConnection then adds the returned connection to the HashMap<CacheKey,LinkedList<HttpConnection>>, and finally the connection is added to expiryList.
  • ConnectionPool therefore maintains two key structures, the HashMap<CacheKey,LinkedList<HttpConnection>> (one for plain and one for SSL connections) and the ExpiryList. A connection is removed from both when it is borrowed and added to both when it is returned. The only difference is that, on return, the size of expiryList is checked when MAX_POOL_SIZE is greater than 0; if the limit has been reached, the oldest connection is removed from both structures.
  • MAX_POOL_SIZE reads jdk.httpclient.connectionPoolSize; the default value is 0, which means unbounded.
  • ConnectionPool has a stop method that is called when the HttpClient stops (triggered when the SelectorManager thread exits). It marks the pool as stopped, clears both pools and the expiryList, and closes the connections.
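
The borrow and return bookkeeping described above can be modelled with a small stand-alone class. This is only a sketch under simplifying assumptions: TinyPool, Conn, borrow and giveBack are hypothetical names, a String key replaces CacheKey, and the age-ordered list simply puts the most recently returned connection at the head instead of sorting by expiry time as ExpiryList does.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical, simplified model of the pool's bookkeeping; not JDK code.
class TinyPool {

    // Stand-in for HttpConnection: remembers the key it is pooled under.
    static final class Conn {
        final String key;
        final String name;
        Conn(String key, String name) { this.key = key; this.name = name; }
    }

    private final Map<String, LinkedList<Conn>> idle = new HashMap<>();
    // Most recently returned connection at the head, oldest at the tail.
    private final LinkedList<Conn> byAge = new LinkedList<>();
    private final int maxSize; // 0 means unbounded

    TinyPool(int maxSize) { this.maxSize = maxSize; }

    // Borrow: take the first idle connection for the key and drop it
    // from the age-ordered list as well.
    synchronized Conn borrow(String key) {
        LinkedList<Conn> l = idle.get(key);
        if (l == null || l.isEmpty()) return null;
        Conn c = l.removeFirst();
        byAge.remove(c);
        return c;
    }

    // Return: evict the oldest idle connection if the pool is full, then
    // register the returned one in both structures. The evicted connection
    // (which a real pool would close) is handed back to the caller, or null.
    synchronized Conn giveBack(Conn c) {
        Conn evicted = null;
        if (maxSize > 0 && byAge.size() >= maxSize) {
            evicted = byAge.pollLast();
            if (evicted != null) {
                LinkedList<Conn> l = idle.get(evicted.key);
                if (l != null) l.remove(evicted);
            }
        }
        idle.computeIfAbsent(c.key, k -> new LinkedList<>()).add(c);
        byAge.addFirst(c);
        return evicted;
    }

    synchronized int size() { return byAge.size(); }
}
```

With a limit of 2, returning a third connection evicts the one that has been idle longest, regardless of which key it belongs to; this mirrors how the real pool applies MAX_POOL_SIZE to the whole expiryList rather than per CacheKey.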

ExpiryList

java.net.http/jdk/internal/net/http/ConnectionPool.java

    /**
     * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
     * deadline is at the tail of the list, and the entry with the farther
     * deadline is at the head. In the most common situation, new elements
     * will need to be added at the head (or close to it), and expired elements
     * will need to be purged from the tail.
     */
    private static final class ExpiryList {
        private final LinkedList<ExpiryEntry> list = new LinkedList<>();
        private volatile boolean mayContainEntries;

        int size() { return list.size(); }

        // A loosely accurate boolean whose value is computed
        // at the end of each operation performed on ExpiryList;
        // Does not require synchronizing on the ConnectionPool.
        boolean purgeMaybeRequired() {
            return mayContainEntries;
        }

        // Returns the next expiry deadline
        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        Optional<Instant> nextExpiryDeadline() {
            if (list.isEmpty()) return Optional.empty();
            else return Optional.of(list.getLast().expiry);
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        HttpConnection removeOldest() {
            ExpiryEntry entry = list.pollLast();
            return entry == null ? null : entry.connection;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void add(HttpConnection conn) {
            add(conn, Instant.now(), KEEP_ALIVE);
        }

        // Used by whitebox test.
        void add(HttpConnection conn, Instant now, long keepAlive) {
            Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                    .plus(keepAlive, ChronoUnit.SECONDS);

            // Elements with the farther deadline are at the head of
            // the list. It's more likely that the new element will
            // have the farthest deadline, and will need to be inserted
            // at the head of the list, so we're using an ascending
            // list iterator to find the right insertion point.
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();

                if (then.isAfter(entry.expiry)) {
                    li.previous();
                    // insert here
                    li.add(new ExpiryEntry(conn, then));
                    mayContainEntries = true;
                    return;
                }
            }
            // last (or first) element of list (the last element is
            // the first when the list is empty)
            list.add(new ExpiryEntry(conn, then));
            mayContainEntries = true;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void remove(HttpConnection c) {
            if (c == null || list.isEmpty()) return;
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry e = li.next();
                if (e.connection.equals(c)) {
                    li.remove();
                    mayContainEntries = !list.isEmpty();
                    return;
                }
            }
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool.
        // Purge all elements whose deadline is before now (now included).
        List<HttpConnection> purgeUntil(Instant now) {
            if (list.isEmpty()) return Collections.emptyList();

            List<HttpConnection> closelist = new ArrayList<>();

            // elements with the closest deadlines are at the tail
            // of the queue, so we're going to use a descending iterator
            // to remove them, and stop when we find the first element
            // that has not expired yet.
            Iterator<ExpiryEntry> li = list.descendingIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();
                // use !isAfter instead of isBefore in order to
                // remove the entry if its expiry == now
                if (!entry.expiry.isAfter(now)) {
                    li.remove();
                    HttpConnection c = entry.connection;
                    closelist.add(c);
                } else break; // the list is sorted
            }
            mayContainEntries = !list.isEmpty();
            return closelist;
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        java.util.stream.Stream<ExpiryEntry> stream() {
            return list.stream();
        }

        // should only be called while holding a synchronization
        // lock on the ConnectionPool
        void clear() {
            list.clear();
            mayContainEntries = false;
        }
    }

    static final class ExpiryEntry {
        final HttpConnection connection;
        final Instant expiry; // absolute time in seconds of expiry time
        ExpiryEntry(HttpConnection connection, Instant expiry) {
            this.connection = connection;
            this.expiry = expiry;
        }
    }
  • ExpiryList is backed by a LinkedList<ExpiryEntry>, and each ExpiryEntry wraps one connection.
  • Besides the HttpConnection, an ExpiryEntry also holds expiry, the point in time at which the connection expires.
  • When a connection is added, ExpiryList computes its expiry as the current time (truncated to seconds) plus KEEP_ALIVE. KEEP_ALIVE reads jdk.httpclient.keepalive.timeout and defaults to 1200 seconds if the property is not set. The entry is then inserted into the LinkedList<ExpiryEntry> in expiry order: entries that expire latest sit at the head of the list, and entries that expire soonest sit at the tail.
  • ExpiryList supports two kinds of removal. removeOldest removes the entry closest to expiry via pollLast; remove(HttpConnection) walks the LinkedList<ExpiryEntry> with a ListIterator and removes the matching entry.
  • The volatile flag mayContainEntries is updated at the end of each operation on the list. It lets callers check whether the ExpiryList may contain connections without synchronizing on the ConnectionPool.
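
The sorted insertion is the least obvious part of ExpiryList, so here is a minimal sketch of it that stores bare deadlines instead of ExpiryEntry objects. DeadlineList is a hypothetical name; the insertion loop follows the add method shown above.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.ListIterator;

// Hypothetical reduction of ExpiryList to a list of deadlines; not JDK code.
class DeadlineList {
    // Farthest deadline at the head, closest deadline at the tail.
    final LinkedList<Instant> list = new LinkedList<>();

    void add(Instant now, long keepAliveSeconds) {
        Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                .plus(keepAliveSeconds, ChronoUnit.SECONDS);
        ListIterator<Instant> li = list.listIterator();
        while (li.hasNext()) {
            // Insert in front of the first entry that expires earlier.
            if (then.isAfter(li.next())) {
                li.previous();
                li.add(then);
                return;
            }
        }
        // Nothing expires earlier: append at the tail.
        list.add(then);
    }

    // The "oldest" entry is the one closest to its deadline, at the tail.
    Instant removeOldest() {
        return list.pollLast();
    }
}
```

Because every connection normally gets the same keep-alive, a newly returned connection usually has the farthest deadline and is inserted at the head after a single comparison; the full scan only happens when deadlines are added out of order, as in the whitebox tests.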

ConnectionPool.purgeExpiredConnectionsAndReturnNextDeadline

java.net.http/jdk/internal/net/http/ConnectionPool.java

    /**
     * Purge expired connection and return the number of milliseconds
     * in which the next connection is scheduled to expire.
     * If no connections are scheduled to be purged return 0.
     * @return the delay in milliseconds in which the next connection will
     *         expire.
     */
    long purgeExpiredConnectionsAndReturnNextDeadline() {
        if (!expiryList.purgeMaybeRequired()) return 0;
        return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
    }

    // Used for whitebox testing
    long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
        long nextPurge = 0;

        // We may be in the process of adding new elements
        // to the expiry list - but those elements will not
        // have outlast their keep alive timer yet since we're
        // just adding them.
        if (!expiryList.purgeMaybeRequired()) return nextPurge;

        List<HttpConnection> closelist;
        synchronized (this) {
            closelist = expiryList.purgeUntil(now);
            for (HttpConnection c : closelist) {
                if (c instanceof PlainHttpConnection) {
                    boolean wasPresent = removeFromPool(c, plainPool);
                    assert wasPresent;
                } else {
                    boolean wasPresent = removeFromPool(c, sslPool);
                    assert wasPresent;
                }
            }
            nextPurge = now.until(
                    expiryList.nextExpiryDeadline().orElse(now),
                    ChronoUnit.MILLIS);
        }
        closelist.forEach(this::close);
        return nextPurge;
    }
  • Because every connection in the ExpiryList has an expiry time, expired connections have to be cleaned up at some point. That step is performed by purgeExpiredConnectionsAndReturnNextDeadline.
  • purgeExpiredConnectionsAndReturnNextDeadline is called by SelectorManager, which uses the result to calculate the timeout for selector.select.
  • The method first calls expiryList.purgeMaybeRequired(), which reads mayContainEntries, to see whether expiryList may hold any connection; if not, it returns 0 immediately. Otherwise it calls expiryList.purgeUntil(now) to remove and collect the connections that have expired, removes each of them from the matching HashMap<CacheKey,LinkedList<HttpConnection>>, computes nextPurge as the number of milliseconds until the next expiry deadline (0 if the list is now empty), and finally closes the removed connections outside the synchronized block.
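
The purge step and the computation of the next delay can be sketched on a plain list of deadlines. PurgeSketch and purgeAndNextDelayMillis are hypothetical names; the loop mirrors purgeUntil and the final line mirrors the nextPurge calculation, under the assumption that the list is already sorted with the closest deadline at the tail.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-alone version of the purge logic; not JDK code.
class PurgeSketch {

    // deadlines: farthest deadline first, closest deadline last.
    // Expired deadlines are moved into 'purged'; the return value is the
    // delay in milliseconds until the next deadline, or 0 if none is left.
    static long purgeAndNextDelayMillis(LinkedList<Instant> deadlines,
                                        Instant now,
                                        List<Instant> purged) {
        Iterator<Instant> it = deadlines.descendingIterator();
        while (it.hasNext()) {
            Instant deadline = it.next();
            // !isAfter also removes entries whose deadline equals 'now'.
            if (!deadline.isAfter(now)) {
                it.remove();
                purged.add(deadline);
            } else {
                break; // the list is sorted, nothing further has expired
            }
        }
        Instant next = deadlines.isEmpty() ? now : deadlines.getLast();
        return now.until(next, ChronoUnit.MILLIS);
    }
}
```

For deadlines at +30 s, +20 s and +10 s and a current time of +20 s, two entries are purged and the returned delay is 10000 ms, which is the kind of value a caller could feed into a select timeout.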

CleanupTrigger

java.net.http/jdk/internal/net/http/ConnectionPool.java


    private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
        // Connect the connection flow to a pub/sub pair that will take the
        // connection out of the pool and close it if anything happens
        // while the connection is sitting in the pool.
        CleanupTrigger cleanup = new CleanupTrigger(conn);
        FlowTube flow = conn.getConnectionFlow();
        if (debug.on()) debug.log("registering %s", cleanup);
        flow.connectFlows(cleanup, cleanup);
        return cleanup;
    }

    void cleanup(HttpConnection c, Throwable error) {
        if (debug.on())
            debug.log("%s : ConnectionPool.cleanup(%s)",
                    String.valueOf(c.getConnectionFlow()), error);
        synchronized(this) {
            removeFromPool(c);
            expiryList.remove(c);
        }
        c.close();
    }

    /**
     * An object that subscribes to the flow while the connection is in
     * the pool. Anything that comes in will cause the connection to be closed
     * and removed from the pool.
     */
    private final class CleanupTrigger implements
            FlowTube.TubeSubscriber, FlowTube.TubePublisher,
            Flow.Subscription {

        private final HttpConnection connection;
        private volatile boolean done;

        public CleanupTrigger(HttpConnection connection) {
            this.connection = connection;
        }

        public boolean isDone() { return done;}

        private void triggerCleanup(Throwable error) {
            done = true;
            cleanup(connection, error);
        }

        @Override public void request(long n) {}
        @Override public void cancel() {}

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }
        @Override
        public void onError(Throwable error) { triggerCleanup(error); }
        @Override
        public void onComplete() { triggerCleanup(null); }
        @Override
        public void onNext(List<ByteBuffer> item) {
            triggerCleanup(new IOException("Data received while in pool"));
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            subscriber.onSubscribe(this);
        }

        @Override
        public String toString() {
            return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
        }
    }
  • returnToPool calls registerCleanupTrigger, which creates a CleanupTrigger, obtains the connection's flow with conn.getConnectionFlow(), and then calls flow.connectFlows(cleanup, cleanup) so that the trigger acts as both publisher and subscriber of that flow.
  • CleanupTrigger implements both FlowTube.TubeSubscriber and FlowTube.TubePublisher. Its onNext, onError and onComplete methods all call triggerCleanup, which invokes ConnectionPool.cleanup to remove the connection from the HashMap<CacheKey,LinkedList<HttpConnection>> and from expiryList, and then closes it.
  • CleanupTrigger works much like a connection health check for idle connections: when the underlying connection is closed, fails, or unexpectedly receives data while sitting in the pool, the pool is notified and discards the stale connection.
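
The idea behind CleanupTrigger can be illustrated with the public java.util.concurrent.Flow API. This is a sketch only: IdleWatcher is a hypothetical class, it subscribes to String items rather than List<ByteBuffer>, and the cleanup action is passed in as a callback instead of calling into a pool.

```java
import java.io.IOException;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical analogue of CleanupTrigger built on the public Flow API.
class IdleWatcher implements Flow.Subscriber<String> {

    private final Consumer<Throwable> cleanup; // called with the cause, or null
    private volatile boolean done;

    IdleWatcher(Consumer<Throwable> cleanup) {
        this.cleanup = cleanup;
    }

    boolean isDone() { return done; }

    private void trigger(Throwable error) {
        done = true;
        cleanup.accept(error);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Ask for one item so that any incoming data is noticed.
        subscription.request(1);
    }

    // An idle connection should be silent, so any signal means trouble.
    @Override
    public void onNext(String item) {
        trigger(new IOException("Data received while in pool"));
    }

    @Override
    public void onError(Throwable error) { trigger(error); }

    @Override
    public void onComplete() { trigger(null); }
}
```

The done flag plays the same role as CleanupTrigger.isDone(): returnToPool checks it under the lock so that a connection whose cleanup has already fired is not added to the pool.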

Summary

  • The JDK HttpClient’s ConnectionPool is simpler than Apache Commons Pool and has only two parameters (both take effect through ExpiryList):

MAX_POOL_SIZE (jdk.httpclient.connectionPoolSize): defaults to 0, which means unbounded.
KEEP_ALIVE (jdk.httpclient.keepalive.timeout): defaults to 1200 seconds.

  • ConnectionPool maintains two structures side by side: the HashMap<CacheKey,LinkedList<HttpConnection>> and the expiryList. The former is keyed by CacheKey (destination address and proxy address), and each key maps to its own list of idle connections. The latter ignores the CacheKey: it wraps every pooled connection in an ExpiryEntry and records an expiry time derived from KEEP_ALIVE.
  • SelectorManager calls purgeExpiredConnectionsAndReturnNextDeadline to calculate the timeout of its select call; as a side effect, this method removes and closes expired connections.
  • Besides the cleanup of expired connections driven by SelectorManager, each pooled connection triggers its CleanupTrigger through FlowTube, so that closed or broken connections are removed from the pool as well.
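
The two parameters from the summary can be set as system properties. The sketch below makes two assumptions: that the property lookup consults system properties (the usual -D mechanism), and that the values must be in place before the client's internal classes are initialized, since the code above stores them in static final fields. PoolTuning and configure are hypothetical names, and the values 50 and 60 are arbitrary examples.

```java
import java.net.http.HttpClient;

// Hypothetical helper showing where the two pool properties would be set.
class PoolTuning {

    static void configure(int poolSize, int keepAliveSeconds) {
        // Upper bound on idle pooled connections (0 means unbounded).
        System.setProperty("jdk.httpclient.connectionPoolSize",
                Integer.toString(poolSize));
        // Idle time in seconds before a pooled connection expires.
        System.setProperty("jdk.httpclient.keepalive.timeout",
                Integer.toString(keepAliveSeconds));
    }

    public static void main(String[] args) {
        // Set the properties first, then create the client.
        configure(50, 60);
        HttpClient client = HttpClient.newHttpClient();
        System.out.println("client created: " + client.version());
    }
}
```

The same effect should be achievable without code by passing -Djdk.httpclient.connectionPoolSize=50 -Djdk.httpclient.keepalive.timeout=60 on the java command line.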
