A Look at Redisson's Distributed Lock

Preface

This article looks at how Redisson implements its distributed lock.

Maven

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.8.1</version>
        </dependency>

Example

    @Test
    public void testDistributedLock(){
        Config config = new Config();
//        config.setTransportMode(TransportMode.EPOLL);
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);


        IntStream.rangeClosed(1,5)
                .parallel()
                .forEach(i -> {
                    executeLock(redisson);
                });

        executeLock(redisson);
    }

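    public void executeLock(RedissonClient redisson){
        RLock lock = redisson.getLock("myLock");
        boolean locked = false;
        try{
            LOGGER.info("try lock");
            // no-arg tryLock() does not wait and sets no lease time
            locked = lock.tryLock();
//            locked = lock.tryLock(1,2,TimeUnit.MINUTES);
            LOGGER.info("get lock result:{}",locked);
            if(locked){
                // simulate a long-running task while holding the lock
                TimeUnit.HOURS.sleep(1);
                LOGGER.info("get lock and finish");
            }
        }catch (InterruptedException e){
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }catch (Exception e){
            LOGGER.error("lock task failed", e);
        }finally {
            LOGGER.info("enter unlock");
            // only release the lock if this thread actually acquired it
            if(locked){
                lock.unlock();
            }
        }
    }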

Source code analysis

RedissonLock.tryLock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Override
    public boolean tryLock() {
        return get(tryLockAsync());
    }

    @Override
    public RFuture<Boolean> tryLockAsync() {
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

  • If no leaseTime is given, it defaults to -1, and the lease time passed to tryLockInnerAsync is commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), which defaults to 30 seconds. When a leaseTime is given, the script result is returned directly and no renewal task is scheduled.
  • tryLockInnerAsync runs a Lua script that takes three values: one key in the KEYS array and two arguments in the ARGV array.
  • KEYS[1] is the lock name specified by the caller. ARGV[1] is the lease time, and ARGV[2] is the name of the lock holder, built from the id of the RedissonLock plus the thread id.
  • The first branch of the script checks whether the hash for the lock exists. If it does not, the script creates it with a single entry whose field is the holder name and whose value is 1, sets the expiration of the hash to the lease time, and returns nil.
  • The second branch handles the case where the hash exists and already contains the holder name (a re-entrant acquire): it increments the holder's value by 1, resets the expiration to the lease time, and returns nil.
  • Otherwise the lock is held by someone else, and the script returns the remaining TTL of the lock key.
  • Once the script completes successfully, tryAcquireOnceAsync checks the converted result. If it is true (the script returned nil, so the lock was acquired), it calls scheduleExpirationRenewal so that the lock does not expire before the task finishes.
  • scheduleExpirationRenewal registers a delayed task that fires after internalLockLeaseTime / 3. The task runs renewExpirationAsync, which resets the lock's expiration to internalLockLeaseTime.
  • The task adds a listener to the future returned by renewExpirationAsync. If the renewal succeeded, the listener calls scheduleExpirationRenewal again to re-register the delayed task.
  • tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) is the variant used by the methods that accept an automatic unlock time. It differs from tryAcquireOnceAsync in that it receives the script result as a Long: only when that value is null (lock acquired) does it schedule the renewal task. tryAcquireOnceAsync instead uses BooleanNullReplayConvertor, which yields true when the script result is null.
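
To make the three branches of the locking script concrete, here is a minimal in-memory sketch in plain Java. It is not Redisson code: the class LockScriptModel, its maps and the explicit nowMillis clock parameter are all made up for illustration, and a HashMap stands in for the Redis hash and its expiration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the lock script's three branches; not Redisson code. */
final class LockScriptModel {
    // lock key -> (holder name -> re-entrancy count); stands in for the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // lock key -> absolute expiry time in millis; stands in for PEXPIRE / PTTL
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Returns null when the lock is acquired, otherwise the remaining TTL in millis. */
    Long tryLock(String key, long leaseMillis, String holder, long nowMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {
            // emulate Redis dropping an expired key
            hashes.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null) {
            // branch 1: key does not exist -> hset holder 1, pexpire, return nil
            hash = new HashMap<>();
            hash.put(holder, 1);
            hashes.put(key, hash);
            expiresAt.put(key, nowMillis + leaseMillis);
            return null;
        }
        if (hash.containsKey(holder)) {
            // branch 2: same holder again -> hincrby 1, pexpire, return nil
            hash.merge(holder, 1, Integer::sum);
            expiresAt.put(key, nowMillis + leaseMillis);
            return null;
        }
        // branch 3: held by another holder -> return pttl
        return expiresAt.get(key) - nowMillis;
    }

    /** How many times the holder currently holds the lock (0 if not held). */
    int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }

    public static void main(String[] args) {
        LockScriptModel redis = new LockScriptModel();
        System.out.println(redis.tryLock("myLock", 30_000, "node-1:1", 0));     // null
        System.out.println(redis.tryLock("myLock", 30_000, "node-1:1", 1_000)); // null
        System.out.println(redis.tryLock("myLock", 30_000, "node-2:7", 2_000)); // 29000
    }
}
```

Passing the clock in explicitly keeps the model deterministic. In real Redis the server tracks key expiry itself, and the whole script runs atomically.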

RedissonLock.unlock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

    @Override
    public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = expirationRenewalMap.get(getEntryName());
        if (task != null && (threadId == null || task.getThreadId() == threadId)) {
            expirationRenewalMap.remove(getEntryName());
            task.getTimeout().cancel();
        }
    }

  • unlockInnerAsync releases the lock with a Lua script that uses two keys: the lock name and the channel name.
  • The script takes three arguments: the pub/sub unlockMessage, whose value is 0; internalLockLeaseTime, which defaults to commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); and the lock holder name.
  • If the lock key does not exist, the script publishes the unlock message and returns 1. If the key exists but has no entry for the current holder, it returns nil.
  • If the holder's entry exists, its counter is decremented by 1. If the counter is still greater than 0, the expiration is reset and 0 is returned. Otherwise the lock key is deleted, unlockMessage is published, and 1 is returned. A final return nil covers any case not handled above.
  • unlockAsync registers a FutureListener on the future returned by unlockInnerAsync. The listener turns a nil result into an IllegalMonitorStateException and, when the lock was released, calls cancelExpirationRenewal to cancel the task registered by scheduleExpirationRenewal.
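
The three possible results of the unlock script (1, 0 and nil) are easier to follow with a small model. The sketch below is plain Java written for this article, not Redisson code: UnlockScriptModel, its lock helper and the published counter are hypothetical names, and expiry handling is left out to keep the focus on the return values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the unlock script's return values; not Redisson code. */
final class UnlockScriptModel {
    // lock key -> (holder name -> re-entrancy count); stands in for the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // number of unlock messages "published" so far
    int published = 0;

    /** Simplified acquire used only to set up state: no expiry, no ownership check. */
    void lock(String key, String holder) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(holder, 1, Integer::sum);
    }

    /** TRUE = lock released (script 1), FALSE = still held re-entrantly (script 0), null = caller is not the owner (nil). */
    Boolean unlock(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null) {
            // key does not exist: publish the unlock message and return 1
            published++;
            return Boolean.TRUE;
        }
        if (!hash.containsKey(holder)) {
            // key exists but belongs to another holder: return nil
            return null;
        }
        int counter = hash.merge(holder, -1, Integer::sum);
        if (counter > 0) {
            // still held by the same holder; the real script also resets the expiration here
            return Boolean.FALSE;
        }
        // last hold released: delete the key and publish the unlock message
        hashes.remove(key);
        published++;
        return Boolean.TRUE;
    }

    public static void main(String[] args) {
        UnlockScriptModel redis = new UnlockScriptModel();
        redis.lock("myLock", "node-1:1");
        redis.lock("myLock", "node-1:1");
        System.out.println(redis.unlock("myLock", "node-2:7")); // null
        System.out.println(redis.unlock("myLock", "node-1:1")); // false
        System.out.println(redis.unlock("myLock", "node-1:1")); // true
    }
}
```

A null result is what unlockAsync converts into an IllegalMonitorStateException, which is why unlocking from a thread that does not hold the lock fails loudly instead of silently deleting someone else's lock.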

LockPubSub

redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java

    public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

        public static final Long unlockMessage = 0L;

        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }

        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
            if (message.equals(unlockMessage)) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                }

                value.getLatch().release();
            }
        }

    }

  • When an unlockMessage is received, onMessage runs the next listener queued on the RedissonLockEntry, if there is one, and then releases the latch.
  • The tryAcquireOnceAsync path does not subscribe to LockPubSub. When no automatic unlock time is specified, the renewal task keeps extending the expiration, so the lock may stay held indefinitely if unlock is never called.
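
The wake-up mechanism in onMessage can be sketched with standard concurrency classes. The example below is a hypothetical stand-in written for this article, not Redisson code: it assumes the latch behaves like a java.util.concurrent.Semaphore with zero initial permits, and the names UnlockNotifier, addListener and awaitUnlock are invented.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a lock entry that is notified of unlock messages; not Redisson code. */
final class UnlockNotifier {
    static final long UNLOCK_MESSAGE = 0L;

    // callbacks waiting for the next unlock, served one per message
    private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<>();
    // zero permits: a waiter blocks until an unlock message releases one
    private final Semaphore latch = new Semaphore(0);

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    /** Mirrors the shape of onMessage: run one queued listener, then release the latch. */
    void onMessage(long message) {
        if (message != UNLOCK_MESSAGE) {
            return; // ignore anything that is not an unlock message
        }
        Runnable next = listeners.poll();
        if (next != null) {
            next.run();
        }
        latch.release();
    }

    /** Blocks until an unlock message arrives or the timeout elapses. */
    boolean awaitUnlock(long timeout, TimeUnit unit) {
        try {
            return latch.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        UnlockNotifier entry = new UnlockNotifier();
        // another thread plays the role of the Redis channel delivering the message
        Thread publisher = new Thread(() -> entry.onMessage(UNLOCK_MESSAGE));
        publisher.start();
        System.out.println("woken: " + entry.awaitUnlock(5, TimeUnit.SECONDS));
        publisher.join();
    }
}
```

A waiter woken this way still has to retry the acquire; the message only signals that the lock may be free.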

Summary

Points to note when locking:

  • A lock needs an expiration time to prevent deadlock.
  • Acquiring the lock and setting its expiration must be a single atomic operation, so it is best to use a Lua script or the SET command with the NX and EX options.
  • When locking, record who holds the lock (for example, the thread id); this information is needed when unlocking.
  • For tasks whose duration is uncertain, the expiration must be extended while the task is still running, so that the lock is not released by timeout before the task completes.
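
The last point, extending the expiration while the task is still running, can be sketched with a ScheduledExecutorService. This is an illustration of the idea under stated assumptions, not Redisson's implementation: LeaseWatchdog and its renew callback are hypothetical, and the callback stands in for whatever call resets the key's expiration and reports whether the lock is still held.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical renewal task: re-arms itself every lease/3 while renewals keep succeeding. */
final class LeaseWatchdog {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lease-watchdog");
                t.setDaemon(true); // do not keep the JVM alive just for renewals
                return t;
            });
    private final long leaseMillis;
    private final BooleanSupplier renew; // returns true if the lock is still held and was extended
    private ScheduledFuture<?> pending;
    private boolean cancelled;

    LeaseWatchdog(long leaseMillis, BooleanSupplier renew) {
        this.leaseMillis = leaseMillis;
        this.renew = renew;
    }

    void start() {
        schedule();
    }

    private synchronized void schedule() {
        if (cancelled) {
            return;
        }
        pending = timer.schedule(() -> {
            // reschedule only if the renewal succeeded, otherwise stop quietly
            if (renew.getAsBoolean()) {
                schedule();
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    /** Call this when unlocking so that the lease is no longer extended. */
    synchronized void cancel() {
        cancelled = true;
        if (pending != null) {
            pending.cancel(false);
        }
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger renewals = new AtomicInteger();
        LeaseWatchdog watchdog = new LeaseWatchdog(300, () -> renewals.incrementAndGet() > 0);
        watchdog.start();
        Thread.sleep(1_000); // the "task" holding the lock
        watchdog.cancel();
        System.out.println("renewals while the task ran: " + renewals.get());
    }
}
```

Renewing at a third of the lease leaves room for a couple of failed attempts before the key actually expires.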

Points to note when unlocking:

  • The unlock steps (checking whether the key exists, deleting it if it does, and so on) must be atomic, so it is best to use a Lua script.
  • Unlocking must verify that the caller matches the holder recorded when locking, so that a lock is not deleted by mistake.
  • If a delayed task is extending the expiration, that task must be cancelled when unlocking.
