Spring 源码阅读 75:@EnableAsync 分析
基于 Spring Framework v5.2.6.RELEASE
概述
Spring 终有一种非常简便的方法使 Bean 中的一个方法变成异步执行的方法,那就是在方法上标记 @Async 注解,想要开启这一特性,需要在一个配置类上标记 @EnableAsync 注解。
本文将通过源码分析 @EnableAsync 注解是如何开启这一特性的。
@EnableAsync 分析
@EnableAsync 注解的源码如下。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
注解的每一个属性都指定了默认值,后续的分析也会基于默认的属性值进行分析。除此之外,注解上的 @Import 元注解引入了 AsyncConfigurationSelector 类。
从它的类关系中可以看出,AsyncConfigurationSelector 实现了 ImportSelector 接口,因此,当 Spring 扫描到配置类后,会执行它的 selectImports 方法,获取一个包含配置类名称的数组,用于加载对应的配置。
AsyncConfigurationSelector 虽然也包含了selectImports
方法,但是从参数类型中可以看出它不是接口中的selectImports
方法的实现方法,要找到接口中的实现方法,我们需要去 AsyncConfigurationSelector 的父类 AdviceModeImportSelector 中。
@Override
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format(
"@%s is not present on importing class '%s' as expected",
annType.getSimpleName(), importingClassMetadata.getClassName()));
}
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
}
return imports;
}
这个方法中,主要是从 @EnableAsync 注解获取各项属性的值,然后使用adviceMode
属性,调用另一个selectImports
方法获取最终的结果。
此处被调用的selectImports
方法,就是 AsyncConfigurationSelector 中的 selectImports
方法。
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
在 @EnableAsync 注解中,mode的默认值是AdviceMode.PROXY
,因此,这里引入的配置类是 ProxyAsyncConfiguration。
接下来分析 ProxyAsyncConfiguration 类。
ProxyAsyncConfiguration 分析
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
在 ProxyAsyncConfiguration 中,只有一个 Bean 配置,类型是 AsyncAnnotationBeanPostProcessor,由此可以知道,@EnableAsync 所开启的功能,是通过 Bean 的后处理器来实现的。
上述的方法体中,通过构造方法创建了 AsyncAnnotationBeanPostProcessor 对象。
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
构造方法中设置了一个属性值,这个属性是是beforeExistingAdvisors
,定义在父类 AbstractAdvisingBeanPostProcessor 中,这个属性的默认值是false,当它的值为true时,会将新的增强逻辑添加到增强逻辑列表的开头而不是最后。
也就是说,@EnableAsync 提供的异步执行特性,是基于 AOP 特性来实现的。
接着往下看,在创建了 AsyncAnnotationBeanPostProcessor 对象之后,为其配置了一些属性,有一些属性的值是从 @EnableAsync 属性值获取的,还有两个属性值需要留意,就是this.executor
和this.exceptionHandler
,这两个成员变量的值是从哪儿来的呢?
我们可以找到 ProxyAsyncConfiguration 的父类 AbstractAsyncConfiguration,其中有一个标记了 @Autowired 注解的方法。
// org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
如果我们自己配置了线程池和异常处理器,则会在这里执行配置,这样,我们配置的线程池和异常处理器就会被添加到 AsyncAnnotationBeanPostProcessor 中。
接下来,我们再分析 AsyncAnnotationBeanPostProcessor 后处理器是如何工作的。
AsyncAnnotationBeanPostProcessor 分析
从它的类继承关系中可以看出,它是一个基于 AOP 特性来为 Bean 中的方法提供异步执行功能的 Bean 后处理器。
AsyncAnnotationBeanPostProcessor 同时实现了 BeanFactoryAware 接口,在它的setBeanFactory
方法中,完成了 Advisor 的创建。
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
这里创建的 Advisor 类型是 AsyncAnnotationAdvisor,创建完之后,它被复制给了advisor
成员变量,这个成员变量定义在 AsyncAnnotationBeanPostProcessor 的父类 AbstractBeanFactoryAwareAdvisingPostProcessor 中。这个advisor
成员变量就是处理增强逻辑的对象。
AsyncAnnotationAdvisor 分析
关于 Spring 是如何在后处理器中为 Bean 创建代理对象以及如何向代理对象中加入增强逻辑的,我之前的文章有很详细的分析,可以阅读之前关于 AOP 原理的分析文章来了解。下面我们直接分析 AsyncAnnotationAdvisor,它是完成方法异步执行的核心。
一个 Advisor 通常有两个非常重要的部分,一个是 Pointcut,用于匹配需要增强的方法,另一个是 Advice 也就是具体的增强逻辑。对于 AsyncAnnotationAdvisor 来说,这两个部分都是在它的构造方法中构建的。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
其中可以看到两行关键的代码,他们分别完成了advice
和pointcut
成员变量的构建。
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
下面分别来看这两部分。
Advice 构建
先看buildAdvice
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
Advice 的构建比较简单,这里可以看到,最终构建的 Advice 是一个 AnnotationAsyncExecutionInterceptor 类型的拦截器,除了调用构造方法创建之外,还配置了executor
和exceptionHandler
,这个拦截器应该就是完成 AOP 增强逻辑的拦截器,我们放到后文中分析。
Pointcut 构建
下面再看buildPointcut
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildPointcut
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
这个方法的逻辑比较简单,首先创建了两个 Pointcut 对象,cpc
用于匹配类型,mpc
用于匹配方法,他们的逻辑都很简单,就是看类或者方法的定义是否包含 @Async 注解。
最后再将两者合并为一个 ComposablePointcut 对象返回,ComposablePointcut 的作用就是将多个 Pointcut 对象合并成一个。
AnnotationAsyncExecutionInterceptor 分析
了解完上面的内容,接下来就开始分析 AnnotationAsyncExecutionInterceptor 拦截器。它是一个包含 AOP 增强逻辑的拦截器,也是完成方法异步调用的核心逻辑。
AnnotationAsyncExecutionInterceptor 要完成它的任务,有两个比较核心的功能,一个是目标方法的匹配,另一个就是拦截器的逻辑。目标方法的匹配逻辑,我们在上文中已经介绍过了,以下主要分析其拦截器逻辑,也就是它的invoke
方法。
以上是 AnnotationAsyncExecutionInterceptor 的类关系图,它实现了 MethodInterceptor 接口,invoke
方法的实现在父类 AsyncExecutionInterceptor 中。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
从上面的源码中可以看到三个关键的步骤:
- 找到目标方法,并根据目标方法获取到执行它的 AsyncTaskExecutor。
- 将目标方法的调用,封装到一个 Callable 异步任务
task
当中。
- 通过doSubmit方法来异步调用上一步封装的
task
。
下面我们详细分析这三个步骤。
AsyncTaskExecutor 查找
AsyncTaskExecutor 在determineAsyncExecutor
方法中完成。
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
首先会从executors
中根据方法获取对应的 AsyncTaskExecutor,executors
是一个用来缓存 Executor 的成员变量。
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
当第一次进入这个方法的时候,executors
肯定是空的,因此会进入if
语句的逻辑获取 Executor 然后再将其添加到executors
中。在if语句中,首先会通过getExecutorQualifier
方法获取一个qualifier
,我们进入方法查看获取的过程。
// org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
这个方法会从目标方法或者其所在的类型上的 @Async 注解的value
属性,作为方法的返回值复制给qualifier
。这个qualifier
的值是一个 Executor 的 Bean 名称,也就是说,我们可以通过 @Async 的value
属性指定执行异步任务的 Executor 的 Bean 名称。
如果qualifier
不是空的,那么,就会通过findQualifiedExecutor方法从 Spring 容器中获取对应的 Executor 实例。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#findQualifiedExecutor
@Nullable
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
if (beanFactory == null) {
throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
" to access qualified executor '" + qualifier + "'");
}
return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}
如果qualifier
是空的,那么就会通过this.defaultExecutor.get()
获取默认的 Executor,那么,默认的 Executor 是什么呢?我们需要在去 AsyncAnnotationAdvisor 的buildAdvice
方法中,回顾一下 AnnotationAsyncExecutionInterceptor 创建的过程。
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
以上是 AnnotationAsyncExecutionInterceptor 创建的语句,从这里找到对应的构造方法。
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
构造方法需要提供一个默认的 Executor,也就是defaultExecutor
参数,这里提供了null
,不过我们可以继续查看父类的构造方法。
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
在被调用的 AsyncExecutionAspectSupport 的构造方法中,通过getDefaultExecutor
方法,提供了默认的 Executor。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
这里看到,默认的 Executor 是一个 SimpleAsyncTaskExecutor,也就是说,如果我们没有在项目中配置线程池,则默认使用 SimpleAsyncTaskExecutor 来执行异步任务。
Callable 任务封装
得到 Executor 之后,就是任务的封装,这一步很简单,就是将目标方法的调用放到一个 Callable 类型的任务的call
方法中。
doSubmit 异步执行方法
最后一步就是任务的提交,通过doSubmit
方法完成。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
其实就是调用了 Executor 的submit
异步执行了任务。
不过这里有一点要说明,虽然在我们没有配置 Excutor 的情况下 ,Spring 会使用默认的 SimpleAsyncTaskExecutor 来执行异步任务,但是 SimpleAsyncTaskExecutor 会为每一个任务创建一个新的线程,而不是使用线程池来完成,很容易导致内存溢出,因此,在实践中最好为异步任务配置合适的线程池。
总结
本文以 @EnableAsync 作为切入点,分析了 Spring 开启基于注解的异步任务特性的原理。
转载自:https://juejin.cn/post/7169411700674789407