Order
Activiti created a job mechanism and solved several problems of the job in the cluster:
-
How to Ensure Only One Machine to Execute a job in a Cluster
-
Job failed during processing (other exceptions or server restart). how to handle these failed job at this time and run again
ACT_RU_JOB table structure
Field name | Field description | Data type | Primary key | Empty | explain | |
---|---|---|---|---|---|---|
ID_ | varchar(64) | √ | ||||
REV_ | integer | √ | ||||
TYPE_ | varchar(255) | |||||
LOCK_EXP_TIME_ | Lock release time | timestamp(3) | √ | |||
LOCK_OWNER_ | Suspended by | varchar(255) | √ | |||
EXCLUSIVE_ | boolean | √ | ||||
EXECUTION_ID_ | varchar(64) | √ | ||||
PROCESS_INSTANCE_ID_ | varchar(64) | √ | ||||
PROC_DEF_ID_ | varchar(64) | √ | ||||
RETRIES_ | integer | √ | ||||
EXCEPTION_STACK_ID_ | Exception id | varchar(64) | √ | |||
EXCEPTION_MSG_ | abnormal information | varchar(4000) | √ | |||
DUEDATE_ | Expiration time | timestamp(3) | √ | |||
REPEAT_ | Repeat | varchar(255) | √ | |||
HANDLER_TYPE_ | Processing type | varchar(255) | √ | |||
HANDLER_CFG_ | varchar(4000) | √ | ||||
TENANT_ID_ | varchar(255) | √ |
Query pending job
SELECT RES.* FROM
ACT_RU_JOB RES
LEFT OUTER JOIN
ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_
WHERE (RETRIES_ > 0)
AND (DUEDATE_ is null or DUEDATE_ <= ?)
AND (LOCK_OWNER_ is null or LOCK_EXP_TIME_ <= ?)
AND ( (RES.EXECUTION_ID_ is null) or (PI.SUSPENSION_STATE_ = 1) );
The query criteria are
-
Retries value > 0
-
Dueddate is empty or dueddate is less than current.
-
Lock_owner is empty or lock_exp_time is less than current
Seize a task
AcquireJobsCmd
protected void lockJob(CommandContext commandContext, JobEntity job, String lockOwner, int lockTimeInMillis) {
job.setLockOwner(lockOwner);
GregorianCalendar gregorianCalendar = new GregorianCalendar();
gregorianCalendar.setTime(commandContext.getProcessEngineConfiguration().getClock().getCurrentTime());
gregorianCalendar.add(Calendar.MILLISECOND, lockTimeInMillis);
job.setLockExpirationTime(gregorianCalendar.getTime());
}
Set lock_owner and lock_exp_time
LockTimeInMillis e.g. 5601000,5 minutes
Job executed successfully
private void removeJobs() {
for (Job job: getJobs()) {
((JobEntity) job).delete();
}
}
Remove from job Table
Job execution failed
if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) {
log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
job.setRetries(job.getRetries() - 1);
job.setLockOwner(null);
job.setLockExpirationTime(null);
if (job.getDuedate() == null) {
// add wait time for failed async job
job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getAsyncFailedJobWaitTime(), null));
} else {
// add default wait time for failed job
job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getDefaultFailedJobWaitTime(), job.getDuedate()));
}
}
-
Retries minus 1
-
Clear lock_owner
-
Clear lock_exp_time
-
Reset due_date, delay retry
/** define the default wait time for a failed async job in seconds */
protected int asyncFailedJobWaitTime = 10;