Talk about checking connection pool of spring-data-redis

  lettuce, redis

Order

This article mainly studies the verification of spring-data-redis connection pool.

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar! /org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
  • CreateGenericObjectPool is called here to create a connection pool.

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar! /io/lettuce/core/support/ConnectionPoolSupport.java

    public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • RedisPooledObjectFactory is used here

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar! /io/lettuce/core/support/ConnectionPoolSupport.java

    private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • BasePooledObjectFactory is inherited here, and methods such as validate are rewritten. validate is judged by isOpen here.

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar! /io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
  • IsOpen is judged by the active field, while active becomes false when deactivated or close, and becomes true when initialized and activated.
  • It can be seen that active cannot detect the timeout caused by docker pause.

LettuceConnectionFactory.SharedConnection.validateConnection

spring-data-redis-2.0.10.RELEASE-sources.jar! /org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

        /**
         * Validate the connection. Invalid connections will be closed and the connection state will be reset.
         */
        void validateConnection() {

            synchronized (this.connectionMonitor) {

                boolean valid = false;

                if (connection != null && connection.isOpen()) {
                    try {

                        if (connection instanceof StatefulRedisConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }

                        if (connection instanceof StatefulRedisClusterConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }
                        valid = true;
                    } catch (Exception e) {
                        log.debug("Validation failed", e);
                    }
                }

                if (!valid) {

                    if (connection != null) {
                        connectionProvider.release(connection);
                    }

                    log.warn("Validation of shared connection failed. Creating a new connection.");

                    resetConnection();
                    this.connection = getNativeConnection();
                }
            }
        }
  • This is the default method to open shareNativeConnection of LettuceConnectionFactory to get the connection.
  • If LettuceConnectionFactory’s validateConnection is true (The default is false.), it will execute validateConnection on its own every get.

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar! /org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }

        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }

        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }

                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
  • The abandoned DefaultLettucePool Factory has a LettuceFactory in it. Its validate is judged by ping, so it is more accurate.

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar! /org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    //......
    private Pool<Jedis> createPool() {

        if (isRedisSentinelAware()) {
            return createRedisSentinelPool(this.sentinelConfig);
        }
        return createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    /**
     * Creates {@link JedisPool}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {

        return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                getPassword(), getDatabase(), getClientName(), isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null), //
                clientConfiguration.getSslParameters().orElse(null), //
                clientConfiguration.getHostnameVerifier().orElse(null));
    }
    //......
}
  • Whether it’s JedisPool or JedisSentinelPool, it uses JedisFactory.

JedisFactory.validateObject

jedis-2.9.0-sources.jar! /redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory<Jedis> {
  private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
  private final int connectionTimeout;
  private final int soTimeout;
  private final String password;
  private final int database;
  private final String clientName;
  private final boolean ssl;
  private final SSLSocketFactory sslSocketFactory;
  private SSLParameters sslParameters;
  private HostnameVerifier hostnameVerifier;

  //......

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}
  • JedisFactory implements PooledObjectFactory interface, and its validateObject method not only verifies isConnected, but also verifies ping method.
  • As long as the ping method times out, it will throw an exception, thus the verification fails, thus it can sense the timeout brought by docker pause, thus removing the connection from the connection pool.

Summary

  • Spring-date-redis version 2.0 and above abandoned the original LettucePool and replaced it with Lettuce ePoolingClientConfiguration.
  • There is a problem here, that is, the old version of connection pool check is ping, while the new version of connection pool check is identified by active field, which is not recognized by docker pause.
  • For lettuce, its shareNativeConnection parameter defaults to true and validateConnection is false. After the connection pool borrow is connected for the first time , the underlying connection has been reused and has not been returned. If you want to use connection pool to obtain and return every connection, you need to set shareNativeConnection to false.
  • Jedis connection pool implementation, its validateObject method not only verifies isConnected, but also verifies the ping method, so it can sense the timeout brought by doc pause, thus eliminating the connection from the connection pool.
  • For lettuce, if you want to identify the exception of docker pause, there are two schemes. One is to fix the validateObject method of RedisPooledObjectFactory in ConnectionPoolSupport, which not only determines isOpen, but also needs to ping. The other is not to open the connection pool and set the validateConnection parameter of LettuceConnectionFactory to true.

doc