线程池拒绝策略的坑,不得不防 现象 问题复现 原因分析 其他

「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战

现象

先讲一下上周新鲜出炉的bug,业务反馈线上导出采购单功能超时,本来以为是业务导出采购单较多,让业务缩短下日期导出,结果还是导不出来,此时就怀疑是刚上线代码问题,立即进行了代码回滚,业务再重试导出成功。

本次上线代码主要是批量调用下游系统,改成了通过线程池调用,由于对下游系统不是强依赖,线程池调用设置的拒绝策略为丢弃策略(DiscardPolicy)

问题复现

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class DiscardTest {
public static void main(String[] args) throws Exception {

// 一个线程,队列最大为1
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());

Future future1 = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start Runnable one");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Future future2 = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start Runnable two");
}
});

Future future3 = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("start Runnable three");
}
});

System.out.println("task one finish " + future1.get());// 等待任务1执行完毕
System.out.println("task two finish " + future2.get());// 等待任务2执行完毕
System.out.println("task three finish " + future3.get());// 等待任务3执行完毕

executorService.shutdown();// 关闭线程池,阻塞直到所有任务执行完毕
}
}

执行结果:一直卡着不动

image.png

原因分析

线程池线程数量到达3时,后续提交的任务执行丢弃策略(DiscardPolicy)。

我们看下DiscardPolicy实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
// 拒绝策略什么也没有做,此线程的状态依然是NEW
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

执行丢弃策略时,丢弃策略的执行方法是什么都不做。 线程的状态依然是NEW。

要分析这个问题另外我们还需要看下线程池的submit方法里面做了什么,提交任务到线程池时,会包装成 FutureTask ,初始状态是 NEW。执行任务的是包装后的FutureTask对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

// 初始状态为NEW
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

// 状态小于等于COMPLETING都会一直等待
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

我们看get()方法,FutureTask状态>COMPLETING 才会返回。因为拒绝策略没有修改FutureTask的状态,FutureTask的状态一直是NEW,所以不会返回,一直等待。

其他拒绝策略会不会导致阻塞

AbortPolicy是直接抛出异常,调用方马上可以获取结果

CallerRunsPolicy 是让主线程去执行,会更新任务状态

DiscardOldestPolicy 会poll出一个任务,但是没有任务处理,所以poll出来的任务是NEW状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

总结

  1. 使用Future.get(),需要根据业务实际情况设置超时时间
  2. 能不用丢弃策略(DiscardPolicy),就不要用,如确实需要用,则需要自己实现。

延伸,线程池使用还有哪些注意事项?

  • 虽然使用CallerRunsPolicy不会造成卡死,但是还是要慎重,如果导致主线程被大量阻塞,对业务同样有影响。
  • 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。(阿里巴巴开发手册)

说明:Executors各个方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:

主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

2)newCachedThreadPool和newScheduledThreadPool:

主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

  • 如果公司有全链路的trace(如阿里云的TracingAnalysis),线程中记得传递trace信息,不然trace信息会丢失。

本文已参与「新人创作礼」活动,一起开启掘金创作之路。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%