A look at the DAOs of jesque

Preface

This article walks through the DAO interfaces in jesque's meta package and the Redis commands behind each of their methods.

DAO list

  • FailureDAO

  • KeysDAO

  • QueueInfoDAO

  • WorkerInfoDAO

FailureDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java

/**
 * FailureDAO provides access to job failures.
 * 
 * @author Greg Haines
 */
public interface FailureDAO {
    
    /**
     * @return total number of failures
     */
    long getCount();

    /**
     * @param offset offset into the failures
     * @param count number of failures to return
     * @return a sub-list of the failures
     */
    List<JobFailure> getFailures(long offset, long count);

    /**
     * Clear the list of failures.
     */
    void clear();

    /**
     * Re-queue a job for execution.
     * @param index the index into the failure list
     * @return the date the job was re-queued
     */
    Date requeue(long index);

    /**
     * Remove a failure from the list.
     * @param index the index of the failure to remove
     */
    void remove(long index);
}

Every method operates on the key namespace:failed, which is a Redis list.

  • getCount

Uses LLEN to get the length of the list.

  • clear

Deletes the namespace:failed key with DEL.

  • getFailures

Reads a page of failures with LRANGE.

  • requeue

Fetches the failed job at the given index, updates its retry time, and puts the job back on its queue.

  • remove

Deletes the entry at the given index. Redis has no command to delete a list element by index, so the implementation first uses LSET to overwrite the element with a random value and then uses LREM to remove that value.
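The LSET-then-LREM trick used by remove is easier to see in isolation. The following is a minimal sketch, not jesque's implementation: it replaces Redis with an in-memory java.util.List so that it runs without a server, and the class name FailureListSketch and its push helper are hypothetical. Each method notes in a comment the Redis command it stands in for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** In-memory stand-in for the Redis list at namespace:failed (illustrative only). */
public class FailureListSketch {

    private final List<String> failed = new ArrayList<>();

    /** Stands in for RPUSH: appends a failure payload (hypothetical helper). */
    public void push(String payload) {
        failed.add(payload);
    }

    /** Stands in for LLEN. */
    public long getCount() {
        return failed.size();
    }

    /** Stands in for LRANGE offset (offset + count - 1); assumes offset >= 0. */
    public List<String> getFailures(long offset, long count) {
        int from = (int) Math.min(offset, failed.size());
        int to = (int) Math.min(offset + count, failed.size());
        return new ArrayList<>(failed.subList(from, to));
    }

    /** Deletes by index: overwrite with a unique sentinel, then remove the sentinel. */
    public void remove(long index) {
        String sentinel = UUID.randomUUID().toString();
        failed.set((int) index, sentinel); // stands in for LSET key index sentinel
        failed.remove(sentinel);           // stands in for LREM key 1 sentinel
    }

    /** Stands in for DEL. */
    public void clear() {
        failed.clear();
    }
}
```

The sentinel has to be a value that cannot already occur in the list, which is why a random one is used; otherwise the removal step could delete a different, equal element.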

KeysDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java

/**
 * KeysDAO provides access to available keys.
 * 
 * @author Greg Haines
 */
public interface KeysDAO {
    
    /**
     * Get basic key info.
     * @param key the key name
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key);

    /**
     * Get basic key info plus a sub-list of the array value for the key, if applicable.
     * @param key the key name
     * @param offset the offset into the array
     * @param count the number of values to return
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key, int offset, int count);

    /**
     * Get basic info on all keys.
     * @return a list of key informations
     */
    List<KeyInfo> getKeyInfos();

    /**
     * @return information about the backing Redis database
     */
    Map<String, String> getRedisInfo();
}

  • getKeyInfo

Uses TYPE to determine the key's type.

  • getKeyInfos

Uses KEYS * to list all keys.

  • getRedisInfo

Uses INFO.
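getRedisInfo returns a Map<String, String>, while INFO replies with plain text: one key:value pair per line, grouped under section headers that start with #. The following sketch shows one way such a reply could be turned into a map. It is an illustration under that assumption about the reply format, not a copy of jesque's code, and the class name RedisInfoSketch and method parseInfo are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for the text reply of the Redis INFO command. */
public class RedisInfoSketch {

    static Map<String, String> parseInfo(String infoReply) {
        Map<String, String> info = new LinkedHashMap<>();
        for (String line : infoReply.split("\r?\n")) {
            // Skip blank separator lines and "# Section" headers.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(':');
            if (sep < 0) {
                continue; // not a key:value line
            }
            // Split on the first colon only, since values may contain colons.
            info.put(line.substring(0, sep), line.substring(sep + 1));
        }
        return info;
    }
}
```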

QueueInfoDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java

/**
 * QueueInfoDAO provides access to the queues in use by Jesque.
 * 
 * @author Greg Haines
 */
public interface QueueInfoDAO {
    
    /**
     * @return the list of queue names
     */
    List<String> getQueueNames();

    /**
     * @return total number of jobs pending in all queues
     */
    long getPendingCount();

    /**
     * @return total number of jobs processed
     */
    long getProcessedCount();

    /**
     * @return the list of queue informations
     */
    List<QueueInfo> getQueueInfos();

    /**
     * @param name the queue name
     * @param jobOffset the offset into the queue
     * @param jobCount the number of jobs to return
     * @return the queue information or null if the queue does not exist
     */
    QueueInfo getQueueInfo(String name, long jobOffset, long jobCount);

    /**
     * Delete the given queue.
     * @param name the name of the queue
     */
    void removeQueue(String name);
}

  • getQueueNames

Uses SMEMBERS on namespace:queues.

  • getPendingCount

Sums the size of every queue; how the size is computed depends on the queue type.

private long size(final Jedis jedis, final String queueName) {
    final String key = key(QUEUE, queueName);
    final long size;
    if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZCARD
        size = jedis.zcard(key);
    } else { // Else, use LLEN
        size = jedis.llen(key);
    }
    return size;
}

A delayed queue is stored as a sorted set, so its size comes from ZCARD.
A regular queue is stored as a list, so its size comes from LLEN.

  • getProcessedCount

Reads the processed counter directly; it is stored as a plain string value under the stat key.

  • getQueueInfos

Builds a QueueInfo for every queue name, computing the size of each queue along the way.

  • removeQueue

public void removeQueue(final String name) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(QUEUES), name);
            jedis.del(key(QUEUE, name));
            return null;
        }
    });
}

Two keys are touched: the queue name is removed from the namespace:queues set with SREM, and the queue's own key is deleted with DEL.
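To make the interplay between the queues set and the two queue types concrete, here is a small in-memory sketch of getPendingCount and removeQueue. It is an illustration only and does not use Jedis: the class QueueStoreSketch and its enqueue and delayedEnqueue helpers are hypothetical, a list-backed map stands in for regular queues, and a member-to-score map stands in for the sorted sets behind delayed queues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory stand-in for the queue-related keys (illustrative only). */
public class QueueStoreSketch {

    private final Set<String> queueNames = new LinkedHashSet<>();               // namespace:queues (set)
    private final Map<String, List<String>> lists = new HashMap<>();            // regular queues (lists)
    private final Map<String, Map<String, Double>> sortedSets = new HashMap<>(); // delayed queues (sorted sets)

    /** Hypothetical helper: register the queue and append a job to its list. */
    public void enqueue(String queue, String job) {
        queueNames.add(queue);
        lists.computeIfAbsent(queue, k -> new ArrayList<>()).add(job);
    }

    /** Hypothetical helper: register the queue and add a job with a score. */
    public void delayedEnqueue(String queue, String job, double runAt) {
        queueNames.add(queue);
        sortedSets.computeIfAbsent(queue, k -> new HashMap<>()).put(job, runAt);
    }

    /** Mirrors the size dispatch: ZCARD for delayed queues, LLEN otherwise. */
    public long size(String queue) {
        if (sortedSets.containsKey(queue)) {
            return sortedSets.get(queue).size(); // stands in for ZCARD
        }
        List<String> list = lists.get(queue);
        return list == null ? 0 : list.size();   // stands in for LLEN
    }

    /** Sums the size of every queue listed in the queues set. */
    public long getPendingCount() {
        long total = 0;
        for (String queue : queueNames) {
            total += size(queue);
        }
        return total;
    }

    /** Removes the name from the set (SREM) and drops the queue's data (DEL). */
    public void removeQueue(String queue) {
        queueNames.remove(queue);
        lists.remove(queue);
        sortedSets.remove(queue);
    }
}
```

In Redis a key holds exactly one type, so a real queue is either a list or a sorted set; the sketch keeps two maps only to avoid casts.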

WorkerInfoDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java

/**
 * WorkerInfoDAO provides access to information about workers.
 * 
 * @author Greg Haines
 */
public interface WorkerInfoDAO {
    
    /**
     * @return total number of workers known
     */
    long getWorkerCount();

    /**
     * @return number of active workers
     */
    long getActiveWorkerCount();

    /**
     * @return number of paused workers
     */
    long getPausedWorkerCount();

    /**
     * @return information about all active workers
     */
    List<WorkerInfo> getActiveWorkers();

    /**
     * @return information about all paused workers
     */
    List<WorkerInfo> getPausedWorkers();

    /**
     * @return information about all workers
     */
    List<WorkerInfo> getAllWorkers();

    /**
     * @param workerName the name of the worker
     * @return information about the given worker or null if that worker does not exist
     */
    WorkerInfo getWorker(String workerName);

    /**
     * @return a map of worker informations by hostname
     */
    Map<String, List<WorkerInfo>> getWorkerHostMap();

    /**
     * Removes the metadata about a worker.
     * 
     * @param workerName
     *            The worker name to remove
     */
    void removeWorker(String workerName);
}

  • getAllWorkers

Uses SMEMBERS on namespace:workers.

  • getActiveWorkers

Uses SMEMBERS on namespace:workers, then keeps the workers whose state is working.

  • getPausedWorkers

Uses SMEMBERS on namespace:workers, then keeps the workers whose state is paused.

  • getWorkerCount

Uses SCARD on namespace:workers directly.

  • getActiveWorkerCount

Uses SMEMBERS on namespace:workers, then counts the workers whose state is working.

  • getPausedWorkerCount

Uses SMEMBERS on namespace:workers, then counts the workers whose state is paused.

  • getWorkerHostMap

Uses SMEMBERS on namespace:workers, then groups the workers by host.
Reading the whole set and then filtering or grouping is the common pattern here; the active and paused counts are derived in the same way.

  • removeWorker

public void removeWorker(final String workerName) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(WORKERS), workerName);
            jedis.del(key(WORKER, workerName), key(WORKER, workerName, STARTED),
                    key(STAT, FAILED, workerName), key(STAT, PROCESSED, workerName));
            return null;
        }
    });
}

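Removes the name from the workers set with SREM, then deletes the worker's related keys with a single DEL: the worker's own key, its started key, and its failed and processed stat keys.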
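The grouping step behind getWorkerHostMap can be sketched without Redis. The example below assumes that a worker name starts with the host name followed by a colon (for example host:pid-thread:queues); that layout is an assumption made for illustration, and the class WorkerHostMapSketch with its methods hostOf and byHost is hypothetical. Plain strings stand in for WorkerInfo objects, and the input collection stands in for the result of SMEMBERS on namespace:workers.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative grouping of worker names by host. */
public class WorkerHostMapSketch {

    /** Assumes the host is everything before the first colon. */
    static String hostOf(String workerName) {
        int idx = workerName.indexOf(':');
        return idx < 0 ? workerName : workerName.substring(0, idx);
    }

    /** Groups worker names by host; a TreeMap keeps the hosts sorted. */
    static Map<String, List<String>> byHost(Collection<String> workerNames) {
        Map<String, List<String>> hostMap = new TreeMap<>();
        for (String name : workerNames) {
            hostMap.computeIfAbsent(hostOf(name), k -> new ArrayList<>()).add(name);
        }
        return hostMap;
    }
}
```

The same read-then-iterate shape would serve for the active and paused variants, with a state check in place of the grouping.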