springboot异步任务与定时任务 1实现Async异

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战

1.实现Async异步任务

1.1 环境准备

Spring Boot 入口类上配置 @EnableAsync 注解开启异步处理。创建任务抽象类 AbstractTask,并分别配置三个任务方法 doTaskOne()doTaskTwo()doTaskThree()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public abstract class AbstractTask {
   private static Random random = new Random();

   public void doTaskOne() throws Exception {
       System.out.println("开始做任务一");
       long start = currentTimeMillis();
       sleep(random.nextInt(10000));
       long end = currentTimeMillis();
       System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
  }

   public void doTaskTwo() throws Exception {
       System.out.println("开始做任务二");
       long start = currentTimeMillis();
       sleep(random.nextInt(10000));
       long end = currentTimeMillis();
       System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
  }

   public void doTaskThree() throws Exception {
       System.out.println("开始做任务三");
       long start = currentTimeMillis();
       sleep(random.nextInt(10000));
       long end = currentTimeMillis();
       System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
  }
}

1.2 同步调用

下面通过一个简单示例来直观的理解什么是同步调用:

  • 定义 Task 类,继承 AbstractTask,三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10 秒内)。
1
2
3
java复制代码@Component
public class SyncTask extends AbstractTask {
}
  • 单元测试 用例中,注入 SyncTask 对象,并在测试用例中执行 doTaskOne()doTaskTwo()doTaskThree() 三个方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class TaskTest {
   @Autowired
   private SyncTask task;

   @Test
   public void testSyncTasks() throws Exception {
       task.doTaskOne();
       task.doTaskTwo();
       task.doTaskThree();
  }
}
  • 执行单元测试,可以看到类似如下输出:
1
2
3
4
5
6
复制代码开始做任务一
完成任务一,耗时:6720毫秒
开始做任务二
完成任务二,耗时:6604毫秒
开始做任务三
完成任务三,耗时:9448毫秒

任务一、任务二、任务三顺序的执行完了,换言之 doTaskOne()doTaskTwo()doTaskThree() 三个方法顺序的执行完成。

1.3 异步调用

上述的 同步调用 虽然顺利的执行完了三个任务,但是可以看到 执行时间比较长,若这三个任务本身之间 不存在依赖关系,可以 并发执行 的话,同步调用在 执行效率 方面就比较差,可以考虑通过 异步调用 的方式来 并发执行

  • 创建 AsyncTask类,分别在方法上配置 @Async 注解,将原来的 同步方法 变为 异步方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Component
public class AsyncTask extends AbstractTask {
   @Async
   public void doTaskOne() throws Exception {
       super.doTaskOne();
  }

   @Async
   public void doTaskTwo() throws Exception {
       super.doTaskTwo();
  }

   @Async
   public void doTaskThree() throws Exception {
       super.doTaskThree();
  }
}
  • 单元测试 用例中,注入 AsyncTask 对象,并在测试用例中执行 doTaskOne()doTaskTwo()doTaskThree() 三个方法。
1
2
3
4
5
6
7
8
9
java复制代码@Autowired
private AsyncTask asyncTask;

@Test
public void testAsyncTasks() throws Exception {
   asyncTask.doTaskOne();
   asyncTask.doTaskTwo();
   asyncTask.doTaskThree();
}
  • 执行单元测试,可以看到类似如下输出:
1
2
3
复制代码开始做任务三
开始做任务一
开始做任务二

如果反复执行单元测试,可能会遇到各种不同的结果,比如:

  1. 没有任何任务相关的输出
  2. 有部分任务相关的输出
  3. 乱序的任务相关的输出

原因是目前 doTaskOne()doTaskTwo()doTaskThree() 这三个方法已经 异步执行 了。主程序在 异步调用 之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就 自动结束 了,导致了 不完整 或是 没有输出任务 相关内容的情况。

注意:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效。

1.4 异步回调

为了让 doTaskOne()doTaskTwo()doTaskThree() 能正常结束,假设我们需要统计一下三个任务 并发执行 共耗时多少,这就需要等到上述三个函数都完成动用之后记录时间,并计算结果。

