A look at jesque's event mechanism

Preface

This article mainly introduces jesque’s event mechanism.

WorkerEvent

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

/**
 * The possible WorkerEvents that a WorkerListener may register for.
 */
public enum WorkerEvent {
    
    /**
     * The Worker just finished starting up and is about to start running.
     */
    WORKER_START,
    /**
     * The Worker is polling the queue.
     */
    WORKER_POLL,
    /**
     * The Worker is processing a Job.
     */
    JOB_PROCESS,
    /**
     * The Worker is about to execute a materialized Job.
     */
    JOB_EXECUTE,
    /**
     * The Worker successfully executed a materialized Job.
     */
    JOB_SUCCESS,
    /**
     * The Worker caught an Exception during the execution of a materialized Job.
     */
    JOB_FAILURE,
    /**
     * The Worker caught an Exception during normal operation.
     */
    WORKER_ERROR,
    /**
     * The Worker just finished running and is about to shutdown.
     */
    WORKER_STOP;
}

JOB_PROCESS and JOB_EXECUTE are easy to confuse. JOB_PROCESS fires when the worker starts handling a job. The worker then updates its status in redis and instantiates (materializes) the job, and JOB_EXECUTE fires just before the job is executed.
JOB_SUCCESS and JOB_FAILURE fire after execution and report its outcome.
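
To make the ordering concrete, here is a minimal, self-contained sketch of the job-related events. It is not jesque's code: the Event enum and the process method are simplified stand-ins declared only for this illustration, and the redis update and job instantiation are reduced to a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Simplified stand-in for the job-handling flow; not the library code.
public class JobEventOrderDemo {

    // Only the job-related events, redeclared so the sketch compiles on its own.
    enum Event { JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, JOB_FAILURE }

    // Runs one job and returns the events in the order they were fired.
    static List<Event> process(final Callable<Object> job) {
        final List<Event> fired = new ArrayList<>();
        try {
            fired.add(Event.JOB_PROCESS);   // the worker starts handling the job
            // (the real worker updates its status in redis and materializes the job here)
            fired.add(Event.JOB_EXECUTE);   // just before the job runs
            job.call();
            fired.add(Event.JOB_SUCCESS);   // the job returned normally
        } catch (Exception e) {
            fired.add(Event.JOB_FAILURE);   // the job threw
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS]
        System.out.println(process(() -> "done"));
        // prints [JOB_PROCESS, JOB_EXECUTE, JOB_FAILURE]
        System.out.println(process(() -> { throw new IllegalStateException("boom"); }));
    }
}
```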

WorkerEventEmitter

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

/**
 * A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
 */
public interface WorkerEventEmitter {
    
    /**
     * Register a WorkerListener for all WorkerEvents. 
     * @param listener the WorkerListener to register
     */
    void addListener(WorkerListener listener);

    /**
     * Register a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to register
     * @param events the WorkerEvents to be notified of
     */
    void addListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister a WorkerListener for all WorkerEvents.
     * @param listener the WorkerListener to unregister
     */
    void removeListener(WorkerListener listener);

    /**
     * Unregister a WorkerListener for the specified WorkerEvents.
     * @param listener the WorkerListener to unregister
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeListener(WorkerListener listener, WorkerEvent... events);

    /**
     * Unregister all WorkerListeners for all WorkerEvents.
     */
    void removeAllListeners();

    /**
     * Unregister all WorkerListeners for the specified WorkerEvents.
     * @param events the WorkerEvents to no longer be notified of
     */
    void removeAllListeners(WorkerEvent... events);
}

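This interface defines the contract of an event emitter: registering and unregistering listeners, either for all events or for a given set of events.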

WorkerListenerDelegate

jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

/**
 * WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
 */
public class WorkerListenerDelegate implements WorkerEventEmitter {
    
    private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

    private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

    /**
     * Constructor.
     */
    public WorkerListenerDelegate() {
        final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp = 
                new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
        for (final WorkerEvent event : WorkerEvent.values()) {
            elp.put(event, new ConcurrentHashSet<WorkerListener>());
        }
        this.eventListenerMap = Collections.unmodifiableMap(elp);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener) {
        addListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void addListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.add(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener) {
        removeListener(listener, WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
        if (listener != null) {
            for (final WorkerEvent event : events) {
                final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
                if (listeners != null) {
                    listeners.remove(listener);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners() {
        removeAllListeners(WorkerEvent.values());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeAllListeners(final WorkerEvent... events) {
        for (final WorkerEvent event : events) {
            final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
            if (listeners != null) {
                listeners.clear();
            }
        }
    }

    /**
     * Notify all WorkerListeners currently registered for the given WorkerEvent.
     * @param event the WorkerEvent that occurred
     * @param worker the Worker that the event occurred in
     * @param queue the queue the Worker is processing
     * @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and 
     * JOB_FAILURE events)
     * @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and 
     * JOB_SUCCESS events)
     * @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was 
     * a Callable that returned a value)
     * @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
     */
    public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job, 
            final Object runner, final Object result, final Throwable t) {
        final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
        if (listeners != null) {
            for (final WorkerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.onEvent(event, worker, queue, job, runner, result, t);
                    } catch (Exception e) {
                        log.error("Failure executing listener " + listener + " for event " + event 
                                + " from queue " + queue + " on worker " + worker, e);
                    }
                }
            }
        }
    }
}

WorkerListenerDelegate is the implementation of the event emitter. It stores the listeners in an EnumMap whose keys are the WorkerEvent values and whose values are concurrent sets of listeners, so a single event can have multiple listeners.
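
The storage layout can be sketched without jesque on the classpath. The following is a simplified illustration, not the library class: ListenerRegistryDemo, NamedListener and the three-value Event enum are hypothetical names, and the JDK's ConcurrentHashMap.newKeySet() stands in for jesque's ConcurrentHashSet.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the delegate's listener storage; names are illustrative only.
public class ListenerRegistryDemo {

    enum Event { JOB_SUCCESS, JOB_FAILURE, WORKER_ERROR }

    // Minimal listener type; instances are compared by identity.
    static final class NamedListener {
        final String name;
        NamedListener(final String name) { this.name = name; }
    }

    private final Map<Event, Set<NamedListener>> listenersByEvent;

    ListenerRegistryDemo() {
        final Map<Event, Set<NamedListener>> map = new EnumMap<>(Event.class);
        for (final Event event : Event.values()) {
            // One concurrent set per event, created up front so lookups never return null.
            map.put(event, ConcurrentHashMap.<NamedListener>newKeySet());
        }
        this.listenersByEvent = Collections.unmodifiableMap(map);
    }

    void addListener(final NamedListener listener, final Event... events) {
        for (final Event event : events) {
            this.listenersByEvent.get(event).add(listener);
        }
    }

    void removeListener(final NamedListener listener, final Event... events) {
        for (final Event event : events) {
            this.listenersByEvent.get(event).remove(listener);
        }
    }

    int count(final Event event) {
        return this.listenersByEvent.get(event).size();
    }

    // Builds a small registry and reports the listener count per event.
    static String demo() {
        final ListenerRegistryDemo registry = new ListenerRegistryDemo();
        final NamedListener audit = new NamedListener("audit");
        final NamedListener alert = new NamedListener("alert");
        registry.addListener(audit, Event.values());      // registered for every event
        registry.addListener(alert, Event.JOB_FAILURE);   // registered for one event
        registry.addListener(alert, Event.JOB_FAILURE);   // duplicate, ignored by the set
        registry.removeListener(audit, Event.WORKER_ERROR);
        return registry.count(Event.JOB_SUCCESS) + ","
                + registry.count(Event.JOB_FAILURE) + ","
                + registry.count(Event.WORKER_ERROR);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1,2,0
    }
}
```

Because the values are sets, registering the same listener twice for one event has no additional effect, and because every event gets its set in the constructor, the outer map itself never needs to change after construction.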

Triggering of events

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

//......
protected void process(final Job job, final String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
            final Object instance = this.jobFactory.materializeJob(job);
            final Object result = execute(job, curQueue, instance);
            success(job, instance, result, curQueue);
        } catch (Throwable thrwbl) {
            failure(thrwbl, job, curQueue);
        } finally {
            removeInFlight(curQueue);
            this.jedis.del(key(WORKER, this.name));
            this.processingJob.set(false);
        }
    }

WorkerImpl holds a WorkerListenerDelegate (the WorkerEventEmitter implementation) by composition and calls fireEvent at the corresponding points in its methods to notify the registered listeners. Notification is synchronous by default.
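
Two consequences of this dispatch style can be shown with a small sketch: listeners run on the thread that fires the event, and, as in the fireEvent loop above, an exception thrown by one listener is caught so the remaining listeners are still notified. The Listener interface and the fire and run methods below are hypothetical stand-ins, not jesque APIs, and errors are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of synchronous, failure-tolerant dispatch; not the library code.
public class SyncDispatchDemo {

    interface Listener { void onEvent(String event); }

    // Calls each listener in turn on the calling thread; failures are recorded, not rethrown.
    static int fire(final String event, final List<Listener> listeners, final List<String> errors) {
        int notified = 0;
        for (final Listener listener : listeners) {
            try {
                listener.onEvent(event);
                notified++;
            } catch (Exception e) {
                errors.add(e.getMessage());
            }
        }
        return notified;
    }

    // Fires one event at a failing listener followed by a healthy one.
    static String run() {
        final Thread caller = Thread.currentThread();
        final List<String> errors = new ArrayList<>();
        final boolean[] sameThread = new boolean[1];
        final List<Listener> listeners = new ArrayList<>();
        listeners.add(event -> { throw new IllegalStateException("listener 1 broke"); });
        listeners.add(event -> { sameThread[0] = Thread.currentThread() == caller; });
        final int notified = fire("JOB_SUCCESS", listeners, errors);
        return "notified=" + notified + " errors=" + errors + " sameThread=" + sameThread[0];
    }

    public static void main(String[] args) {
        // prints notified=1 errors=[listener 1 broke] sameThread=true
        System.out.println(run());
    }
}
```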

Summary

In essence, this is the observer pattern: WorkerImpl is the subject (the observed) and each listener is an observer. WorkerImpl fires the corresponding event at each execution point and synchronously notifies the listeners, which then run their own logic.
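
Since notification is synchronous, a slow listener delays the worker. One possible way around this, sketched here as an assumption rather than something jesque provides, is to wrap a listener so that it hands the event to an executor and returns immediately. The Listener interface and the async helper are hypothetical names introduced for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical asynchronous wrapper around an observer; not part of jesque.
public class AsyncListenerDemo {

    interface Listener { void onEvent(String event); }

    // Returns a listener that submits the delegate call to the executor and returns at once.
    static Listener async(final Listener delegate, final ExecutorService executor) {
        return event -> executor.execute(() -> delegate.onEvent(event));
    }

    // Fires one event through the wrapper; true if the delegate ran on another thread.
    static boolean runsOffCallerThread() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final Thread caller = Thread.currentThread();
        final CountDownLatch done = new CountDownLatch(1);
        final boolean[] offThread = new boolean[1];
        final Listener delegate = event -> {
            offThread[0] = Thread.currentThread() != caller;
            done.countDown();
        };
        try {
            async(delegate, executor).onEvent("JOB_SUCCESS");
            // await() establishes the ordering needed to read offThread[0] safely
            return done.await(5, TimeUnit.SECONDS) && offThread[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOffCallerThread()); // prints true
    }
}
```

The trade-off is that the worker no longer waits for the listener, so ordering between events and any listener failures have to be handled inside the wrapped listener itself.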
