How RedisTemplate Encapsulates Lettuce

  lettuce, redis

Preface

This article examines how RedisTemplate encapsulates Lettuce.

RedisTemplate

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisTemplate.java

public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
    //......
    /**
     * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can
     * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios).
     *
     * @param <T> return type
     * @param action callback object to execute
     * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
     * @param pipeline whether to pipeline or not the connection for the execution
     * @return object returned by the action
     */
    @Nullable
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = null;
        try {

            if (enableTransactionSupport) {
                // only bind resources in case of potential transaction synchronization
                conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
            } else {
                conn = RedisConnectionUtils.getConnection(factory);
            }

            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            T result = action.doInRedis(connToExpose);

            // close pipeline
            if (pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            // TODO: any other connection processing?
            return postProcessResult(result, connToUse, existingConnection);
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory);
        }
    }

    //......
}
  • RedisTemplate has many methods; execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) is taken as the example here.
  • The method first obtains a RedisConnection and then calls action.doInRedis with it.
  • The connection used here is a DefaultStringRedisConnection when the template is a StringRedisTemplate, whose preProcessConnection wraps the raw connection.
  • Finally, RedisConnectionUtils.releaseConnection(conn, factory) is called in the finally block to release the connection.
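
The acquire / callback / release shape of execute can be modelled without Spring at all. The following is only a minimal sketch: SketchConnection and SketchTemplate are hypothetical stand-ins invented for illustration, not classes from spring-data-redis, and the real method additionally handles transactions, pipelining and connection proxies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for RedisConnection; it only records what happened to it.
class SketchConnection {
    final List<String> events = new ArrayList<>();
    boolean closed = false;

    void set(String key, String value) {
        events.add("SET " + key + " " + value);
    }

    void close() {
        closed = true;
        events.add("CLOSE");
    }
}

// Hypothetical stand-in for RedisTemplate.execute: acquire, run the callback, always release.
class SketchTemplate {
    SketchConnection lastConnection; // exposed only so the sketch can be inspected

    <T> T execute(Function<SketchConnection, T> action) {
        SketchConnection conn = new SketchConnection(); // role of RedisConnectionUtils.getConnection(factory)
        lastConnection = conn;
        try {
            return action.apply(conn);                  // role of action.doInRedis(conn)
        } finally {
            conn.close();                               // role of RedisConnectionUtils.releaseConnection(conn, factory)
        }
    }
}
```

Because the release sits in a finally block, the connection is handed back even when the callback throws, which is the property the template is meant to guarantee to its callers.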

RedisConnectionUtils.getConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisConnectionUtils.java

    /**
     * Gets a Redis connection from the given factory. Is aware of and will return any existing corresponding connections
     * bound to the current thread, for example when using a transaction manager. Will always create a new connection
     * otherwise.
     *
     * @param factory connection factory for creating the connection
     * @return an active Redis connection without transaction management.
     */
    public static RedisConnection getConnection(RedisConnectionFactory factory) {
        return getConnection(factory, false);
    }

    /**
     * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current
     * thread, for example when using a transaction manager. Will create a new Connection otherwise, if
     * {@code allowCreate} is <tt>true</tt>.
     *
     * @param factory connection factory for creating the connection
     * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the
     *          current thread
     * @param bind binds the connection to the thread, in case one was created
     * @param enableTransactionSupport
     * @return an active Redis connection
     */
    public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean enableTransactionSupport) {

        Assert.notNull(factory, "No RedisConnectionFactory specified");

        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (connHolder != null) {
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }
            return connHolder.getConnection();
        }

        if (!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        }

        if (log.isDebugEnabled()) {
            log.debug("Opening RedisConnection");
        }

        RedisConnection conn = factory.getConnection();

        if (bind) {

            RedisConnection connectionToBind = conn;
            if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
                connectionToBind = createConnectionProxy(conn, factory);
            }

            connHolder = new RedisConnectionHolder(connectionToBind);

            TransactionSynchronizationManager.bindResource(factory, connHolder);
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }

            return connHolder.getConnection();
        }

        return conn;
    }
  • When no connection is bound to the current thread, factory.getConnection() is called to obtain a new one.
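
The lookup order in doGetConnection (thread-bound connection first, creation as the fallback, optional binding afterwards) can be sketched in plain Java. Everything here is an assumption made for illustration: CountingFactory and ThreadBoundLookupSketch are hypothetical names, a ThreadLocal map plays the role of TransactionSynchronizationManager, and transaction synchronization is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a connection factory; counts how many connections it creates.
class CountingFactory {
    int created = 0;

    Object getConnection() {
        created++;
        return new Object();
    }
}

// Simplified model of doGetConnection: thread-bound lookup first, creation as the fallback.
class ThreadBoundLookupSketch {
    // Plays the role of TransactionSynchronizationManager's per-thread resource map.
    private static final ThreadLocal<Map<CountingFactory, Object>> BOUND =
            ThreadLocal.withInitial(HashMap::new);

    static Object doGetConnection(CountingFactory factory, boolean allowCreate, boolean bind) {
        Object bound = BOUND.get().get(factory);
        if (bound != null) {
            return bound;                      // reuse the connection bound to this thread
        }
        if (!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        }
        Object conn = factory.getConnection(); // the only place a new connection is opened
        if (bind) {
            BOUND.get().put(factory, conn);    // later calls on this thread will reuse it
        }
        return conn;
    }

    static void unbind(CountingFactory factory) {
        BOUND.get().remove(factory);
    }
}
```

Without binding, every call opens a new connection; once a connection is bound, later calls on the same thread return it and the factory is not touched again.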

DefaultStringRedisConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {

    private static final byte[][] EMPTY_2D_BYTE_ARRAY = new byte[0][];

    private final Log log = LogFactory.getLog(DefaultStringRedisConnection.class);
    private final RedisConnection delegate;
    private final RedisSerializer<String> serializer;
    private Converter<byte[], String> bytesToString = new DeserializingConverter();
    private SetConverter<Tuple, StringTuple> tupleToStringTuple = new SetConverter<>(new TupleConverter());
    private SetConverter<StringTuple, Tuple> stringTupleToTuple = new SetConverter<>(new StringTupleConverter());
    private ListConverter<byte[], String> byteListToStringList = new ListConverter<>(bytesToString);
    private MapConverter<byte[], String> byteMapToStringMap = new MapConverter<>(bytesToString);
    private SetConverter<byte[], String> byteSetToStringSet = new SetConverter<>(bytesToString);
    private Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<String>>> byteGeoResultsToStringGeoResults;

    @SuppressWarnings("rawtypes") private Queue<Converter> pipelineConverters = new LinkedList<>();
    @SuppressWarnings("rawtypes") private Queue<Converter> txConverters = new LinkedList<>();
    private boolean deserializePipelineAndTxResults = false;
    private IdentityConverter<Object, ?> identityConverter = new IdentityConverter<>();

    //......
    @Override
    public Boolean set(byte[] key, byte[] value) {
        return convertAndReturn(delegate.set(key, value), identityConverter);
    }
    //......
}
  • This class does not talk to Redis itself; it delegates every command to the wrapped RedisConnection delegate.
  • When Lettuce is the client library, this delegate is a LettuceConnection.
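
The delegation described above is a classic decorator: a String-friendly layer converts arguments and results, then forwards to a byte-oriented connection. Here is a minimal sketch of that idea, assuming UTF-8 conversion; BytesConnection, InMemoryBytesConnection and StringDecoratingConnection are hypothetical names, and the in-memory map merely stands in for a real Redis server.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical byte-oriented connection, standing in for RedisConnection.
interface BytesConnection {
    Boolean set(byte[] key, byte[] value);
    byte[] get(byte[] key);
}

// Hypothetical in-memory delegate, standing in for LettuceConnection.
class InMemoryBytesConnection implements BytesConnection {
    private final Map<String, byte[]> store = new HashMap<>();

    public Boolean set(byte[] key, byte[] value) {
        store.put(new String(key, StandardCharsets.UTF_8), value);
        return Boolean.TRUE;
    }

    public byte[] get(byte[] key) {
        return store.get(new String(key, StandardCharsets.UTF_8));
    }
}

// Decorator in the spirit of DefaultStringRedisConnection: convert, then delegate.
class StringDecoratingConnection implements BytesConnection {
    private final BytesConnection delegate;

    StringDecoratingConnection(BytesConnection delegate) {
        this.delegate = delegate;
    }

    // Byte-level calls pass straight through to the delegate.
    public Boolean set(byte[] key, byte[] value) { return delegate.set(key, value); }
    public byte[] get(byte[] key) { return delegate.get(key); }

    // String-level calls serialize first and deserialize the reply.
    Boolean set(String key, String value) { return delegate.set(bytes(key), bytes(value)); }

    String get(String key) {
        byte[] raw = delegate.get(bytes(key));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    private static byte[] bytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }
}
```

The decorator holds no Redis logic of its own, so swapping the delegate for a different client library would not change the String-level API.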

LettuceConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

public class LettuceConnection extends AbstractRedisConnection {
    //......
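    // Note: set(...) is a default method inherited from the DefaultedRedisConnection
    // interface; it is shown here next to stringCommands() for convenience.
    @Override
    @Deprecated
    default Boolean set(byte[] key, byte[] value) {
        return stringCommands().set(key, value);
    }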

    @Override
    public RedisStringCommands stringCommands() {
        return new LettuceStringCommands(this);
    }

    //......
}
  • Taking set(byte[] key, byte[] value) as an example, the call is forwarded to stringCommands().set, and stringCommands() returns a LettuceStringCommands.

LettuceStringCommands

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceStringCommands.java

@RequiredArgsConstructor
class LettuceStringCommands implements RedisStringCommands {
    //......
    @Override
    public Boolean set(byte[] key, byte[] value) {

        Assert.notNull(key, "Key must not be null!");
        Assert.notNull(value, "Value must not be null!");

        try {
            if (isPipelined()) {
                pipeline(
                        connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter()));
                return null;
            }
            if (isQueueing()) {
                transaction(
                        connection.newLettuceResult(getAsyncConnection().set(key, value), Converters.stringToBooleanConverter()));
                return null;
            }
            return Converters.stringToBoolean(getConnection().set(key, value));
        } catch (Exception ex) {
            throw convertLettuceAccessException(ex);
        }
    }

    public RedisClusterCommands<byte[], byte[]> getConnection() {
        return connection.getConnection();
    }
    //......
}
  • Outside of pipelining and transactions, getConnection().set is called directly; this getConnection() in turn calls the getConnection method of LettuceConnection.
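
The branching in set (queue the asynchronous result and return null while pipelined, otherwise block and convert the reply) can be illustrated with CompletableFuture. This is a simplified model rather than the library's code: PipelineBranchSketch and its asyncSet helper are hypothetical, the transaction branch is omitted, and the fake driver call always completes with "OK".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified model of the pipelined / non-pipelined branches in LettuceStringCommands.set.
class PipelineBranchSketch {
    private boolean pipelined = false;
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    void openPipeline() {
        pipelined = true;
    }

    // Hypothetical async driver call; completes immediately with "OK" in this sketch.
    private CompletableFuture<String> asyncSet(String key, String value) {
        return CompletableFuture.completedFuture("OK");
    }

    Boolean set(String key, String value) {
        if (pipelined) {
            pending.add(asyncSet(key, value)); // the result is collected later
            return null;                       // nothing meaningful to return yet
        }
        // Blocking path: wait for the reply and convert the status string to a Boolean.
        return "OK".equals(asyncSet(key, value).join());
    }

    // Plays the role of closePipeline(): gather converted results and leave pipeline mode.
    List<Boolean> closePipeline() {
        List<Boolean> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add("OK".equals(future.join()));
        }
        pending.clear();
        pipelined = false;
        return results;
    }
}
```

This is why callers see null from individual commands inside a pipeline: the converted values only become available when the pipeline is closed.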

LettuceConnection.getConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

public class LettuceConnection extends AbstractRedisConnection {
    //......
    protected RedisClusterCommands<byte[], byte[]> getConnection() {

        if (isQueueing()) {
            return getDedicatedConnection();
        }
        if (asyncSharedConn != null) {

            if (asyncSharedConn instanceof StatefulRedisConnection) {
                return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
            }
            if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
                return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
            }
        }
        return getDedicatedConnection();
    }

    RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {

        if (asyncDedicatedConn == null) {

            asyncDedicatedConn = doGetAsyncDedicatedConnection();

            if (asyncDedicatedConn instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
            }
        }

        if (asyncDedicatedConn instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync();
        }
        if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).sync();
        }

        throw new IllegalStateException(
                String.format("%s is not a supported connection type.", asyncDedicatedConn.getClass().getName()));
    }

    protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
        return connectionProvider.getConnection(StatefulConnection.class);
    }
    //......
}
  • If asyncDedicatedConn is null, doGetAsyncDedicatedConnection is called to obtain a StatefulConnection.
  • connectionProvider.getConnection encapsulates the connection-pool operations.
  • Whether the connection is a StatefulRedisConnection or a StatefulRedisClusterConnection, its sync() method is called to expose the asynchronous connection through a synchronous API.
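
Two ideas are combined in getConnection: a synchronous facade over an asynchronous connection, and a choice between the shared and the dedicated connection. The sketch below models both under stated assumptions: AsyncCommandsSketch, SyncFacadeSketch and ConnectionChooserSketch are hypothetical names, the facade simply joins a CompletableFuture, and no real network or pool is involved.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical async command interface, standing in for the driver's async API.
interface AsyncCommandsSketch {
    CompletableFuture<String> set(String key, String value);
}

// Blocking facade: each call issues the async command and waits for its result.
class SyncFacadeSketch {
    private final AsyncCommandsSketch async;

    SyncFacadeSketch(AsyncCommandsSketch async) {
        this.async = async;
    }

    String set(String key, String value) {
        return async.set(key, value).join();
    }
}

// Simplified model of LettuceConnection.getConnection(): shared unless queueing.
class ConnectionChooserSketch {
    final SyncFacadeSketch shared;      // may be null when no shared connection exists
    private SyncFacadeSketch dedicated; // created lazily on first use
    boolean queueing = false;
    int dedicatedCreated = 0;

    ConnectionChooserSketch(SyncFacadeSketch shared) {
        this.shared = shared;
    }

    SyncFacadeSketch getConnection() {
        if (!queueing && shared != null) {
            return shared;
        }
        return getDedicatedConnection();
    }

    SyncFacadeSketch getDedicatedConnection() {
        if (dedicated == null) {
            dedicatedCreated++; // role of doGetAsyncDedicatedConnection()
            dedicated = new SyncFacadeSketch((k, v) -> CompletableFuture.supplyAsync(() -> "OK"));
        }
        return dedicated;
    }
}
```

A transaction needs its commands to travel over one connection that no other caller touches, which is a plausible reading of why queueing forces the dedicated connection while ordinary commands can share one.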

RedisConnectionUtils.releaseConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/core/RedisConnectionUtils.java

    /**
     * Closes the given connection, created via the given factory if not managed externally (i.e. not bound to the
     * thread).
     *
     * @param conn the Redis connection to close.
     * @param factory the Redis factory that the connection was created with.
     */
    public static void releaseConnection(@Nullable RedisConnection conn, RedisConnectionFactory factory) {

        if (conn == null) {
            return;
        }

        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (connHolder != null && connHolder.isTransactionSyncronisationActive()) {
            if (log.isDebugEnabled()) {
                log.debug("Redis Connection will be closed when transaction finished.");
            }
            return;
        }

        // release transactional/read-only and non-transactional/non-bound connections.
        // transactional connections for read-only transactions get no synchronizer registered
        if (isConnectionTransactional(conn, factory) && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
            unbindConnection(factory);
        } else if (!isConnectionTransactional(conn, factory)) {
            if (log.isDebugEnabled()) {
                log.debug("Closing Redis Connection");
            }
            conn.close();
        }
    }
  • With Lettuce, the conn.close() call here invokes the close method of LettuceConnection.
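
The decision made by releaseConnection can be reduced to a small function over four facts. This is only a sketch of the control flow shown above: ReleaseAction and ReleaseDecisionSketch are hypothetical names, and the boolean parameters stand in for the null check, connHolder.isTransactionSyncronisationActive(), isConnectionTransactional(...) and isCurrentTransactionReadOnly().

```java
// Possible outcomes of the simplified release decision.
enum ReleaseAction { NONE, DEFER_TO_TRANSACTION, UNBIND, CLOSE }

// Simplified model of the branching in RedisConnectionUtils.releaseConnection.
class ReleaseDecisionSketch {
    static ReleaseAction decide(boolean connIsNull, boolean syncActive,
                                boolean transactional, boolean readOnlyTx) {
        if (connIsNull) {
            return ReleaseAction.NONE;                 // nothing to release
        }
        if (syncActive) {
            return ReleaseAction.DEFER_TO_TRANSACTION; // closed when the transaction finishes
        }
        if (transactional && readOnlyTx) {
            return ReleaseAction.UNBIND;               // read-only transactions have no synchronizer
        }
        if (!transactional) {
            return ReleaseAction.CLOSE;                // plain, unbound connection: conn.close()
        }
        return ReleaseAction.NONE;                     // transactional, not read-only: left alone here
    }
}
```

For a template without transaction support the connection is neither bound nor synchronized, so the CLOSE branch is the one taken on every call.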

LettuceConnection.close

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

    @Override
    public void close() throws DataAccessException {

        super.close();

        if (isClosed) {
            return;
        }

        isClosed = true;

        if (asyncDedicatedConn != null) {
            try {
                connectionProvider.release(asyncDedicatedConn);
            } catch (RuntimeException ex) {
                throw convertLettuceAccessException(ex);
            }
        }

        if (subscription != null) {
            if (subscription.isAlive()) {
                subscription.doClose();
            }
            subscription = null;
        }

        this.dbIndex = defaultDbIndex;
    }
  • Only asyncDedicatedConn is released here; nothing is done with asyncSharedConn.
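
The asymmetry in close (release the dedicated connection, leave the shared one alone, do nothing on a second call) can be sketched as follows. The names ReleaseRecordingProvider and ClosableConnectionSketch are hypothetical, connections are represented by plain strings, and the subscription handling and dbIndex reset of the real method are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connection provider that records what it is asked to release.
class ReleaseRecordingProvider {
    final List<String> released = new ArrayList<>();

    void release(String connectionName) {
        released.add(connectionName);
    }
}

// Simplified model of LettuceConnection.close(): idempotent, releases only the dedicated connection.
class ClosableConnectionSketch {
    private final ReleaseRecordingProvider provider;
    private final String sharedConn = "shared"; // never released by close()
    private String dedicatedConn;               // created lazily, released on close
    private boolean closed = false;

    ClosableConnectionSketch(ReleaseRecordingProvider provider) {
        this.provider = provider;
    }

    String sharedConnection() {
        return sharedConn;
    }

    void useDedicated() {
        if (dedicatedConn == null) {
            dedicatedConn = "dedicated";
        }
    }

    void close() {
        if (closed) {
            return;                          // a second close is a no-op
        }
        closed = true;
        if (dedicatedConn != null) {
            provider.release(dedicatedConn); // only the dedicated connection goes back
        }
    }
}
```

Leaving the shared connection untouched is what lets many short-lived connection objects reuse it; in this sketch a connection that never asked for a dedicated one releases nothing at all.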

Summary

  • Lettuce is a reactive, asynchronous client library. At the lowest level, the RedisTemplate of spring-data-redis encapsulates Lettuce by calling sync() on the asynchronous connection to obtain a synchronous API.
  • RedisTemplate also encapsulates acquiring and releasing connections: acquisition goes through RedisConnectionUtils.getConnection and release through RedisConnectionUtils.releaseConnection.
  • getConnection calls factory.getConnection() to obtain the connection.
  • RedisConnectionUtils.releaseConnection ultimately calls LettuceConnection.close, which releases only asyncDedicatedConn.