那么我们如何判断上述三个 异步调用 是否已经执行完成呢?我们需要使用 Future 来返回 异步调用结果

  • 创建 AsyncCallBackTask 类,声明 doTaskOneCallback()doTaskTwoCallback()doTaskThreeCallback() 三个方法,对原有的三个方法进行包装。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Component
public class AsyncCallBackTask extends AbstractTask {
   @Async
   public Future<String> doTaskOneCallback() throws Exception {
       super.doTaskOne();
       return new AsyncResult<>("任务一完成");
  }

   @Async
   public Future<String> doTaskTwoCallback() throws Exception {
       super.doTaskTwo();
       return new AsyncResult<>("任务二完成");
  }

   @Async
   public Future<String> doTaskThreeCallback() throws Exception {
       super.doTaskThree();
       return new AsyncResult<>("任务三完成");
  }
}
  • 单元测试 用例中,注入 AsyncCallBackTask 对象,并在测试用例中执行 doTaskOneCallback()doTaskTwoCallback()doTaskThreeCallback() 三个方法。循环调用 FutureisDone() 方法等待三个 并发任务 执行完成,记录最终执行时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@Autowired
private AsyncCallBackTask asyncCallBackTask;

@Test
public void testAsyncCallbackTask() throws Exception {
   long start = currentTimeMillis();
   Future<String> task1 = asyncCallBackTask.doTaskOneCallback();
   Future<String> task2 = asyncCallBackTask.doTaskTwoCallback();
   Future<String> task3 = asyncCallBackTask.doTaskThreeCallback();

   // 三个任务都调用完成,退出循环等待
   while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
       sleep(1000);
  }

   long end = currentTimeMillis();
   System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

看看都做了哪些改变:

  • 在测试用例一开始记录开始时间;
  • 在调用三个异步函数的时候,返回Future类型的结果对象;
  • 在调用完三个异步函数之后,开启一个循环,根据返回的Future对象来判断三个异步函数是否都结束了。若都结束,就结束循环;若没有都结束,就等1秒后再判断。
  • 跳出循环之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。

执行一下上述的单元测试,可以看到如下结果:

1
2
3
4
5
6
7
复制代码开始做任务三
开始做任务一
开始做任务二
完成任务二,耗时:2572毫秒
完成任务一,耗时:7333毫秒
完成任务三,耗时:7647毫秒
任务全部完成,总耗时:8013毫秒

可以看到,通过 异步调用,让任务一、任务二、任务三 并发执行,有效的 减少 了程序的 运行总时间

2.为异步任务规划线程池

2.1线程池的作用

  1. 防止资源占用无限的扩张
  2. 调用过程省去资源的创建和销毁所占用的时间

在上一节中,我们的一个异步任务打开了一个线程,完成后销毁。在高并发环境下,不断的分配新资源,可能导致系统资源耗尽。所以为了避免这个问题,我们为异步任务规划一个线程池。

2.2定义线程池

在上述操作中,创建一个 线程池配置类TaskConfiguration ,并配置一个 任务线程池对象taskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Configuration
public class TaskConfiguration {
   @Bean("taskExecutor")
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(10);
       executor.setMaxPoolSize(20);
       executor.setQueueCapacity(200);
       executor.setKeepAliveSeconds(60);
       executor.setThreadNamePrefix("taskExecutor-");
       executor.setRejectedExecutionHandler(new CallerRunsPolicy());
       return executor;
  }
}

上面我们通过使用 ThreadPoolTaskExecutor 创建了一个 线程池,同时设置了以下这些参数:

线程池属性 属性的作用 设置初始值
核心线程数 线程池创建时候初始化的线程数 10
最大线程数 线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程 20
缓冲队列 用来缓冲执行任务的队列 200
允许线程的空闲时间 当超过了核心线程之外的线程,在空闲时间到达之后会被销毁 60秒
线程池名的前缀 可以用于定位处理任务所在的线程池 taskExecutor-
线程池对拒绝任务的处理策略 这里采用CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务 CallerRunsPolicy
  • 创建 AsyncExecutorTask类,三个任务的配置和 AsyncTask 一样,不同的是 @Async 注解需要指定前面配置的 线程池的名称taskExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Component
