Java 线程池使用总结

「这是我参与11月更文挑战的第4天,活动详情查看:2021最后一次更文挑战

前言

image.png

如图:阿里巴巴 Java 开发手册中对于线程池的创建有着明确的规范。 Executors 返回的线程池有着无法避免的劣势。使用线程池强制使用 ThreadPoolExecutor 创建,建议小伙伴在对线程池的机制有充分的了解的前提下使用 。

当然使用 ThreadPoolExecutor 创建线程池的原因还有:

  1. 根据机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、拒绝策略等等。
  2. 显示地给我们的线程池命名,这样有助于定位问题。
  3. 方便开发人员对线程池运行状况进行监测,方便及时调整策略避免生产问题。

这里分享一个线上事故的案例 线程池运用不当的一次线上事故 。案例很精彩,场景很现实。是一次很不错的关于线程池问题处理过程。

PS:如果不想看笔者大致总结:父任务依赖子任务执行,且放在同一个线程池 newFixedThreadPool 中执行,导致核心线程死锁无界队列任务不断增加。

正文

既然线程池是日常工作非常常见的知识且使用过程中需要对此有着充分的认知,所以今天就总结一下线程池的常见知识点。

简单的例子

如下使用 ThreadPoolExecutor 实现了自定义线程池完成 Callable 的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码class ImpCallable implements Callable<String> {
// 核心线程数
private static final int CORE_POOL_SIZE=2;
// 最大线程数
private static final int MAX_POOL_SIZE=4;
// 线程数大于 corePoolSize 线程持续时间
private static final int KEEP_ALIVE_TIME=1;
// 阻塞队列的大小
private static final int QUEUE_CAPACITY=5;
private static final TimeUnit UNIT = TimeUnit.SECONDS;
// 自定义线程名
private static final String THREAD_NAME = "my-self-thread-pool-%d";

public static void main(String[] args) throws ExecutionException, InterruptedException {

ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
UNIT,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new BasicThreadFactory.Builder().namingPattern(THREAD_NAME).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
ImpCallable task = new ImpCallable();
List<Future<String>> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// 提交任务到线程池
Future<String> future = executor.submit(task);
// 任务结果 future 加入结果队列
futureList.add(future);
}

for (Future<String> fut : futureList) {
try {
// 取出结果
System.out.println(new Date() + "--" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

executor.shutdown();

try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("All threads Finished");
}

@Override
public String call() throws Exception {
Thread.sleep(2000L);
return Thread.currentThread().getName();
}
}

阻塞队列

用来保存等待被执行的任务的阻塞队列,Java 中提供了如下阻塞队列:

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,其构造必须指定大小。对象按 FIFO 排序。
  2. LinkedBlockingQuene:基于链表结构的阻塞队列,大小不固定,若不指定大小,其大小有Integer.MAX_VALUE 来决定。对象按 FIFO 排序。吞吐量通常要高于 ArrayBlockingQuene
  3. SynchronousQuene:特殊的 BlockingQueue,对其的操作必须是放和取交替完成。
  4. priorityBlockingQuene:类似于 LinkedBlockingQueue,对象的排序由对象的自然顺序或者构造函数的 Comparator 决定。

自定义线程名

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。

关于这一点非常有必要,方便快速定位问题以及监控线程池。

拒绝策略

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  1. AbortPolicy 直接抛出异常,默认策略,可以及时发现线程池的瓶颈。
  2. CallerRunsPolicy 使用调用者所在的线程来执行任务,新任务不会丢失,采用谁提交谁负责的策略,有效控制线程池压力。
  3. DiscardOldestPolicy 丢弃阻塞队列中靠最前的任务,并执行当前任务,该策略存在丢失任务的风险,不建议使用
  4. DiscardPolicy 直接丢弃任务,该策略简单粗暴以保证系统可以为主要目标,不建议使用

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略。如记录日志或持久化存储不能处理的任务。

线程池大小

关于线程池线程大小可以根据实际业务场景具体设置。

推荐个适用面比较广的公式( N 为 CPU 核心数):

  • CPU 密集型任务 N+1 。
  • IO 密集型任务 2N 。

线程池状态

image.png

如上图线程池的状态分为五种,分别对应 Java 中五个 int 字段:

1
2
3
4
5
arduino复制代码private static final int RUNNING = -536870912;
private static final int SHUTDOWN = 0;
private static final int STOP = 536870912;
private static final int TIDYING = 1073741824;
private static final int TERMINATED = 1610612736;
  1. RUNNING 线程创建成功初始化状态,能够接收新任务,以及对已添加的任务进行处理。
  2. SHUTDOWN 不接收新任务,但能处理已添加的任务。
  3. STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  4. TIDYING 当所有的任务已终止队列任务也为空线程池会变为 TIDYING 状态。
  5. TERMINATED 线程池彻底终止,由 TIDYING 状态变成 TERMINATED 状态。

监控线程池

我们可以通过第三方组件监控线程池的运行状态比如 SpringBoot 中的 Actuator 。

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 监测线程池状态。如下图我们可以轻松获得线程池的各项参数,结合邮件实时监测线程池健康状况。

image.png

这里推荐美团 Java线程池实现原理及其在美团业务中的实践

美团线程池的文章中实现了线程池核心参数动态配置、线程池健康监测与告警、线程池的集中管理等功能。感兴趣的小伙伴可以自行研究。

参考资料

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%