Spring 异步调用,多线程
- 概述
- 快速入门
- 异步回调
- 异步异常处理
- 自定义执行器
1、概述
在日常开发中,我们的逻辑都是同步调用,顺序执行。但是在某些情况下我们希望异步调用,将主线程和部分逻辑分开,以达到程序更快速的执行,提升性能。例如,高并发的接口,用户操作日志等。
异步调用,对应的是同步调用。
- 同步调用:指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;
- 异步调用:指程序在顺序执行时,不等异步调用返回执行结果,就执行后面的程序。
考虑到异步的可靠性,我们一般会考虑引入消息队列,例如: RabbitMQ、RocketMQ、Kafka 等等。 但是在一些时候,我们不需要如此高的可靠性,可以使用进程内的队列或线程池。
1 | 复制代码public static void main(String[] args) { |
在进程内的队列或者线程池,相对不可靠的原因是,队列和线程池中的任务仅仅存储在内存中,如何JVM进程被异常关闭,将会导致丢失,未被执行。
而分布式消息队列,异步调用会以一个消息的形式,存储在消息服务器上,所以即使JVM进程被异常中断,消息依然在消息服务队列的服务器上
所以使用进程内的队列或者线程池来实现异步调用的话,一定要尽可能的保证JVM进程的优雅关闭,保证他们在关闭前被执行完。
在 Spring Framework 的 Spring Task 模块,提供了 @Async
注解,可以添加在方法上,自动实现该方法的异步调用
简单来说,我们可以像使用 @Transactional
声明式事务,使用SpringTask提供的@Async
注解,声明式异步。而在实现原理上,也是基于 Spring AOP 拦截,实现异步提交该操作到线程池中,达到异步调用的目的。
2、快速入门
2.1 引入依赖
1 | 复制代码<?xml version="1.0" encoding="UTF-8"?> |
因为 Spring Task 是 Spring Framework 的模块,所以在我们引入 spring-boot-web
依赖后,无需特别引入它。
2.2 Application
创建Application类,添加@EnableAsync 开启 @Async 的支持
1 | 复制代码@SpringBootApplication |
- 在类上添加
@EnableAsync
注解,启用异步功能。
2.3 DemoService
1 | 复制代码package cn.iocoder.springboot.lab29.asynctask.service; |
- 定义
execute01
和execute02
方法,分别模拟sleep 10秒和5秒。 - 同时在方法中,使用
logger
打印日志,方便我们看到每个方法的执行时间,和执行的线程
2.4 同步调用测试
编写DemoServiceTest
测试类,添加#task01()
方法,同步调用上述方法,代码如下:
1 | 复制代码@RunWith(SpringRunner.class) |
运行单元测试,打印日志如下:
1 | 复制代码2020-06-02 09:16:03.391 INFO 3108 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task01][开始执行] |
- 两个方法都按顺序执行,执行时间15秒。
- 都在主线程执行。
2.5 异步调用测试
修改DemoServiceTest
,增加 execute01Async()
和execute02Async()
异步调用方法,代码:
1 | 复制代码 @Async |
- 在
execute01Async()
和execute01Async()
上,添加@Async
实现异步调用
修改DemoServiceTest
类, 编写 #task02()
方法,异步调用上述的两个方法。
1 | 复制代码 @Test |
打印日志:
1 | 复制代码2020-06-02 10:57:41.643 INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest : [task02][开始执行] |
- DemoService 的两个方法,异步执行,所以主线程只消耗 39毫秒左右。注意,实际这两个方法,并没有执行完成。
- DemoService 的两个方法,都在异步线程池中执行。
2.6 等待异步调用完成测试
在上面的**【2.5 异步调用测试】**异步调用中,两个方法只是异步调用,方法没有执行完。在一些业务场景中,我们达到异步调用效果,同时主线程有返回结果,就需要主线程阻塞等待异步调用的结果。
修改DemoService
,添加execute01AsyncWithFuture()
和execute01AsyncWithFuture()
异步调用,并返回 Future 对象 。代码:
1 | 复制代码 @Async |
- 在这里两个异步方法中,添加了
AsyncResult.forValue(this.execute02());
,返回带有执行结果的Future对象
修改DemoServiceTest
类, 编写 #task02()
方法,异步调用上述的两个方法,并阻塞线程等待异步调用返回结果
代码:
1 | 复制代码 @Test |
- 异步调用两个方法,并返回对应Future对象。这两个的异步调用逻辑,可以并行执行。
- Future对象的
get()
方法,效果:阻塞线程等待返回结果。
打印日志:
1 | 复制代码2020-06-02 13:56:43.955 INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task03][开始执行] |
- 两个异步调用方法,分别由线程池
task-1
和task-2
同时执行。 因为主线程阻塞等待执行结果 ,执行时间10秒,当同时有多个异步调用,线程阻塞等待,执行时间由消耗最长的异步调用逻辑所决定。
2.7 应用配置文件
在application中,添加spring Task配置
1 | 复制代码spring: |
- Spring 本身依赖了Spring Task
- 在
spring.task.execution
配置项, Spring Task 调度任务的配置 ,对应TaskExecutionProperties
配置类 - Spring Boot TaskExecutionAutoConfiguration 自动化配置类, 实现了Spring Task 自动配置,创建了
ThreadPoolTaskExecutor
基于线程池的任务执行器,实际上ThreadPoolTaskExecutor
就是ThreadPoolExecutor
的分装,主要增加执行任务,并返回 ListenableFuture 对象功能。
之前说的异步的可靠性,要优雅的关闭进程。spring.task.execution.shutdown
配置关闭,是为了实现Spring Task
的优雅关闭。异步任务在执行过程中,如果应用开始关闭,异步任务需要使用的Bean
被销毁,例如:需要访问数据库连接池,这时候异步任务还在执行中,一旦需要访问数据库,但是没有对应的Bean将会导致报错。
- 通过配置
await-termination: true
,实现在应用关闭时,等待异步任务执行完成。这样在应用关闭时,Spring 会等待ThreadPoolTaskExecutor
执行完任务,再销毁Bean
。 - 应用关闭时,在某些业务场景下我们不可能让Spring一直等待,异步任务的完成。通过配置
await-termination-period: 60
,设置Spring最大等待时间,时间一到将不再等待异步任务完成。
3、异步回调
业务场景中,执行完异步任务,可能需回调。下面介绍异步执行完成后,实现自定义回调。
3.1、AsyncResult 源码解释
在 2.6 等待异步调用完成 中,我们看到的 AsyncResult类 表示异步结果。返回结果分为两种情况:
- 执行成功时,调用
AsyncResult#forValue(V value)
静态方法,返回成功的 ListenableFuture对象,
源码:
1 | 复制代码 /** |
- 执行异常时,调用
AsyncResult#forExecutionException(Throwable ex)
静态方法,返回异常的 ListenableFuture 对象。源码:
1 | 复制代码 /** |
AsyncResult 同时也实现了 ListenableFuture接口,提供异步执行结果回调处理。
1 | 复制代码public class AsyncResult<V> implements ListenableFuture<V> |
ListenableFuture接口,源码:
1 | 复制代码public interface ListenableFuture<T> extends Future<T> { |
ListenableFuture继承了Future,所以AsyncResult 也实现了Future的接口,源码:
1 | 复制代码public interface Future<V> { |
AsyncResult 中对addCallback(...)
方法回调的实现,源码:
1 | 复制代码 @Override |
- 从 ListenableFutureCallback 知道 ,ListenableFutureCallback 接口同时继承了 SuccessCallback、FailureCallback接口
1 | 复制代码public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback |
- 《1》,如果是异常处理结果调用 failureCallback回调
- 《2》,如果是成功处理结果调用successCallback回调
- 《3》,如果回调逻辑发生异常,直接忽略。假设多个回调,其中一个出现议程,不会影响其他的回调。
实际上,AsyncResult 是作为异步执行的结果。既然是结果,执行就已经完成。所以,在我们调用 #addCallback(...)
接口方法来添加回调时,必然直接使用回调处理执行的结果。
AsyncResult 对 Future 定义的所有方法,实现代码如下:
1 | 复制代码// AsyncResult.java |
3.2 ListenableFutureTask
在我们调用使用 @Async
注解的方法时,如果方法返回的类型是 ListenableFuture 的情况下,实际方法返回的是 ListenableFutureTask 对象。
ListenableFutureTask 类,也实现 ListenableFuture 接口,继承 FutureTask 类,ListenableFuture 的 FutureTask 实现类。
ListenableFutureTask 对 ListenableFuture 定义的 #addCallback(...)
方法,实现源码如下:
1 | 复制代码private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>(); |
- 可以看到在
ListenableFutureTask
中,暂存回调到ListenableFutureCallbackRegistry
中
ListenableFutureTask 对 FutureTask 已实现的 #done()
方法,进行重写。实现源码如下:
1 | 复制代码@Override |
3.3 具体示例
修改 DemoService 的代码,增加 #execute02()
的异步调用,并返回 ListenableFuture 对象。代码如下:
1 | 复制代码@Async |
- 根据执行的结果,包装出成功还是异常的 AsyncResult 对象。
DemoServiceTest 测试类,编写 #task04()
方法,异步调用上述的方法,在塞等待执行完成的同时,添加相应的回调 Callback 方法。代码:
1 | 复制代码@Test |
<1>
处,调用DemoService#execute01AsyncWithListenableFuture()
方法,异步调用该方法,并返回 ListenableFutureTask 对象。这里,我们看下打印的日志。
1 | 复制代码2020-06-08 14:13:16.738 INFO 5060 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task04][execute01Result 的类型是:(ListenableFutureTask)] |
<2.1>
处,增加成功的回调和失败的回调。<2.2>
处,增加成功和失败的统一回调。<3>
处,阻塞等待结果。执行完成后,我们会看到回调被执行,打印日志如下:
1 | 复制代码2020-06-08 14:13:21.752 INFO 5060 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task04][结束执行,消耗时长 5057 毫秒] |
- 异步异常处理器
通过实现 AsyncUncaughtExceptionHandler 接口,达到对异步调用的异常的统一处理。
创建 GlobalAsyncExceptionHandler 类,全局统一的异步调用异常的处理器。代码:
1 | 复制代码@Component |
- 类上,我们添加了
@Component
注解,考虑到胖友可能会注入一些 Spring Bean 到属性中。 - 实现
#handleUncaughtException(Throwable ex, Method method, Object... params)
方法,打印异常日志。
注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params)
的源码,可以很容易得到这个结论,代码:
1 | 复制代码// AsyncExecutionAspectSupport.java |
- 对了,AsyncExecutionAspectSupport 是 AsyncExecutionInterceptor 的父类哟。所以哟,返回类型为 Future 的异步调用方法,需要通过「3. 异步回调」来处理。
4.2 AsyncConfig
创建 AsyncConfig 类,配置异常处理器。代码:
1 | 复制代码@Configuration |
- 在类上添加
@EnableAsync
注解,启用异步功能。这样「2. Application」 的@EnableAsync
注解,也就可以去掉了。 - 实现 AsyncConfigurer 接口,实现异步相关的全局配置。 此时此刻,胖友有没想到 SpringMVC 的 WebMvcConfigurer 接口。
- 实现
#getAsyncUncaughtExceptionHandler()
方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。 - 实现
#getAsyncExecutor()
方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了null
,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。
4.3 DemoService
DemoService 类,增加 #zhaoDaoNvPengYou(...)
的异步调用。代码如下:
1 | 复制代码@Async |
4.4 简单测试
1 | 复制代码 @Test |
运行单元测试,执行日志如下:
1 | 复制代码2020-06-08 15:26:35.120 ERROR 11388 --- [ task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1, 2]) 发生异常] |
- 自定义执行器
在 上面 中,我们使用 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现自动配置 ThreadPoolTaskExecutor 任务执行器。
本小节,我们希望两个自定义 ThreadPoolTaskExecutor 任务执行器,实现不同方法,分别使用这两个 ThreadPoolTaskExecutor 任务执行器。
5.1 引入依赖
1 | 复制代码<?xml version="1.0" encoding="UTF-8"?> |
- 和 上面引入依赖 一致。
5.2 应用配置文件
在 application.yml
中,添加 Spring Task 定时任务的配置,如下:
1 | 复制代码spring: |
- 在
spring.task
配置项下,我们新增了execution-one
和execution-two
两个执行器的配置。在格式上,我们保持和在「2.7 应用配置文件」看到的spring.task.exeuction
一致,方便我们后续复用 TaskExecutionProperties 属性配置类来映射。
5.3 AsyncConfig
创建 AsyncConfig 类,配置两个执行器。代码如下:
1 | 复制代码@Configuration |
- 参考 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,我们创建了 ExecutorOneConfiguration 和 ExecutorTwoConfiguration 配置类,来分别创建 Bean 名字为
executor-one
和executor-two
两个执行器。
5.4 DemoService
1 | 复制代码@Service |
- 在
@Async
注解上,我们设置了其使用的执行器的 Bean 名字。
5.5 简单测试
1 | 复制代码@RunWith(SpringRunner.class) |
运行单元测试,执行日志如下:
1 | 复制代码2020-06-08 15:38:28.846 INFO 12020 --- [ task-one-1] c.i.s.l.asynctask.service.DemoService : [execute01] |
- 从日志中,我们可以看到,
#execute01()
方法在executor-one
执行器中执行,而#execute02()
方法在executor-two
执行器中执行。
本文转载自: 掘金