public class AsyncExecutorTask extends AbstractTask {
   @Async("taskExecutor")
   public Future<String> doTaskOneCallback() throws Exception {
       super.doTaskOne();
       System.out.println("任务一,当前线程:" + Thread.currentThread().getName());
       return new AsyncResult<>("任务一完成");
  }

   @Async("taskExecutor")
   public Future<String> doTaskTwoCallback() throws Exception {
       super.doTaskTwo();
       System.out.println("任务二,当前线程:" + Thread.currentThread().getName());
       return new AsyncResult<>("任务二完成");
  }

   @Async("taskExecutor")
   public Future<String> doTaskThreeCallback() throws Exception {
       super.doTaskThree();
       System.out.println("任务三,当前线程:" + Thread.currentThread().getName());
       return new AsyncResult<>("任务三完成");
  }
}
  • 单元测试 用例中,注入 AsyncExecutorTask 对象,并在测试用例中执行 doTaskOne()doTaskTwo()doTaskThree() 三个方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncExecutorTaskTest {
   @Autowired
   private AsyncExecutorTask task;

   @Test
   public void testAsyncExecutorTask() throws Exception {
       task.doTaskOneCallback();
       task.doTaskTwoCallback();
       task.doTaskThreeCallback();

       sleep(30 * 1000L);
  }
}

执行一下上述的 单元测试,可以看到如下结果:

1
2
3
4
5
6
7
8
9
复制代码开始做任务一
开始做任务三
开始做任务二
完成任务二,耗时:3905毫秒
任务二,当前线程:taskExecutor-2
完成任务一,耗时:6184毫秒
任务一,当前线程:taskExecutor-1
完成任务三,耗时:9737毫秒
任务三,当前线程:taskExecutor-3

执行上面的单元测试,观察到 任务线程池线程池名的前缀 被打印,说明 线程池 成功执行 异步任务

2.3优雅地关闭线程池

由于在应用关闭的时候异步任务还在执行,导致类似 数据库连接池 这样的对象一并被 销毁了,当 异步任务 中对 数据库 进行操作就会出错。

解决方案如下,重新设置线程池配置对象,新增线程池 setWaitForTasksToCompleteOnShutdown()setAwaitTerminationSeconds() 配置:

1
2
3
4
5
6
7
8
9
java复制代码@Bean("taskExecutor")
public Executor taskExecutor() {
   ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
   executor.setPoolSize(20);
   executor.setThreadNamePrefix("taskExecutor-");
   executor.setWaitForTasksToCompleteOnShutdown(true);
   executor.setAwaitTerminationSeconds(60);
   return executor;
}
  • setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务销毁 就会先于 数据库连接池对象 的销毁。
  • setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

3.quartz动态定时任务(数据库持久化)

3.1前言

在项目开发过程当中,某些定时任务,可能在运行一段时间之后,就不需要了,或者需要修改下定时任务的执行时间等等。需要在代码当中进行修改然后重新打包发布,很麻烦。使用Quartz来实现的话不需要重新修改代码而达到要求。

3.2原理

  1. 使用quartz提供的API完成配置任务的增删改查
  2. 将任务的配置保存在数据库中

3.3配置

application.yml在上一节中已经引入了maven依赖包,这里不再重复。直接spring属性下面加入quartz配置信息

1
2
3
4
5
6
7
8
9
10
yml复制代码spring:
datasource:
  url: jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
  username: root
  password: 123456
  driver-class-name: com.mysql.cj.jdbc.Driver
quartz:
  job-store-type: JDBC #数据库存储quartz任务配置
  jdbc:
    initialize-schema: NEVER #自动初始化表结构,第一次启动的时候这里写always

但可能是版本bug,有的时候自动建表不会生效,自己去quartz-scheduler-x.x.x.jar里面找一下建表sql脚本:classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql,然后执行。

3.4动态配置代码实现

第一步 创建一个定时任务相关实体类用于保存定时任务相关信息到数据库当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Data
public class QuartzBean {
   /** 任务id */
   private String id;

   /** 任务名称 */
   private String jobName;

   /** 任务执行类 */
   private String jobClass;

   /** 任务状态 启动还是暂停*/
   private Integer status;

   /** 任务运行时间表达式 */
   private String cronExpression;
}

