Job design in Activiti


Preface

Activiti provides a job mechanism that solves two problems of running jobs in a cluster:

  • How to ensure that only one machine in the cluster executes a given job

  • How to handle a job that fails during processing (because of an exception or a server restart) so that it is run again

ACT_RU_JOB table structure

Field name            Description             Data type      Primary key  Nullable  Notes
ID_                                           varchar(64)
REV_                                          integer
TYPE_                                         varchar(255)
LOCK_EXP_TIME_        Lock expiration time    timestamp(3)
LOCK_OWNER_           Lock owner              varchar(255)
EXCLUSIVE_                                    boolean
EXECUTION_ID_                                 varchar(64)
PROCESS_INSTANCE_ID_                          varchar(64)
PROC_DEF_ID_                                  varchar(64)
RETRIES_                                      integer
EXCEPTION_STACK_ID_   Exception stack ID      varchar(64)
EXCEPTION_MSG_        Exception message       varchar(4000)
DUEDATE_              Due date                timestamp(3)
REPEAT_               Repeat                  varchar(255)
HANDLER_TYPE_         Handler type            varchar(255)
HANDLER_CFG_                                  varchar(4000)
TENANT_ID_                                    varchar(255)

Querying pending jobs

SELECT RES.* FROM
ACT_RU_JOB RES
LEFT OUTER JOIN
ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_ 
WHERE (RETRIES_ > 0)
AND (DUEDATE_ is null or DUEDATE_ <= ?)
AND (LOCK_OWNER_ is null or LOCK_EXP_TIME_ <= ?)
AND ( (RES.EXECUTION_ID_ is null) or (PI.SUSPENSION_STATE_ = 1) );

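The query selects a job only if all of the following hold:

  • RETRIES_ is greater than 0

  • DUEDATE_ is null, or DUEDATE_ is not later than the current time

  • LOCK_OWNER_ is null, or LOCK_EXP_TIME_ is not later than the current time (the lock has expired)

  • EXECUTION_ID_ is null, or the owning process instance is active (SUSPENSION_STATE_ = 1)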
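
The criteria above can be restated as a plain Java predicate, which makes the null handling of the SQL easier to see. This is only an illustrative sketch: JobRow and PendingJobFilter are hypothetical names, not Activiti classes, and the real filtering happens in the database query.

```java
import java.time.Instant;

/** Hypothetical in-memory restatement of the acquisition query; not Activiti code. */
final class PendingJobFilter {

    /** Simplified stand-in for one ACT_RU_JOB row joined with ACT_RU_EXECUTION. */
    static final class JobRow {
        int retries;                            // RETRIES_
        Instant dueDate;                        // DUEDATE_, may be null
        String lockOwner;                       // LOCK_OWNER_, may be null
        Instant lockExpirationTime;             // LOCK_EXP_TIME_, may be null
        String executionId;                     // EXECUTION_ID_, may be null
        Integer processInstanceSuspensionState; // PI.SUSPENSION_STATE_, null if no joined row
    }

    /** Returns true if the row would match all four conditions of the query at time 'now'. */
    static boolean isAcquirable(JobRow job, Instant now) {
        boolean hasRetries = job.retries > 0;
        boolean isDue = job.dueDate == null || !job.dueDate.isAfter(now);
        // As in SQL, a null expiration time never satisfies "LOCK_EXP_TIME_ <= now".
        boolean isUnlocked = job.lockOwner == null
                || (job.lockExpirationTime != null && !job.lockExpirationTime.isAfter(now));
        boolean isActive = job.executionId == null
                || (job.processInstanceSuspensionState != null
                        && job.processInstanceSuspensionState == 1);
        return hasRetries && isDue && isUnlocked && isActive;
    }
}
```

A job locked by another node therefore stays invisible until its lock expires, and a job whose retries have dropped to 0 is never picked up again by this query.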

Acquiring a job

AcquireJobsCmd locks each job it acquires:

protected void lockJob(CommandContext commandContext, JobEntity job, String lockOwner, int lockTimeInMillis) {
    // mark the job as owned by this job executor
    job.setLockOwner(lockOwner);
    // lock expiration = current engine clock time + lockTimeInMillis
    GregorianCalendar gregorianCalendar = new GregorianCalendar();
    gregorianCalendar.setTime(commandContext.getProcessEngineConfiguration().getClock().getCurrentTime());
    gregorianCalendar.add(Calendar.MILLISECOND, lockTimeInMillis);
    job.setLockExpirationTime(gregorianCalendar.getTime());
}

lockJob sets LOCK_OWNER_ and LOCK_EXP_TIME_.
lockTimeInMillis is the lock duration, e.g. 5 * 60 * 1000 (5 minutes).
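
To illustrate how the lock owner and expiration time cover the server-restart case, here is a minimal in-memory sketch. JobLockSketch, Lock and tryLock are hypothetical names; the sketch assumes the 5-minute lock time from the example above and does not address concurrent updates, which in a real cluster have to be resolved by the database.

```java
import java.time.Instant;

/** Hypothetical illustration of lock ownership and expiry; not Activiti code. */
final class JobLockSketch {

    /** Lock duration from the example above: 5 * 60 * 1000 ms = 5 minutes. */
    static final long LOCK_TIME_MILLIS = 5L * 60 * 1000;

    /** Stand-in for the LOCK_OWNER_ and LOCK_EXP_TIME_ columns of one job. */
    static final class Lock {
        String owner;
        Instant expiresAt;
    }

    /** Takes the lock if it is free or expired; returns false if another owner still holds it. */
    static boolean tryLock(Lock lock, String candidate, Instant now) {
        boolean free = lock.owner == null
                || (lock.expiresAt != null && !lock.expiresAt.isAfter(now));
        if (!free) {
            return false;
        }
        lock.owner = candidate;
        lock.expiresAt = now.plusMillis(LOCK_TIME_MILLIS);
        return true;
    }
}
```

If node-a takes the lock and then crashes, node-b is refused until five minutes have passed, after which the stale lock no longer blocks acquisition.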

Job executed successfully

private void removeJobs() {
    // a successfully executed job is deleted
    for (Job job : getJobs()) {
        ((JobEntity) job).delete();
    }
}

The job is removed from the ACT_RU_JOB table.

Job execution failed

if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) {
    log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
    job.setRetries(job.getRetries() - 1);
    job.setLockOwner(null);
    job.setLockExpirationTime(null);
    if (job.getDuedate() == null) {
        // add wait time for failed async job
        job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getAsyncFailedJobWaitTime(), null));
    } else {
        // add default wait time for failed job
        job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getDefaultFailedJobWaitTime(), job.getDuedate()));
    }
}
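
If no retry time cycle is configured, a failed job is updated as follows:

  • RETRIES_ is decremented by 1

  • LOCK_OWNER_ is cleared

  • LOCK_EXP_TIME_ is cleared

  • DUEDATE_ is reset to a later time, which delays the retry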
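
The steps above can be sketched on a plain object. FailedJobRetrySketch and JobState are hypothetical names, and the sketch assumes that calculateDueDate adds the wait time to the old due date when one exists and to the current time otherwise; that helper is not shown in the snippet, so treat this behavior as an assumption.

```java
import java.time.Instant;

/** Hypothetical restatement of the failure handling steps; not Activiti code. */
final class FailedJobRetrySketch {

    /** Stand-in for the job columns touched on failure. */
    static final class JobState {
        int retries;                // RETRIES_
        String lockOwner;           // LOCK_OWNER_
        Instant lockExpirationTime; // LOCK_EXP_TIME_
        Instant dueDate;            // DUEDATE_, null for a job without a due date
    }

    /** Applies the four steps: decrement retries, clear both lock fields, push the due date. */
    static void onFailure(JobState job, Instant now,
                          int asyncFailedJobWaitTimeSeconds,
                          int defaultFailedJobWaitTimeSeconds) {
        job.retries -= 1;
        job.lockOwner = null;
        job.lockExpirationTime = null;
        if (job.dueDate == null) {
            // Assumption: with no old due date, the wait time is added to the current time.
            job.dueDate = now.plusSeconds(asyncFailedJobWaitTimeSeconds);
        } else {
            // Assumption: otherwise the wait time is added to the old due date.
            job.dueDate = job.dueDate.plusSeconds(defaultFailedJobWaitTimeSeconds);
        }
    }
}
```

Because the lock fields are cleared and the due date lies in the future, the job becomes eligible for acquisition again only after the wait time has elapsed, and only while retries remain.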

The default wait time for a failed async job is 10 seconds:

/** define the default wait time for a failed async job in seconds */
protected int asyncFailedJobWaitTime = 10;