CompletableFuture实现并发处理任务+汇总结果

CompletableFuture是一个非常好用的并发任务处理工具,本篇文章将介绍由此工具实现并发处理任务,并汇总结果批量保存DB,以此带来效率上的提升。

1 背景介绍

我们通常在项目中都会涉及到接收多天的原始数据,然后生成每天的数据报告,保存到DB。如果是循环每天数据顺序的执行生成每天报告保存DB,会有下面的问题:

  • 整个流程变成了串行,耗时较长
  • 写入DB的操作也是一条一条数据写入,没有批量写入效率高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码/**
* 测试顺序串行生成报告,汇总批量保存DB
*/
@Test
public void testSequence() {
long start = System.currentTimeMillis();
// 模拟每天的数据
List<String> days = new ArrayList<>();
days.add("2024-03-01");
days.add("2024-03-02");
days.add("2024-03-03");

// 循环每天的数据,生成每天报告
List<DayReport> reportList = new ArrayList<>();
for(String day: days) {
DayReport result = generateDayReportTask(day);
reportList.add(result);
}

// 汇总的报告list,批量保存到DB,提高写入的性能
insertBatch(reportList);
long execTime = System.currentTimeMillis() - start;
log.info("执行耗时:{} ms", execTime);
}

耗时:743ms
image.png

如果是直接把生成报告的任务提交到线程池处理,主线程需要借助countDownLatch并发工具类等待线程池里面的任务执行完毕之后执行insertBatch(reportList)操作,代码实现上稍显复杂,同时还需考虑多个线程保存任务结果到reportList等线程安全问题。

所以针对上面的问题,引入CompletableFuture工具,实现并发处理任务,并汇总结果批量保存DB,以此带来效率上的提升。同时使用更加简单而且也不存在线程安全问题。

2 并发处理+汇总批量保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
java复制代码/**
* @ClassName CompletableFutureTest
* @Description
* @Author
**/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class CompletableFutureTest {


@Autowired
@Qualifier("SummaryReportTask")
ThreadPoolExecutor summaryReportTask;

@Data
private class DayReport{
/**
* 报告id
*/
private Long reportId;

/**
* 每天的日期
*/
private String day;

/**
* 是否执行异常
*/
private Boolean ex = false;

/**
* 走路的步数
*/
private int stepCount;

public DayReport(Long reportId, String day, int stepCount) {
this.reportId = reportId;
this.day = day;
this.stepCount = stepCount;
}

public DayReport(String day, Boolean ex) {
this.day = day;
this.ex = ex;
}
}

/**
* 生成每天报告
* @param day
* @return
*/
private DayReport generateDayReportTask(String day) {
log.info("模拟生成{}的报告...", day);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 报告id
Long reportId = Long.parseLong(day.replace("-", ""));
// 每天行走的步数
int stepCount = RandomUtil.randomInt(1, 100);
return new DayReport(reportId, day, stepCount);
}

/**
* 处理任务执行产生的异常
* @param e
* @param day
* @return
*/
private DayReport handleException(Throwable e, String day) {
// 打印异常信息,便于排查问题
log.error("day: {}的任务执行异常:{}", day, e);
// 返回异常标记的结果,便于后续判断任务是否出现异常,终止后续的业务流程
return new DayReport(day, true);
}


/**
* 并发生成报告,汇总批量保存DB
*/
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
// 模拟每天的数据
List<String> days = new ArrayList<>();
days.add("2024-03-01");
days.add("2024-03-02");
days.add("2024-03-03");

List<CompletableFuture<DayReport>> futures = new ArrayList<>();
// 循环每天的数据,使用CompletableFuture实现并发生成每天报告
for(String day: days) {
CompletableFuture<DayReport> future = CompletableFuture
// 提交生成报告任务到指定线程池,异步执行
.supplyAsync(() -> generateDayReportTask(day), summaryReportTask)
// 任务执行异常时,处理异常
.exceptionally(e -> handleException(e, day));
// future对象添加到集合中
futures.add(future);

}

try {
// allOf方法等待所有任务执行完毕,最好设置超时时间以免长时间阻塞主线程
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(20, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(" CompletableFuture.allOf异常: {}", e);
// 出现异常,终止后续逻辑
return;
}
// 循环获取任务执行返回的结果(即生成的每日报告)
List<DayReport> reportList = new ArrayList<>();
for (CompletableFuture<DayReport> future : futures) {
DayReport result;
try {
result = future.get();
} catch (Exception e) {
log.error("future.get出现异常:{}", e);
// 任何一个任务执行异常,则直接return,中断后续业务流程,防止产生的汇总报告不完整
return;
}
// 每日报告汇总
if(null != result && !result.getEx()) { // 判断任务执行没有出现异常
reportList.add(result);
} else {
log.error("result为null或者任务执行出现异常");
// 任何一个任务执行异常,则直接return,中断后续业务流程,防止产生的汇总报告不完整
return;
}
}

// 汇总的报告list,批量保存到DB,提高写入的性能
insertBatch(reportList);
long execTime = System.currentTimeMillis() - start;
log.info("执行耗时:{} ms", execTime);
}

void insertBatch(List<DayReport> reportList) {
log.info("报告批量保存reportList:{}", JSON.toJSONString(reportList));
}

线程池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Configuration
@Slf4j
public class ExecutorConfig {


/**
* 处理每日汇总报告线程池
* @return
*/
@Bean("SummaryReportTask")
public ThreadPoolExecutor summaryReportTaskExecutor() {
int corePoolSize = cpuCores();
int maxPoolSize = corePoolSize * 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(800),
// 自定义线程池名称,便于排查问题
new CustomizableThreadFactory("summaryReportTaskExecutor"),
// 超过最大线程数,拒绝任务并抛出异常
new ThreadPoolExecutor.AbortPolicy());
return threadPoolExecutor;
}

private int cpuCores() {
return Runtime.getRuntime().availableProcessors();
}
}

主要流程:

  1. 循环每天数据,提交生成报告任务到线程池,并发执行生成每日报告
  2. 任务发生异常处理,主要是打印异常信息,标记结果
  3. CompletableFuture.allOf方法等待所有任务执行完毕
  4. 获取任务执行结果,进行每日报告汇总为list
  5. 最后汇总的list批量保存DB

耗时:331ms,比串行处理节省一半的时间。

image.png

最后需要注意的点:

  1. CompletableFuture需要配置自定义线程池使用,可以做到不同业务线的线程池隔离,避免相互影响
  2. 任务的异常处理打印必要的异常日志便于排查问题
  3. 每个任务出现异常时,记得中断后续逻辑,避免汇总的数据出现不完整

3 总结

本章主要介绍了CompletableFuture使用的一类场景:实现并发处理任务 && 等待多个并发任务完成,并汇总各个任务返回的结果 && 批量保存。有类似这种业务场景的可以使用CompletableFuture来实现,以此提高运行效率。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%