第二步 创建定时任务暂停,修改,启动,单次启动工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
java复制代码public class QuartzUtils {

  /**
    * 创建定时任务 定时任务创建之后默认启动状态
    * @param scheduler 调度器
    * @param quartzBean 定时任务信息类
    */
  @SuppressWarnings("unchecked")
  public static void createScheduleJob(Scheduler scheduler, QuartzBean quartzBean) throws ClassNotFoundException, SchedulerException {
          //获取到定时任务的执行类 必须是类的绝对路径名称
          //定时任务类需要是job类的具体实现 QuartzJobBean是job的抽象类。
          Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(quartzBean.getJobClass());
          // 构建定时任务信息
          JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(quartzBean.getJobName()).build();
          // 设置定时任务执行方式
          CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
          // 构建触发器trigger
          CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(quartzBean.getJobName()).withSchedule(scheduleBuilder).build();
          scheduler.scheduleJob(jobDetail, trigger);
  }

  /**
    * 根据任务名称暂停定时任务
    * @param scheduler 调度器
    * @param jobName 定时任务名称
    */
  public static void pauseScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException {
      JobKey jobKey = JobKey.jobKey(jobName);
      scheduler.pauseJob(jobKey);
  }

  /**
    * 根据任务名称恢复定时任务
    * @param scheduler 调度器
    * @param jobName 定时任务名称
    */
  public static void resumeScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException {
      JobKey jobKey = JobKey.jobKey(jobName);
      scheduler.resumeJob(jobKey);
  }

  /**
    * 根据任务名称立即运行一次定时任务
    * @param scheduler 调度器
    * @param jobName 定时任务名称
    */
  public static void runOnce(Scheduler scheduler, String jobName) throws SchedulerException {
      JobKey jobKey = JobKey.jobKey(jobName);
      scheduler.triggerJob(jobKey);
  }

  /**
    * 更新定时任务
    * @param scheduler 调度器
    * @param quartzBean 定时任务信息类
    */
  public static void updateScheduleJob(Scheduler scheduler, QuartzBean quartzBean) throws SchedulerException {

          //获取到对应任务的触发器
          TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName());
          //设置定时任务执行方式
          CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
          //重新构建任务的触发器trigger
          CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
          trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
          //重置对应的job
          scheduler.rescheduleJob(triggerKey, trigger);
  }

  /**
    * 根据定时任务名称从调度器当中删除定时任务
    * @param scheduler 调度器
    * @param jobName 定时任务名称
    */
  public static void deleteScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException {
      JobKey jobKey = JobKey.jobKey(jobName);
      scheduler.deleteJob(jobKey);
  }
}
@Controller
@RequestMapping("/quartz/job/")
public class QuartzController {
  //注入任务调度
  @Resource
  private Scheduler scheduler;

  @PostMapping("/create")
  @ResponseBody
  public String createJob(@RequestBody QuartzBean quartzBean) throws SchedulerException, ClassNotFoundException {
      QuartzUtils.createScheduleJob(scheduler,quartzBean);
      return "已创建任务";//这里return不是生产级别代码,测试简单写一下
  }

  @PostMapping("/pause")
  @ResponseBody
  public String pauseJob(String jobName) throws SchedulerException {
      QuartzUtils.pauseScheduleJob (scheduler,jobName);
      return "已暂停成功";//这里return不是生产级别代码,测试简单写一下
  }

  @PostMapping("/run")
  @ResponseBody
  public String runOnce(String jobName) throws SchedulerException {
      QuartzUtils.runOnce (scheduler,jobName);
      return "运行任务" + jobName + "成功";//这里return不是生产级别代码,测试简单写一下
  }

  @PostMapping("/resume")
  @ResponseBody
  public String resume(String jobName) throws SchedulerException {
      QuartzUtils.resumeScheduleJob(scheduler,jobName);
      return "恢复定时任务成功:" + jobName;
  }

  @PostMapping("/update")
  @ResponseBody
  public String update(@RequestBody QuartzBean quartzBean) throws SchedulerException {
      QuartzUtils.updateScheduleJob(scheduler,quartzBean);
      return "更新定时任务调度信息成功";
  }
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%