记一次奇葩的Async注解失效事件

  • 场景还原

事情是这个样子的,日常看git提交记录摸鱼的时候,看到同事提交了一段代码,简化代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Configuration
public class BaseConfig implements AsyncConfigurer {

@Autowired
private AsyncService asyncService;

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("defaultExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return ((Throwable var1, Method var2, Object... var3)->{
this.asyncService.Print();
log.info("怎么又是你!");
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Slf4j
@Component
public class AsyncServiceImpl implements AsyncService {

@Async
@Override
public void asyncPrint() {
Thread thread = Thread.currentThread();
log.info("current thread id:[{}], name:[{}]", thread.getId(), thread.getName());
}
@Override
public void Print() {
log.info("打印。。。");
}
}

代码就是实现了AsyncConfigurer这个接口,然后重写了getAsyncExecutorgetAsyncUncaughtExceptionHandler这两个方法,前一个是自定义异步任务的线程池,后者是可以在异步任务发生异常的时候捕捉到做一些特殊处理。AsyncService代码很简单就是打印了一下当前线程的标识,然后我尝试写个测试类测试一下handler这个方法是否可以在异步的asyncPrint执行之后抓到异常,测试类如下,只是简单的调用一下异步方法,然后在异步方法里面加一个int i = 1/0;让程序抛错:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@SpringBootTest(classes = EvtwboApplication.class)
@RunWith(SpringRunner.class)
public class SpringDemoTest {

@Autowired
private AsyncService asyncService;

@Test
public void testAsync(){
this.asyncService.asyncPrint();
}
}

测试方法执行后输出如下日志:

1
2
3
4
5
6
java复制代码2021-01-30 21:06:05.452  INFO 8320 --- [           main] com.demo.service.impl.AsyncServiceImpl   : current thread id:[1], name:[main]

java.lang.ArithmeticException: / by zero

at com.demo.service.impl.AsyncServiceImpl.asyncPrint(AsyncServiceImpl.java:17)
at com.demo.config.SpringDemoTest.testAsync(SpringDemoTest.java:20)

结果很奇怪,异常并没有被之前写的异常处理方法捕捉到,而且可以注意到异步service打印的日志打印出了主线程main,这说明是主线程执行了这段代码,@Async注解并没有生效。

先说下结论再分析吧,其实就是bean注入顺序的问题,在配置类里面注入了业务类,导致了业务类在BeanPostProcessor实例化之前就实例化了,这个时候通过aop生成代理对象来起作用的注解就会失效。

  • 问题分析

要想知道为什么@Async注解为什么没有生效,那我们得先知道它是怎么对我们的方法起作用的。
在使用@Async这个注解之前,我们会先在一个配置类上面添加一个@EnableAsync注解,代码类似:

1
2
3
4
5
6
7
8
java复制代码@EnableAsync
@SpringBootApplication
public class EvtwboApplication {

public static void main(String[] args) {
new SpringApplication(EvtwboApplication.class).run(args);
}
}

现在的很多框架里面都会使用这种类似的注解,看起来像是个开关一样,实际上也是如此,通过这样一个注解会import一些bean,使像@Async这样的自定义注解生效,我们点进去看一下

1
2
3
4
5
6
7
java复制代码@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
AdviceMode mode() default AdviceMode.PROXY;
}

这个注解里面还有一些属性,暂时用不到,先不看,列出的这个属性我们可以看到AdviceMode默认是AdviceMode.PROXY,上面的元注解@Import(AsyncConfigurationSelector.class)就是它导入的类,我们点进这个类看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}

}

这边因为咱们没有给AdviceMode赋值,所以会走上面一个分支,返回的是一个ProxyAsyncConfiguration,也就是它实际上导入的类,点进这个类看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}

可以看到它是一个配置类,导入了AsyncAnnotationBeanPostProcessor,并且beannameinternalAsyncAnnotationProcessor,看名字很明显这是一个BeanPostProcessor,看那一下这个接口

1
2
3
4
5
6
7
8
9
10
11
java复制代码public interface BeanPostProcessor {
@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}

只有两个默认方法,方法名还是很好了解的postProcessBeforeInitialization这个方法实在bean初始化前调用的,可以对创建出来的bean做一些处理,而postProcessAfterInitialization自然就是在bean初始化之后调用的,这两个方法都是在对象实例创建之后initializeBean中调用初始方法前后的埋点,由于这些知识属于spring的refresh过程,这里不展开,稍微提一下下面会用到的知识点,首先是AbstractApplicationContextrefresh方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Override
public void refresh() throws BeansException, IllegalStateException {
prepareRefresh();
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
prepareBeanFactory(beanFactory);
postProcessBeanFactory(beanFactory);
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
initMessageSource();
initApplicationEventMulticaster();
onRefresh();
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
finishRefresh();
}

