A look at Redisson's DelayedQueue

Preface

This article looks at how Redisson's DelayedQueue works.

Maven

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.8.1</version>
        </dependency>

Example

    @Test
    public void testDelayedQueue() throws InterruptedException {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);
        RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
        RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
        delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
        Assert.assertFalse(blockingQueue.contains("demo"));
        TimeUnit.SECONDS.sleep(15);
        Assert.assertTrue(blockingQueue.contains("demo"));
    }
  • Two queues are involved here. offer puts the element into the delayedQueue right away; the delay governs when the element becomes visible in the target queue, which in this example is an RBlockingQueue.

Source code analysis

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {

    private final QueueTransferService queueTransferService;
    private final String channelName;
    private final String queueName;
    private final String timeoutSetName;
    
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getName());
        queueName = prefixName("redisson_delay_queue", getName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        
        //QueueTransferTask task = ......
        
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }

    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = PlatformDependent.threadLocalRandom().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;"
                 ,
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }

    public ByteBuf encode(Object value) {
        if (commandExecutor.isRedissonReferenceSupportEnabled()) {
            RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
            if (reference != null) {
                value = reference;
            }
        }
        
        try {
            return codec.getValueEncoder().encode(value);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    //......
}
  • offerAsync runs a Lua script that receives four keys: KEYS[1] is getName(), KEYS[2] is timeoutSetName, KEYS[3] is queueName, and KEYS[4] is channelName. KEYS[1] is passed in but not used by this script.
  • It receives three arguments: ARGV[1] is timeout, ARGV[2] is randomId, and ARGV[3] is encode(e).
  • The script packs randomId and the encoded value into one struct, adds that struct to the zset at timeoutSetName with timeout as its score, and appends the same struct to the tail of the list at queueName. It then checks whether the first element of the zset is the struct just added and, if so, publishes timeout to the channel.
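The three steps of the offer script can be sketched without Redis. The model below is only an illustration: the class OfferScriptModel, its fields, and the "randomId:value" string that stands in for struct.pack are all assumptions made for this sketch, and a TreeSet ordered by score plays the role of the zset. Only prefixName is copied from the source above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** In-memory model of the offer script (hypothetical, no Redis involved). */
class OfferScriptModel {

    /** One zset entry: the packed member and its score (absolute timeout in ms). */
    static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Stand-in for the zset at timeoutSetName: ordered by score, then by member.
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score)
                      .thenComparing((Entry e) -> e.member));
    // Stand-in for the list at queueName.
    final List<String> queue = new ArrayList<>();
    // Stand-in for the messages published on channelName.
    final List<Long> published = new ArrayList<>();

    /** Same logic as RedissonDelayedQueue.prefixName in the source above. */
    static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    void offer(String value, long timeout, long randomId) {
        // Simplified replacement for struct.pack('dLc0', randomId, len, value).
        String member = randomId + ":" + value;
        timeoutSet.add(new Entry(timeout, member));   // zadd KEYS[2] ARGV[1] value
        queue.add(member);                            // rpush KEYS[3] value
        // zrange KEYS[2] 0 0: publish only if the new entry is now the head.
        if (timeoutSet.first().member.equals(member)) {
            published.add(timeout);                   // publish KEYS[4] ARGV[1]
        }
    }
}
```

Offering "a" with timeout 2000, then "b" with 3000, then "c" with 1000 publishes 2000 and then 1000 in this model; "b" publishes nothing because "a" is still the head of the zset when it arrives. For the queue name dest_queue1 from the example, prefixName yields keys such as redisson_delay_queue_timeout:{dest_queue1}.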

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic<Long> getTopic() {
                return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        
        queueTransferService.schedule(queueName, task);
  • The QueueTransferTask is created and scheduled in the RedissonDelayedQueue constructor.
  • Its pushTaskAsync method is what the scheduler runs; it moves expired elements from the element queue to the target queue.
  • This is also a Lua script: KEYS[1] is getName(), KEYS[2] is timeoutSetName, and KEYS[3] is queueName; ARGV[1] is the current timestamp and ARGV[2] is 100.
  • zrangebyscore reads from the zset at timeoutSetName the elements whose score lies between 0 and the current timestamp, limited to the first 100 (ARGV[2]).
  • For each expired element, the script unpacks the value, calls rpush to append it to the target queue, and calls lrem to remove the struct from the element queue; afterwards zrem deletes all processed elements from the zset.
  • After the transfer, the script returns the score of the first remaining element of the zset, or nil if the zset is empty.
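The transfer step can be modelled the same way, again without Redis. This is a sketch under assumptions: TransferScriptModel, its add helper, and the "randomId:value" member format are invented for illustration and only mirror the order of operations in the Lua script.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** In-memory model of the pushTaskAsync script (hypothetical, no Redis involved). */
class TransferScriptModel {

    static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // Stand-in for the zset at timeoutSetName.
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score)
                      .thenComparing((Entry e) -> e.member));
    final List<String> queue = new ArrayList<>();   // list at queueName
    final List<String> target = new ArrayList<>();  // list at getName()

    /** Test helper: stores an element the way the offer script would. */
    void add(String value, long timeout, long randomId) {
        String member = randomId + ":" + value;
        timeoutSet.add(new Entry(timeout, member));
        queue.add(member);
    }

    /** Moves expired elements and returns the next timeout, or null if none is left. */
    Long pushTask(long now, int limit) {
        // zrangebyscore KEYS[2] 0 now limit 0 <limit>
        List<Entry> expired = new ArrayList<>();
        for (Entry e : timeoutSet) {
            if (e.score > now || expired.size() >= limit) {
                break;
            }
            expired.add(e);
        }
        for (Entry e : expired) {
            // Simplified replacement for struct.unpack: drop the randomId prefix.
            String value = e.member.substring(e.member.indexOf(':') + 1);
            target.add(value);        // rpush KEYS[1] value
            queue.remove(e.member);   // lrem KEYS[3] 1 v (first occurrence only)
            timeoutSet.remove(e);     // zrem KEYS[2] ...
        }
        // zrange KEYS[2] 0 0 WITHSCORES: score of the new head, if any.
        return timeoutSet.isEmpty() ? null : timeoutSet.first().score;
    }
}
```

The returned value is what the caller uses to decide when to run the next transfer, which is why the script reports the head score rather than the number of elements it moved.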

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {

    private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
    
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
    
    public synchronized void remove(String name) {
        QueueTransferTask task = tasks.get(name);
        if (task != null) {
            if (task.decUsage() == 0) {
                tasks.remove(name, task);
                task.stop();
            }
        }
    }
}
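  • schedule first calls putIfAbsent on the ConcurrentMap. If no task was registered under that name, the new task is started; if one already exists, oldTask.incUsage() is called and the new task is not started. remove is the counterpart: it decrements the usage count and only removes and stops the task when the count reaches 0.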
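The reference counting in schedule and remove can be tried out in isolation. The sketch below is not Redisson code: RefCountedTaskRegistry and its nested Task class are hypothetical stand-ins, and the usage counter starting at 1 is an assumption made so that one schedule call is balanced by one remove call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-alone model of the schedule/remove reference counting (hypothetical). */
class RefCountedTaskRegistry {

    /** Minimal stand-in for QueueTransferTask: records only usage and running state. */
    static final class Task {
        // Assumption: the first registration counts as one usage.
        private final AtomicInteger usage = new AtomicInteger(1);
        volatile boolean running;

        void start() { running = true; }
        void stop() { running = false; }
        void incUsage() { usage.incrementAndGet(); }
        int decUsage() { return usage.decrementAndGet(); }
    }

    private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

    synchronized void schedule(String name, Task task) {
        Task oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();         // first user of this name: start the task
        } else {
            oldTask.incUsage();   // later users share the existing task
        }
    }

    synchronized void remove(String name) {
        Task task = tasks.get(name);
        if (task != null && task.decUsage() == 0) {
            tasks.remove(name, task);
            task.stop();          // last user gone: stop the task
        }
    }

    boolean isScheduled(String name) {
        return tasks.containsKey(name);
    }
}
```

With this design, two RDelayedQueue instances created for the same target queue would share a single running transfer task, and the task would stop only after both have been removed.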

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

    public void start() {
        RTopic<Long> schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        
        messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }

    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    if (future.cause() instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(future.cause().getMessage(), future.cause());
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                
                if (future.getNow() != null) {
                    scheduleTask(future.getNow());
                }
            }
        });
    }
  • start obtains the RTopic and adds a StatusListener and a MessageListener to it.
  • The StatusListener calls pushTask once the subscription is established; the MessageListener calls scheduleTask with the published startTime.
  • pushTaskAsync is implemented in RedissonDelayedQueue; it is the script described above that moves elements from the element queue to the target queue.
  • scheduleTask cancels any previously scheduled timeout and recomputes the delay. If the delay is greater than 10 ms, it schedules a timer that calls pushTask once the delay has elapsed; if the delay is 10 ms or less, it calls pushTask immediately.
  • pushTask calls pushTaskAsync and attaches a listener. If the call fails (for any reason other than RedissonShutdownException), it logs the error and calls scheduleTask to retry 5 seconds later. If the call succeeds and the return value (the score of the first element of the zset at timeoutSetName) is not null, it calls scheduleTask with that value.
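The timing rule in scheduleTask and the retry rule in pushTask reduce to two small calculations. The sketch below isolates them as pure functions; ScheduleDecision, timerDelay, and retryStartTime are hypothetical names, while the constants 10 ms and 5 seconds are taken from the source above.

```java
/** The timing decisions of scheduleTask and pushTask as pure functions (hypothetical names). */
class ScheduleDecision {

    // Delays of 10 ms or less are not worth a timer; the push runs right away.
    static final long IMMEDIATE_THRESHOLD_MS = 10;
    // After a failed push, the next attempt is scheduled 5 seconds later.
    static final long RETRY_DELAY_MS = 5 * 1000L;

    /**
     * Returns how long a timer should wait before pushing,
     * or 0 when the push should run immediately.
     */
    static long timerDelay(long startTime, long now) {
        long delay = startTime - now;
        return delay > IMMEDIATE_THRESHOLD_MS ? delay : 0;
    }

    /** Returns the startTime passed to scheduleTask after a failed push. */
    static long retryStartTime(long now) {
        return now + RETRY_DELAY_MS;
    }
}
```

A startTime that already lies in the past gives a negative delay, which falls into the immediate branch, so overdue elements are transferred on the next call instead of waiting for a timer.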

Summary

  • Redisson's DelayedQueue stores each element together with its delay information, and a scheduled task moves expired elements to the target queue.
  • Three structures are used for storage: the target queue list; the element queue list, which holds the structs packed from randomId and the encoded value; and the zset at timeoutSetName, whose members are the same structs, scored by their timeout.
  • Redisson relies heavily on asynchronous callbacks, which makes the code relatively hard to follow.
