A look at Spring's @Async annotation

  java

Preface

This article looks at the @Async annotation in Spring: how its executor is resolved and how that executor gets shut down.

AsyncConfigurer

@EnableAsync(proxyTargetClass = true)
@Configuration
public class AsyncConfig implements AsyncConfigurer{

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfig.class);

    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable throwable, Method method, Object... params) {
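                // SLF4J has no error(Throwable) overload; pass a message plus the throwable
                LOGGER.error("Uncaught exception in async method " + method.getName(), throwable);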
            }
        };
    }
}

getAsyncExecutor

Let's look at what happens when getAsyncExecutor returns null, as it does here.
spring-context-4.3.9.RELEASE-sources.jar!/org/springframework/scheduling/annotation/AbstractAsyncConfiguration.java

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    protected AnnotationAttributes enableAsync;

    protected Executor executor;

    protected AsyncUncaughtExceptionHandler exceptionHandler;


    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }

}

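This is where the executor and the exception handler are taken from the AsyncConfigurer. Because our getAsyncExecutor returns null, this.executor stays null.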

AsyncExecutionInterceptor

spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

/**
     * Intercept the given method invocation, submit the actual calling of the method to
     * the correct task executor and return immediately to the caller.
     * @param invocation the method to intercept and make asynchronous
     * @return {@link Future} if the original method returns {@code Future}; {@code null}
     * otherwise.
     */
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
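The try/catch inside the Callable above is what keeps a failing async method from throwing into the caller's thread. As a rough illustration only, the following plain-JDK sketch mimics that idea for a void method. The names `AsyncErrorRoutingSketch` and `routeErrors` are hypothetical and not part of Spring, and the real interceptor does more than this.

```java
import java.util.function.Consumer;

/** Plain-JDK sketch (hypothetical names): route task failures to a handler. */
class AsyncErrorRoutingSketch {

    /**
     * Wraps a task so that anything it throws is handed to the given handler
     * instead of propagating out of run().
     */
    static Runnable routeErrors(Runnable task, Consumer<Throwable> handler) {
        return () -> {
            try {
                task.run();
            } catch (Throwable ex) {
                // The caller has already returned, so the handler is the
                // only place where the failure can still be observed.
                handler.accept(ex);
            }
        };
    }
}
```

A wrapped task can be handed to any Executor. The handler plays the role that AsyncUncaughtExceptionHandler plays in the configuration at the top of this article.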

AsyncExecutionAspectSupport.determineAsyncExecutor

spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java

/**
     * Determine the specific executor to use when executing the given method.
     * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
     * @return the executor to use (or {@code null}, but just if no default executor is available)
     */
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                targetExecutor = this.defaultExecutor;
                if (targetExecutor == null) {
                    synchronized (this.executors) {
                        if (this.defaultExecutor == null) {
                            this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                        }
                        targetExecutor = this.defaultExecutor;
                    }
                }
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

The @Async annotation has a value attribute that names the executor to use; getExecutorQualifier reads that name.

If no qualifier is given and defaultExecutor is null, getDefaultExecutor is called to look up a default executor.
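To make the lookup order easier to follow, here is a minimal plain-JDK sketch of the same idea: a per-method cache, a lookup by qualifier, and a lazily initialised default. It is not Spring's code. `ExecutorResolverSketch`, its map of named executors (standing in for the BeanFactory), and the supplier (standing in for getDefaultExecutor) are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Plain-JDK sketch (hypothetical names) of the executor lookup order. */
class ExecutorResolverSketch {

    private final Map<String, Executor> namedExecutors;  // stand-in for the BeanFactory
    private final Supplier<Executor> defaultLookup;      // stand-in for getDefaultExecutor
    private final Map<String, Executor> cache = new ConcurrentHashMap<>();
    private volatile Executor defaultExecutor;

    ExecutorResolverSketch(Map<String, Executor> namedExecutors, Supplier<Executor> defaultLookup) {
        this.namedExecutors = namedExecutors;
        this.defaultLookup = defaultLookup;
    }

    /** methodKey identifies the async method; qualifier is the name given to @Async, if any. */
    Executor resolve(String methodKey, String qualifier) {
        Executor executor = cache.get(methodKey);
        if (executor == null) {
            if (qualifier != null && !qualifier.isEmpty()) {
                // A qualifier wins: look the executor up by name.
                executor = namedExecutors.get(qualifier);
            } else {
                // No qualifier: use the default, looking it up once on first use.
                executor = defaultExecutor;
                if (executor == null) {
                    synchronized (cache) {
                        if (defaultExecutor == null) {
                            defaultExecutor = defaultLookup.get();
                        }
                        executor = defaultExecutor;
                    }
                }
            }
            if (executor == null) {
                return null;  // nothing found; the caller decides what to do
            }
            cache.put(methodKey, executor);
        }
        return executor;
    }
}
```

In this sketch an unknown qualifier simply yields null; how Spring itself reacts to an unknown qualifier is not covered here.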

/**
     * Retrieve or build a default executor for this advice instance.
     * An executor returned from here will be cached for further use.
     * <p>The default implementation searches for a unique {@link TaskExecutor} bean
     * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
     * If neither of the two is resolvable, this implementation will return {@code null}.
     * @param beanFactory the BeanFactory to use for a default executor lookup
     * @return the default executor, or {@code null} if none available
     * @since 4.2.6
     * @see #findQualifiedExecutor(BeanFactory, String)
     * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
     */
    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                // Search for TaskExecutor bean... not plain Executor since that would
                // match with ScheduledExecutorService as well, which is unusable for
                // our purposes here. TaskExecutor is more clearly designed for it.
                return beanFactory.getBean(TaskExecutor.class);
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.info("No task executor bean found for async processing: " +
                            "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
                // Giving up -> either using local default executor or none at all...
            }
        }
        return null;
    }

If the project defines no default task executor, the getBean calls throw NoSuchBeanDefinitionException; the exception is caught and logged, and the method returns null.

AsyncExecutionInterceptor.getDefaultExecutor

spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

AsyncExecutionInterceptor overrides getDefaultExecutor: it first calls the AsyncExecutionAspectSupport version and, if that returns null, creates a new SimpleAsyncTaskExecutor.
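The two getDefaultExecutor methods together form a small fallback chain. The sketch below mimics that chain with plain JDK types and is only an approximation: `DefaultExecutorSketch` and `ThreadPerTaskExecutor` are hypothetical names, and the map of candidates is a simplified stand-in for the bean lookup (it ignores the TaskExecutor/Executor type distinction and primary beans).

```java
import java.util.Map;
import java.util.concurrent.Executor;

/** Plain-JDK sketch (hypothetical names) of the default-executor fallback chain. */
class DefaultExecutorSketch {

    static final String DEFAULT_NAME = "taskExecutor";

    /** Base lookup: a unique candidate first, then the one named "taskExecutor", else null. */
    static Executor findDefault(Map<String, Executor> candidates) {
        if (candidates.size() == 1) {
            return candidates.values().iterator().next();
        }
        return candidates.get(DEFAULT_NAME);  // null when nothing matches
    }

    /** Overriding lookup: never returns null, falls back to a thread-per-task executor. */
    static Executor findDefaultOrFallback(Map<String, Executor> candidates) {
        Executor found = findDefault(candidates);
        return (found != null ? found : new ThreadPerTaskExecutor());
    }

    /** Simplified stand-in for SimpleAsyncTaskExecutor: one new thread per task, no pooling. */
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            new Thread(task).start();
        }
    }
}
```

The fallback object is created with new inside the lookup, so nothing outside holds a reference to it and no container lifecycle applies to it.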

Executor shutdown issue

If the executor returned by AsyncConfigurer is not managed by Spring, it seems that it will not be shut down automatically when the Spring context closes, which can be a problem.

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
        //...
}        

spring-context-4.3.9.RELEASE-sources.jar!/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {

    //...
    /**
     * Perform a shutdown on the underlying ExecutorService.
     * @see java.util.concurrent.ExecutorService#shutdown()
     * @see java.util.concurrent.ExecutorService#shutdownNow()
     * @see #awaitTerminationIfNecessary()
     */
    public void shutdown() {
        if (logger.isInfoEnabled()) {
            logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (this.waitForTasksToCompleteOnShutdown) {
            this.executor.shutdown();
        }
        else {
            this.executor.shutdownNow();
        }
        awaitTerminationIfNecessary();
    }

    /**
     * Wait for the executor to terminate, according to the value of the
     * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.
     */
    private void awaitTerminationIfNecessary() {
        if (this.awaitTerminationSeconds > 0) {
            try {
                if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Timed out while waiting for executor" +
                                (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
                    }
                }
            }
            catch (InterruptedException ex) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Interrupted while waiting for executor" +
                            (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
                }
                Thread.currentThread().interrupt();
            }
        }
    }
}        

ExecutorConfigurationSupport implements the DisposableBean interface; its destroy method calls shutdown.

It is therefore best to define the ThreadPoolTaskExecutor as a Spring bean, so that it is shut down cleanly together with the context.

Summary

@Async does not specify an executor

  • If AsyncConfigurer does not define an executor, Spring looks for a unique TaskExecutor bean, or otherwise for an Executor bean named taskExecutor. If neither exists, the NoSuchBeanDefinitionException is caught and null is returned. AsyncExecutionInterceptor.getDefaultExecutor then creates a new SimpleAsyncTaskExecutor, which is not managed by Spring.
  • If AsyncConfigurer defines the executor, that executor is not managed by Spring either (unless getAsyncExecutor is also annotated with @Bean).

An executor that Spring does not manage has to be shut down by the application itself, for example by listening for the context-close event and shutting the executor down gracefully.
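What "gracefully" can mean for a hand-made pool is shown in the sketch below. It uses only java.util.concurrent and assumes the unmanaged executor is an ExecutorService; `GracefulShutdownSketch` and `shutdownGracefully` are hypothetical names, and the timeout is whatever the application considers acceptable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Plain-JDK sketch (hypothetical names): shut down a pool that no container manages. */
class GracefulShutdownSketch {

    /**
     * Stops accepting new tasks, waits up to timeoutSeconds for submitted ones
     * to finish, then cancels whatever is left.
     *
     * @return true if the pool terminated within the timeout
     */
    static boolean shutdownGracefully(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();  // already submitted tasks still run
        try {
            if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            pool.shutdownNow();  // timed out: interrupt the remaining tasks
            return false;
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();  // preserve the interrupt status
            return false;
        }
    }
}
```

In a Spring application such a helper could be called from a @PreDestroy method or from a ContextClosedEvent listener; that wiring is left out here.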

@Async specifies an executor

For example:

@Async("myTaskExecutor")
public void xxxx(){
    
}

This uses the specified myTaskExecutor rather than the executor defined in AsyncConfigurer.

It is recommended that @Async annotations name their task executor explicitly, and that getAsyncExecutor in AsyncConfigurer return null so that the default taskExecutor is looked up. (Define a taskExecutor bean in your application so that it is managed by Spring.)