代码里面移除了一些异常处理及无关的注释,想要了解的可以直接到源码里面看,这个refresh便是我们的applicationcontext容器的核心启动代码了,现在我们看一下留下注释的这两个方法。首先是registerBeanPostProcessors,看注释可以知道这是在注册BeanPostProcessors用来在bean实例化过程中拦截的一些埋点,我们可以跟进去看一下,最终是在PostProcessorRegistrationDelegateregisterBeanPostProcessors方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static void registerBeanPostProcessors(
ConfigurableListableBeanFactory beanFactory, AbstractApplicationContext applicationContext) {
// First, register the BeanPostProcessors that implement PriorityOrdered.

// Next, register the BeanPostProcessors that implement Ordered.
List<BeanPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
for (String ppName : orderedPostProcessorNames) {
BeanPostProcessor pp = beanFactory.getBean(ppName, BeanPostProcessor.class);
orderedPostProcessors.add(pp);
if (pp instanceof MergedBeanDefinitionPostProcessor) {
internalPostProcessors.add(pp);
}
}
sortPostProcessors(orderedPostProcessors, beanFactory);
registerBeanPostProcessors(beanFactory, orderedPostProcessors);

// Now, register all regular BeanPostProcessors.

// Finally, re-register all internal BeanPostProcessors.
// Re-register post-processor for detecting inner beans as ApplicationListeners,
// moving it to the end of the processor chain (for picking up proxies etc).
beanFactory.addBeanPostProcessor(new ApplicationListenerDetector(applicationContext));
}

可以看到他是将先注册实现了PriorityOrdered接口的类,然后是实现了Ordered接口的,接着是其他的,我们可以看到AsyncAnnotationBeanPostProcessor是实现了Ordered接口的,所以会在第二个分支里面注册

从源码里面可以看到是先beanFactory.getBean(ppName, BeanPostProcessor.class);先创建这个实例,然后registerBeanPostProcessors(beanFactory, orderedPostProcessors);,里面实际做的是beanFactory.addBeanPostProcessor(postProcessor);就是把这个实例加到容器中去。那现在这个BeanPostProcessor已经注册完了,我们在看一下它是在哪里起作用的,源码是在refresh里面的finishBeanFactoryInitialization方法里,看注释可以知道这里是实际上初始化非懒加载的单例的,往里面跟,最终是到AbstractAutowireCapableBeanFactorydoCreateBean方法,其实之前的AsyncAnnotationBeanPostProcessor也是通过这个方法创建的


我们可以在这个方法里面看到有这么一句exposedObject = initializeBean(beanName, exposedObject, mbd);,这边就是初始化bean的地方,点进去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {

Object wrappedBean = bean;
if (mbd == null || !mbd.isSynthetic()) {
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
}

invokeInitMethods(beanName, wrappedBean, mbd);
if (mbd == null || !mbd.isSynthetic()) {
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
}

return wrappedBean;
}

很明显applyBeanPostProcessorsBeforeInitializationapplyBeanPostProcessorsAfterInitialization这两个方法就是调用我们的AsyncAnnotationBeanPostProcessor的方法,实际上我们的后置处理器是在初始化之后做代理的,我们可以跟进去看一下


可以看到这边AsyncAnnotationBeanPostProcessor可以生成一个对象替代原来的对象,也就是所谓的代理,继续往里面看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码    public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor != null && !(bean instanceof AopInfrastructureBean)) {
//这边针对已经代理过的对象,将这个通知加到通知链里面去
if (bean instanceof Advised) {
Advised advised = (Advised)bean;
if (!advised.isFrozen() && this.isEligible(AopUtils.getTargetClass(bean))) {
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
} else {
advised.addAdvisor(this.advisor);
}

return bean;
}
}

if (this.isEligible(bean, beanName)) {
//这边会生成一个代理对象
ProxyFactory proxyFactory = this.prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
this.evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}

proxyFactory.addAdvisor(this.advisor);
this.customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(this.getProxyClassLoader());
} else {
return bean;
}
} else {
return bean;
}
}

这边可以看到是生成了一个代理对象,并携带了通知,这个通知实际上是AsyncAnnotationAdvisor,我们看一下这个类的buildAdvice方法

1
2
3
4
5
6
7
8
java复制代码	protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//这边的两个参数明显就是咱们asyncconfig中的两个方法定义的了
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

原来这个通知本质上是一个拦截器,我们在看一下这个拦截器AnnotationAsyncExecutionInterceptorinvoke方法,实际在他的父类AsyncExecutionInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码    @Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
} else {
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future)result).get();
}
} catch (ExecutionException var4) {
this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable var5) {
this.handleError(var5, userDeclaredMethod, invocation.getArguments());
}

