[case37] A look at lettuce’s shareNativeConnection parameter

  lettuce, redis

Preface

This article looks at the shareNativeConnection parameter of LettuceConnectionFactory, the lettuce connection factory in spring-data-redis.

LettuceConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

public class LettuceConnectionFactory
        implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {

    private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
            LettuceConverters.exceptionConverter());

    private final Log log = LogFactory.getLog(getClass());
    private final LettuceClientConfiguration clientConfiguration;

    private @Nullable AbstractRedisClient client;
    private @Nullable LettuceConnectionProvider connectionProvider;
    private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
    private boolean validateConnection = false;
    private boolean shareNativeConnection = true;
    private @Nullable SharedConnection<byte[]> connection;
    private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
    private @Nullable LettucePool pool;
    /** Synchronization monitor for the shared Connection */
    private final Object connectionMonitor = new Object();
    private boolean convertPipelineAndTxResults = true;
    private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
    private @Nullable RedisSentinelConfiguration sentinelConfiguration;
    private @Nullable RedisClusterConfiguration clusterConfiguration;
    private @Nullable ClusterCommandExecutor clusterCommandExecutor;

    //......

    @Override
    public LettuceReactiveRedisConnection getReactiveConnection() {

        return getShareNativeConnection()
                ? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
                : new LettuceReactiveRedisConnection(reactiveConnectionProvider);
    }

    @Override
    public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {

        if (!isClusterAware()) {
            throw new InvalidDataAccessApiUsageException("Cluster is not configured!");
        }

        RedisClusterClient client = (RedisClusterClient) this.client;

        return getShareNativeConnection()
                ? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client)
                : new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);
    }

    /**
     * Indicates if multiple {@link LettuceConnection}s should share a single native connection.
     *
     * @return native connection shared.
     */
    public boolean getShareNativeConnection() {
        return shareNativeConnection;
    }

    /**
     * @return the shared connection using {@link ByteBuffer} encoding for reactive API use. {@literal null} if
     *         {@link #getShareNativeConnection() connection sharing} is disabled.
     * @since 2.0.1
     */
    @Nullable
    protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection() {
        return shareNativeConnection ? getOrCreateSharedReactiveConnection().getConnection() : null;
    }

    private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {

        synchronized (this.connectionMonitor) {

            if (this.reactiveConnection == null) {
                this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider, true);
            }

            return this.reactiveConnection;
        }
    }
}
  • shareNativeConnection defaults to true here, which means that multiple LettuceConnection instances share a single native connection.
  • When it is true, getReactiveConnection and getReactiveClusterConnection pass the result of getSharedReactiveConnection to the connection they create; otherwise they pass only the reactiveConnectionProvider.
  • getSharedReactiveConnection calls getOrCreateSharedReactiveConnection().getConnection() when shareNativeConnection is true, and returns null otherwise.
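
The sharing decision described above can be sketched with plain JDK types. This is a simplified model and not the spring-data-redis implementation: SharingFactoryModel and NativeConn are hypothetical names, and NativeConn merely stands in for a lettuce StatefulConnection.

```java
/** Simplified model of how a factory decides between a shared and a dedicated native connection. */
final class SharingFactoryModel {

    /** Hypothetical stand-in for a native (socket-level) connection. */
    static final class NativeConn {
    }

    private final boolean shareNativeConnection;
    private final Object connectionMonitor = new Object();
    private NativeConn shared; // created lazily on first use, then reused

    SharingFactoryModel(boolean shareNativeConnection) {
        this.shareNativeConnection = shareNativeConnection;
    }

    /** Returns the native connection that a new logical connection would use. */
    NativeConn getConnection() {
        if (!shareNativeConnection) {
            return new NativeConn(); // dedicated connection per caller
        }
        synchronized (connectionMonitor) {
            if (shared == null) {
                shared = new NativeConn(); // the first caller creates it
            }
            return shared;
        }
    }

    public static void main(String[] args) {
        SharingFactoryModel sharing = new SharingFactoryModel(true);
        System.out.println(sharing.getConnection() == sharing.getConnection()); // true

        SharingFactoryModel dedicated = new SharingFactoryModel(false);
        System.out.println(dedicated.getConnection() == dedicated.getConnection()); // false
    }
}
```

With sharing enabled, every caller sees the same instance, which is why the state of that one connection matters so much in the sections that follow.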

LettuceConnectionFactory.SharedConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

    /**
     * Wrapper for shared connections. Keeps track of the connection lifecycle. The wrapper is thread-safe as it
     * synchronizes concurrent calls by blocking.
     *
     * @param <E> connection encoding.
     * @author Mark Paluch
     * @author Christoph Strobl
     * @since 2.1
     */
    @RequiredArgsConstructor
    class SharedConnection<E> {

        private final LettuceConnectionProvider connectionProvider;
        private final boolean shareNativeClusterConnection;

        /** Synchronization monitor for the shared Connection */
        private final Object connectionMonitor = new Object();

        private @Nullable StatefulConnection<E, E> connection;

        /**
         * Returns a valid Lettuce connection. Initializes and validates the connection if
         * {@link #setValidateConnection(boolean) enabled}.
         *
         * @return the connection.
         */
        @Nullable
        StatefulConnection<E, E> getConnection() {

            synchronized (this.connectionMonitor) {

                if (this.connection == null) {
                    this.connection = getNativeConnection();
                }

                if (getValidateConnection()) {
                    validateConnection();
                }

                return this.connection;
            }
        }

        /**
         * Obtain a connection from the associated {@link LettuceConnectionProvider}.
         *
         * @return the connection.
         */
        @Nullable
        private StatefulConnection<E, E> getNativeConnection() {

            try {

                if (isClusterAware() && !shareNativeClusterConnection) {
                    return null;
                }

                StatefulConnection<E, E> connection = connectionProvider.getConnection(StatefulConnection.class);
                if (connection instanceof StatefulRedisConnection && getDatabase() > 0) {
                    ((StatefulRedisConnection) connection).sync().select(getDatabase());
                }

                return connection;
            } catch (RedisException e) {
                throw new RedisConnectionFailureException("Unable to connect to Redis", e);
            }
        }

        /**
         * Validate the connection. Invalid connections will be closed and the connection state will be reset.
         */
        void validateConnection() {

            synchronized (this.connectionMonitor) {

                boolean valid = false;

                if (connection != null && connection.isOpen()) {
                    try {

                        if (connection instanceof StatefulRedisConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }

                        if (connection instanceof StatefulRedisClusterConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }
                        valid = true;
                    } catch (Exception e) {
                        log.debug("Validation failed", e);
                    }
                }

                if (!valid) {

                    if (connection != null) {
                        connectionProvider.release(connection);
                    }

                    log.warn("Validation of shared connection failed. Creating a new connection.");

                    resetConnection();
                    this.connection = getNativeConnection();
                }
            }
        }

        /**
         * Reset the underlying shared Connection, to be reinitialized on next access.
         */
        void resetConnection() {

            synchronized (this.connectionMonitor) {

                if (this.connection != null) {
                    this.connectionProvider.release(this.connection);
                }

                this.connection = null;
            }
        }
    }
  • SharedConnection holds a single StatefulConnection. On the first call to getConnection the field is still null, so getNativeConnection is called to obtain a connection from the connectionProvider.
  • getValidateConnection is false by default. Once the connection is non-null it is never returned to the provider, and the same connection is handed out on every call.
  • If validation is turned on, the connection is validated on every getConnection call. validateConnection checks not only isOpen but also sends a ping. If the check fails (for example because the ping times out), the connection is released and a new one is obtained (with a connection pool, this means another borrow).
  • The validateConnection method has a bug: when validation fails it calls connectionProvider.release(connection) and then resetConnection(), which calls connectionProvider.release on the same connection a second time.
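
To make the effect of the validation flag concrete, here is a minimal sketch of the get-and-validate flow using only the JDK. It is a model under stated assumptions, not the library code: SharedConnectionModel and FakeConn are hypothetical, the provider is a plain Supplier, and release handling is left out on purpose.

```java
import java.util.function.Supplier;

/** Simplified model of a shared connection holder with optional validation on each access. */
final class SharedConnectionModel {

    /** Hypothetical stand-in for a native connection; healthy until markBroken() is called. */
    static final class FakeConn {
        private boolean healthy = true;

        void markBroken() {
            healthy = false;
        }

        boolean ping() {
            return healthy;
        }
    }

    private final Supplier<FakeConn> provider;
    private final boolean validateConnection;
    private FakeConn connection;

    SharedConnectionModel(Supplier<FakeConn> provider, boolean validateConnection) {
        this.provider = provider;
        this.validateConnection = validateConnection;
    }

    synchronized FakeConn getConnection() {
        if (connection == null) {
            connection = provider.get(); // first access: obtain a connection
        }
        if (validateConnection && !connection.ping()) {
            connection = provider.get(); // validation failed: replace the connection
        }
        return connection;
    }

    public static void main(String[] args) {
        SharedConnectionModel noValidation = new SharedConnectionModel(FakeConn::new, false);
        FakeConn first = noValidation.getConnection();
        first.markBroken();
        System.out.println(noValidation.getConnection() == first); // true: the broken connection is reused

        SharedConnectionModel withValidation = new SharedConnectionModel(FakeConn::new, true);
        FakeConn second = withValidation.getConnection();
        second.markBroken();
        System.out.println(withValidation.getConnection() == second); // false: replaced after the failed ping
    }
}
```

The trade-off is visible in the model: validation costs one ping per access, while skipping it means a broken connection keeps being handed out until something else repairs it.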

LettucePoolingConnectionProvider.release

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

    @Override
    public void release(StatefulConnection<?, ?> connection) {

        GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection);

        if (pool == null) {
            throw new PoolException("Returned connection " + connection
                    + " was either previously returned or does not belong to this connection provider");
        }

        pool.returnObject(connection);
    }
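  • The first release removes the connection from poolRef and returns it to the pool. The second release of the same connection finds no entry in poolRef, so pool is null and a PoolException is thrown before returnObject is reached. The exception propagates out of validateConnection before the replacement connection is obtained.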
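
The double release can be reproduced in isolation with a map that plays the role of poolRef. The sketch below is a hypothetical model: DoubleReleaseModel is not a library class, the pool is represented by a plain string, and IllegalStateException stands in for PoolException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of a provider that remembers which pool each borrowed connection came from. */
final class DoubleReleaseModel {

    private final Map<Object, String> poolRef = new ConcurrentHashMap<>();

    Object borrow() {
        Object connection = new Object();
        poolRef.put(connection, "pool-1");
        return connection;
    }

    void release(Object connection) {
        String pool = poolRef.remove(connection);
        if (pool == null) {
            // stand-in for the PoolException thrown by the real provider
            throw new IllegalStateException("Returned connection " + connection
                    + " was either previously returned or does not belong to this connection provider");
        }
        // a real provider would hand the connection back to its pool here
    }

    public static void main(String[] args) {
        DoubleReleaseModel provider = new DoubleReleaseModel();
        Object connection = provider.borrow();

        provider.release(connection); // first release succeeds
        try {
            provider.release(connection); // second release: the entry is already gone
        } catch (IllegalStateException e) {
            System.out.println("second release failed: " + e.getMessage());
        }
    }
}
```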

ConnectionWatchdog

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ConnectionWatchdog.java

/**
 * A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.
 *
 * @author Will Glozer
 * @author Mark Paluch
 * @author Koji Lin
 */
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
    //......
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.debug("{} channelInactive()", logPrefix());
        if (!armed) {
            logger.debug("{} ConnectionWatchdog not armed", logPrefix());
            return;
        }

        channel = null;

        if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
            scheduleReconnect();
        } else {
            logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
        }

        super.channelInactive(ctx);
    }

    /**
     * Schedule reconnect if channel is not available/not active.
     */
    public void scheduleReconnect() {

        logger.debug("{} scheduleReconnect()", logPrefix());

        if (!isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }

        if (!isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }

        if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {

            attempts++;
            final int attempt = attempts;
            int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
            logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);

            this.reconnectScheduleTimeout = timer.newTimeout(it -> {

                reconnectScheduleTimeout = null;

                if (!isEventLoopGroupActive()) {
                    logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
                    return;
                }

                reconnectWorkers.submit(() -> {
                    ConnectionWatchdog.this.run(attempt);
                    return null;
                });
            }, timeout, TimeUnit.MILLISECONDS);

            // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
            if (!reconnectSchedulerSync.get()) {
                reconnectScheduleTimeout = null;
            }
        } else {
            logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
        }
    }

    /**
     * Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with
     * the same handler instances contained in the old channel's pipeline.
     *
     * @param attempt attempt counter
     *
     * @throws Exception when reconnection fails.
     */
    public void run(int attempt) throws Exception {

        reconnectSchedulerSync.set(false);
        reconnectScheduleTimeout = null;

        if (!isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }

        if (!isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }

        if (isReconnectSuspended()) {
            logger.debug("Skip reconnect scheduling, reconnect is suspended");
            return;
        }

        boolean shouldLog = shouldLog();

        InternalLogLevel infoLevel = InternalLogLevel.INFO;
        InternalLogLevel warnLevel = InternalLogLevel.WARN;

        if (shouldLog) {
            lastReconnectionLogging = System.currentTimeMillis();
        } else {
            warnLevel = InternalLogLevel.DEBUG;
            infoLevel = InternalLogLevel.DEBUG;
        }

        InternalLogLevel warnLevelToUse = warnLevel;

        try {
            reconnectionListener.onReconnect(new ConnectionEvents.Reconnect(attempt));
            logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);

            ChannelFuture future = reconnectionHandler.reconnect();

            future.addListener(it -> {

                if (it.isSuccess() || it.cause() == null) {
                    return;
                }

                Throwable throwable = it.cause();

                if (ReconnectionHandler.isExecutionException(throwable)) {
                    logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString());
                } else {
                    logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString(), throwable);
                }

                if (!isReconnectSuspended()) {
                    scheduleReconnect();
                }
            });
        } catch (Exception e) {
            logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
        }
    }
}
  • ConnectionWatchdog handles channels that are closed unexpectedly: channelInactive schedules a reconnect, and each failed attempt schedules another one after a delay computed by reconnectDelay.
  • The reconnect itself is performed by ReconnectionHandler.reconnect.
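
The scheduling guard in scheduleReconnect (a compareAndSet on a flag so that only one attempt is pending at a time) can be sketched without netty. This is a simplified, synchronous model: WatchdogModel is hypothetical, no timer is involved, and the doubling delay is an assumption chosen for illustration rather than the delay lettuce computes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified, timer-free model of a reconnect scheduler that allows one pending attempt at a time. */
final class WatchdogModel {

    private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
    private boolean channelActive = false;
    private int attempts = 0;

    /** Hypothetical backoff: 1 ms, 2 ms, 4 ms, ... capped at 30 s. */
    static long delayMillis(int attempt) {
        return Math.min(30_000L, 1L << Math.min(attempt - 1, 20));
    }

    /** Returns the delay of the newly scheduled attempt, or -1 if nothing was scheduled. */
    long scheduleReconnect() {
        if (!channelActive && reconnectScheduled.compareAndSet(false, true)) {
            attempts++;
            return delayMillis(attempts);
        }
        return -1; // the channel is active or an attempt is already pending
    }

    /** Runs the pending attempt; a failed attempt schedules the next one. */
    long run(boolean connectSucceeds) {
        reconnectScheduled.set(false);
        channelActive = connectSucceeds;
        return connectSucceeds ? -1 : scheduleReconnect();
    }

    public static void main(String[] args) {
        WatchdogModel watchdog = new WatchdogModel();
        System.out.println(watchdog.scheduleReconnect()); // 1: first attempt scheduled
        System.out.println(watchdog.scheduleReconnect()); // -1: an attempt is already pending
        System.out.println(watchdog.run(false));          // 2: failed, next attempt scheduled
        System.out.println(watchdog.run(false));          // 4: failed again, longer delay
        System.out.println(watchdog.run(true));           // -1: connected, nothing to schedule
    }
}
```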

ReconnectionHandler.reconnect

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ReconnectionHandler.java

class ReconnectionHandler {
    //......

    /**
     * Initiate reconnect and return a {@link ChannelFuture} for synchronization. The resulting future either succeeds or fails.
     * It can be {@link ChannelFuture#cancel(boolean) canceled} to interrupt reconnection and channel initialization. A failed
     * {@link ChannelFuture} will close the channel.
     *
     * @return reconnect {@link ChannelFuture}.
     */
    protected ChannelFuture reconnect() {

        SocketAddress remoteAddress = socketAddressSupplier.get();

        logger.debug("Reconnecting to Redis at {}", remoteAddress);

        ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
        ChannelPromise initFuture = connectFuture.channel().newPromise();

        initFuture.addListener((ChannelFuture it) -> {

            if (it.cause() != null) {

                connectFuture.cancel(true);
                close(it.channel());
            }
        });

        connectFuture.addListener((ChannelFuture it) -> {

            if (it.cause() != null) {

                initFuture.tryFailure(it.cause());
                return;
            }

            ChannelPipeline pipeline = it.channel().pipeline();

            RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);

            if (channelInitializer == null) {

                initFuture.tryFailure(new IllegalStateException(
                        "Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));
                return;
            }

            channelInitializer.channelInitialized().whenComplete(
                    (state, throwable) -> {

                        if (throwable != null) {

                            if (isExecutionException(throwable)) {
                                initFuture.tryFailure(throwable);
                                return;
                            }

                            if (clientOptions.isCancelCommandsOnReconnectFailure()) {
                                connectionFacade.reset();
                            }

                            if (clientOptions.isSuspendReconnectOnProtocolFailure()) {

                                logger.error("Disabling autoReconnect due to initialization failure", throwable);
                                setReconnectSuspended(true);
                            }

                            initFuture.tryFailure(throwable);

                            return;
                        }

                        if (logger.isDebugEnabled()) {
                            logger.info("Reconnected to {}, Channel {}", remoteAddress,
                                    ChannelLogDescriptor.logDescriptor(it.channel()));
                        } else {
                            logger.info("Reconnected to {}", remoteAddress);
                        }

                        initFuture.trySuccess();
                    });
        });

        Runnable timeoutAction = () -> {
            initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",
                    timeout, timeoutUnit)));
        };

        Timeout timeoutHandle = timer.newTimeout(it -> {

            if (connectFuture.isDone() && initFuture.isDone()) {
                return;
            }

            if (reconnectWorkers.isShutdown()) {
                timeoutAction.run();
                return;
            }

            reconnectWorkers.submit(timeoutAction);

        }, this.timeout, timeoutUnit);

        initFuture.addListener(it -> timeoutHandle.cancel());

        return this.currentFuture = initFuture;
    }
}
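  • ReconnectionHandler.reconnect opens a new connection through netty's bootstrap.connect and reports the outcome through initFuture, which fails if the connect fails, channel initialization fails, or the timeout elapses.
  • Once initFuture completes, the timeoutHandle is cancelled. If the attempt failed, the listener registered in ConnectionWatchdog.run schedules another attempt after a delay, unless reconnect has been suspended.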
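
The interplay between the connect future, the init future and the timeout can be approximated with CompletableFuture and a ScheduledExecutorService. This is only a sketch of the pattern: ReconnectTimeoutModel and withTimeout are hypothetical names, a String stands in for the netty channel, and channel initialization is folded into the connect future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified model of a reconnect attempt that is guarded by a timeout. */
final class ReconnectTimeoutModel {

    /** Mirrors connectFuture, but fails with a TimeoutException if it is not done in time. */
    static CompletableFuture<String> withTimeout(CompletableFuture<String> connectFuture,
            ScheduledExecutorService timer, long timeout, TimeUnit unit) {

        CompletableFuture<String> initFuture = new CompletableFuture<>();

        // propagate the connect outcome to the init future
        connectFuture.whenComplete((channel, error) -> {
            if (error != null) {
                initFuture.completeExceptionally(error);
            } else {
                initFuture.complete(channel);
            }
        });

        // fail the init future if it is still pending when the timer fires
        ScheduledFuture<?> timeoutHandle = timer.schedule(() -> {
            initFuture.completeExceptionally(new TimeoutException("Reconnection attempt exceeded timeout"));
        }, timeout, unit);

        // whatever the outcome, the timeout is no longer needed once initFuture is done
        initFuture.whenComplete((channel, error) -> timeoutHandle.cancel(false));

        return initFuture;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> ok = withTimeout(
                    CompletableFuture.completedFuture("channel-1"), timer, 1, TimeUnit.SECONDS);
            System.out.println(ok.join()); // channel-1

            CompletableFuture<String> stuck = withTimeout(
                    new CompletableFuture<>(), timer, 100, TimeUnit.MILLISECONDS);
            try {
                stuck.join();
            } catch (CompletionException e) {
                System.out.println(e.getCause() instanceof TimeoutException); // true
            }
        } finally {
            timer.shutdownNow();
        }
    }
}
```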

Summary

  • LettuceConnectionFactory defaults to shareNativeConnection = true and validateConnection = false.
  • If a connection pool is used, the shared connection is borrowed once by default and then reused without being returned. A situation such as docker pause is not detected, however, and commands keep failing with a command timeout.
  • For a shared native connection that is never returned, lettuce's ConnectionWatchdog keeps trying to reconnect.
  • If validateConnection is true, the connection is checked on every getConnection call. In theory, a connection that fails the check is returned to the connection pool and a new native connection is obtained (if the pool cannot establish a connection, it throws org.springframework.data.redis.connection.PoolException: Could not get a resource from the pool; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 192.168.99.100:6379).

In practice, however, the validateConnection method of LettuceConnectionFactory.SharedConnection calls connectionProvider.release(connection) twice when the check fails, which results in org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.StatefulRedisConnectionImpl@1e4ad4a was either previously returned or does not belong to this connection provider.
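
For reference, both flags discussed in this article are plain setters on LettuceConnectionFactory. The fragment below is a configuration sketch and needs spring-data-redis and lettuce on the classpath. The class name RedisFactoryConfigSketch and the chosen values are illustrative assumptions, not a recommendation.

```java
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

/** Configuration sketch: changes the two defaults discussed in this article. */
final class RedisFactoryConfigSketch {

    static LettuceConnectionFactory configuredFactory() {
        // the no-arg constructor targets localhost:6379
        LettuceConnectionFactory factory = new LettuceConnectionFactory();

        // default is true; false gives each LettuceConnection its own native connection
        factory.setShareNativeConnection(false);

        // default is false; true validates the shared connection on every access,
        // which only matters while shareNativeConnection is true
        factory.setValidateConnection(false);

        // when registered as a Spring bean, the container calls afterPropertiesSet()
        return factory;
    }
}
```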