return null;
};
return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}

源码很好理解就是创建了一个异步任务用来执行指定的方法,然后把它交给线程池去执行,这就是@Async注解实际上干的事了。

看到这里,你可能会有疑问,这看起来好像和上面的问题关系不大呀,别急,我们先回到AsyncAnnotationBeanPostProcessor注册的地方去看一下,源码地址PostProcessorRegistrationDelegate#registerBeanPostProcessors,到BeanPostProcessor pp = beanFactory.getBean(ppName, BeanPostProcessor.class);这边,也就是在创建AsyncAnnotationBeanPostProcessor这个对象并将它注册到ioc容器中区,然后后面初始化AsyncServiceImpl的时候就可以将已经创建的实例替换成带有通知的代理对象了,接着我们再往下debug一步,奇怪的事情发生了,源码跳到了咱们之前debug的AbstractAutowireCapableBeanFactorydoCreateBean这个方法的地方,并且此时创建的对象竟然是asyncServiceImpl

咱们想一想,明明这个时候还在创建AsyncAnnotationBeanPostProcessor,为什么中途来创建asyncServiceImpl了,而且如果这个时候就创建这个实体,当它实例化的时候,咱们的后置处理器还没有注册到beanFactory中去,这个时候放到代理对象里面肯定是不携带使异步生效的advisor了,@Async注解也就没有办法生效了,那么它到底为什么会走到这里呢?我们可以看一下方法的调用栈,发现在创建asyncServiceImpl之前先创建了baseConfig这个对象,根据咱们之前看到的源码,这样也是合理的,因为baseConfig中注入了asyncServiceImpl,所以在baseConfig实例化的时候也会先实例化asyncServiceImpl,这段源码在AbstractAutowireCapableBeanFactorypopulateBean方法中,是依赖注入的地方,在bean初始化之前,有兴趣的可以看看,在往前看发现又创建了org.springframework.scheduling.annotation.ProxyAsyncConfiguration这个对象,这个对象好像是有点印象,它就是导入AsyncAnnotationBeanPostProcessor的类,其实在spring单例的初始化过程中确实会获取工厂bean来通过反射获取到通过@Bean注入的bean,详细代码可以看ConstructorResolverinstantiateUsingFactoryMethod方法,那么现在情况就比较明朗了,在创建AsyncAnnotationBeanPostProcessor的时候实例化了它的工厂类ProxyAsyncConfiguration,然后不知道怎么回事又实例化了baseConfig,又因为注入关系,所以创建了asyncServiceImpl, 那么现在问题就在于为什么创建ProxyAsyncConfiguration的时候会创建baseConfig对象呢?我们继续看调用栈,如下图

仔细看可以发现baseConfig是通过一个方法setConfigurersautowire进来的,我们打开ProxyAsyncConfiguration这个类的结构图看一下

可以很清楚的看到这个setConfigurers这个方法来自与它的父类AbstractAsyncConfiguration,我们点进去看一下

原来在这里setter注入了baseConfig,到现在问题原因就很明了了,就是因为连续的注入导致AsyncAnnotationBeanPostProcessor创建的时候创建了ProxyAsyncConfiguration,然后又创建了baseConfig,紧接着又创建了asyncServiceImpl,然后在asyncServiceImpl创建完实例初始化之后应用BeanPostProcessor的时候少掉了还尚未注册进容器的AsyncAnnotationBeanPostProcessor,因此使得@Async注解失效了。

  • 问题解决

知道了问题的原因,那么解决方法也就很明显了,我们秩序只需要将注入的asyncServiceImpl改为在方法内部通过容器获取即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Slf4j
@Configuration
public class BaseConfig implements AsyncConfigurer, ApplicationContextAware {

private static ApplicationContext applicationContext;

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("defaultExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return ((Throwable var1, Method var2, Object... var3) -> {
AsyncService bean = applicationContext.getBean(AsyncService.class);
bean.Print();
log.info("怎么又是你!");
});
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BaseConfig.applicationContext = applicationContext;
}
}

执行

1
2
3
4
java复制代码2021-01-30 23:56:34.407  INFO 9304 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService
2021-01-30 23:56:34.420 INFO 9304 --- [faultExecutor-1] com.demo.service.impl.AsyncServiceImpl : current thread id:[17], name:[defaultExecutor-1]
2021-01-30 23:56:34.424 INFO 9304 --- [faultExecutor-1] com.demo.service.impl.AsyncServiceImpl : 打印。。。
2021-01-30 23:56:34.424 INFO 9304 --- [faultExecutor-1] com.demo.config.BaseConfig : 怎么又是你!

完美!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%