为什么要使用线程池
- 降低创建线程和销毁线程的性能开销。
- 提高响应速度,当有新任务需要执行是不需要等待线程创建就可以立马执行。
- 合理的设置线程池大小可以避免因为线程数过多消耗CPU资源。
我们来看阿里巴巴的代码规范,在项目中创建线程必须要使用线程池创建,原因也是我说的以上三点。
线程池的使用
首先我们来看下UML类图
- Executor:可以看到最顶层是 Executor 的接口。这个接口很简单,只有一个 execute 方法。此接口的目的是为了把任务提交和任务执行解耦。
- ExecutorService:这还是一个接口,继承自 Executor,它扩展了 Executor 接口,定义了更多线程池相关的操作。
- AbstractExecutorService:提供了 ExecutorService 的部分默认实现。
- ThreadPoolExecutor:实际上我们使用的线程池的实现是 ThreadPoolExecutor。它实现了线程池工作的完整机制。也是我们接下来分析的重点对象。
- ForkJoinPool:和ThreadPoolExecutor都继承自AbstractExecutorService,适合用于分而治之,递归计算的算法
- ScheduledExecutorService:这个接口扩展了ExecutorService,定义个延迟执行和周期性执行任务的方法。
- ScheduledThreadPoolExecutor:此接口则是在继承 ThreadPoolExecutor 的基础上实现 ScheduledExecutorService 接口,提供定时和周期执行任务的特性。
搞清楚上面的结构很重要,Executors是一个工具类,然后看创建线程的两种方式,第一种是通过Executors提供的工厂方法来实现,有下面四种方式:
1 | java复制代码 Executor executor1 = Executors.newFixedThreadPool(10); |
第二种是通过构造方法来实现
1 | java复制代码 ExecutorService executor5 = new ThreadPoolExecutor(1, |
其实查看第一种方式创建的源码就会发现:
1 | java复制代码 public static ExecutorService newCachedThreadPool() { |
它们的底层还是通过调用ThreadPoolExecutor的构造方法,创建时传入不同参数,所以本质上还是只有一种创建线程池的方式,就是用构造方法,这里我不想讲用Executors的工厂方法具体帮我们创建了怎样的线程池,让我们再来看一条阿里巴巴规范。
看到这里大家都明白了吧,正是因为封装性太强了,反而小伙们会不知道怎么用,乱用,滥用,有可能会导致OOM,除非你对创建的这四个线程池了如指掌,所以我介绍了也是白介绍,因为就不让用,接下来我们重点看下ThreadPoolExecutor构造方法里各个参数的含义。
1 | java复制代码public ThreadPoolExecutor(int corePoolSize, //核心线程数量 |
- corePoolSize:即线程池的核心线程数量,其实也是最小线程数量。不设置allowCoreThreadTimeOut 的情况下,核心线程数量范围内的线程一直存活。线程不会自行销毁,而是以挂起的状态返回到线程池,直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。
- maximumPoolSize:即线程池的最大线程数量
- keepAliveTime和unit:超出核心线程数后的存活时间和存活单位
- workQueue:是一个阻塞的 queue,用来保存线程池要执行的所有任务。通常可以取下面三种类型:
1 | ini复制代码1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小; |
- ThreadFactory:我们一般用Executors.defaultThreadFactory()默认工厂,为什么要用工厂呢,其实就是规范了生成的Thread。避免调用new Thread创建,导致创建出来的Thread可能存在差异
- handler:当队列和最大线程池都满了之后的拒绝策略。
1 | 复制代码1、AbortPolicy:直接抛出异常,默认策略; |
创建完线程池后使用也很简单,带返回值和不带返回值,传入对应传入Runnable或者Callable接口的实现
1 | java复制代码//无返回值 |
源码分析
execute方法
我们先从execute方法开始看
1 | java复制代码public void execute(Runnable command) { |
分三步做处理:
- 如果运行的线程数量小于 corePoolSize,那么尝试创建新的线程,并把传入的 command 作为它的第一个 task 来执行。调用 addWorker 会自动检查 runState 和 workCount,以此来防止在不应该添加线程时添加线程的错误警告;
- 即使 task 可以被成功加入队列,我们仍旧需要再次确认我们是否应该添加 thread(因为最后一次检查之后可能有线程已经死掉了)还是线程池在进入此方法后已经停掉了。所以我们会再次检查状态,如果有必要的话,可以回滚队列。或者当没有线程时,开启新的 thread;
- 如果无法将 task 加入 queue,那么可以尝试添加新的 thread。如果添加失败,这是因为线程池被关闭或者已经饱和了,所以拒绝这个 task。
下面用流程图演示一下,更加直观清楚
然后介绍一下源码中ctl是干什么的,点进去查看源码
我们发现它是一个原子类,主要作用是用来保存线程数量和线程池的状态,他用到了位运算,
一个int数值是32个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。
我们来计算一下ctlOf(RUNNING, 0)方法,其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位,-1 的二进制是32个1(1111 1111 1111 1111 1111 1111 1111 1111),左移29位后得到(1110 0000 0000 0000 0000 0000 0000 0000),然后111| 0还是111,同理可得其他状态的 bit 位。这个位运算很有意思,hashmap源码中也用到了位运算,小伙们在平时开发中也可以尝试用下,这样运算速度会快,而且能够装b,介绍下这五种线程池的状态
- RUNNING:接收新任务,并执行队列中的任务
- SHUTDOWN:不接收新任务,但是执行队列中的任务
- STOP:不接收新任务,不执行队列中的任务,中断正在执行中的任务
- TIDYING:所有的任务都已结束,
线程数量为 0,处于该状态的线程池即将调用 terminated()方法 - TERMINATED:terminated()方法执行完成
他们的转换关系如下:
addWorker方法
我们看到execute流程的核心方法为addWorker,我们继续分析,其实就做了两件事,拆分一下
第一步:通过原子操作增加线程数量:
1 | java复制代码retry: |
retry是一个标记,和循环配合使用,continue retry 的时候,会跳到 retry 的地方再次执行。如果 break retry,则跳出整个循环体。源码先获取到 ctl,然后检查状态,然后根据创建线程类型的不同,进行数量的校验。在通过CAS方式更新 ctl状态,成功的话则跳出循环。否则再次取得线程池状态,如果和最初已经不一致,那么从头开始执行。如果状态并未改变则继续更新worker的数量。流程图如下:
第二步:添加 worker 到 workers 的 set 中。并且启动 worker 中持有的线程。代码如下:
1 | java复制代码boolean workerStarted = false; |
可以看到添加 work 时需要先获得锁,这样确保多线程并发安全。如果添加 worker 成功,那么调用 worker 中线程的 start 方法启动线程。如果启动失败则调用 addWorkerFailed 方法进行回滚。看到这里小伙们会发现
1、ThreadPoolExecutor在初始化后并没有启动和创建任何线程,在调用 execute方法时才会调用 addWorker创建线程
2、addWorker方法中会创建新的worker,并启动其持有的线程来执行任务。
上文提到如果线程数量已经达到corePoolSize,则只会把command 加入到 workQueue中,那么加入到 workQueue中的command是如何被执行的呢?下面我们来分析 Worker 的源代码。
Worker类
Worker封装了线程,是executor中的工作单元。worker继承自AbstractQueuedSynchronizer,并实现 Runnable。 Worker 简单理解其实就是一个线程,里面重新了 run 方法,我们来看他的构造方法:
1 | java复制代码 Worker(Runnable firstTask) { |
再来看下这两个重要的属性
1 | java复制代码 /** Thread this worker is running in. Null if factory fails. */ |
firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程,这里用的是 ThreadFactory 创建线程,并没有直接 new,原因上文也提到过,这里看下 newThread 传入的是 this,因为 Worker 本身继承了 Runnable 接口,所以 addWork 中调用的 t.start(),实际上运行的是 t 所属 worker 的 run 方法。worker 的 run 方法如下:
1 | java复制代码public void run() { |
runWorker源码再如下:
1 | java复制代码 final void runWorker(Worker w) { |
getTask()
1 | java复制代码private Runnable getTask() { |
简单分析一下
- 先取出 worker 中的 firstTask,并清空;
- 如果没有 firstTask,则调用 getTask 方法,从 workQueue 中获取task;
- 获取锁;
- 执行 beforeExecute。这里是空方法,如有需要在子类实现;
- 执行 task.run;
- 执行 afterExecute。这里是空方法,如有需要在子类实现;
- 清空 task,completedTasks++,释放锁;
- 当有异常或者没有 task 可执行时,会进入外层 finnaly 代码块。调用 processWorkerExit 退出当前 worker。从 works 中移除本 worker 后,如果 worker 数量小于 corePoolSize,则创建新的 worker,以维持 corePoolSize 大小的线程数。
这行代码 while (task != null || (task = getTask()) != null) ,确保了 worker 不停地从 workQueue 中取得 task 执行。getTask 方法会从 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出来。
后面还有shutdown()、shutdownNow()等其他方法留给小伙们自行去观察研究哈。
如何合理配置线程池的大小
线程池大小不是靠猜,也不是说越多越好,最好的方式还是根据实际情况测试得出最佳配置。
- CPU 密集型:主要是执行计算任务,响应时间很快,CPU 一直在运行,这种任务 CPU 利用率很高,会增加上下文切换,应当分配较少的线程,比如 CPU
core+1
。 - IO 密集型:主要是进行 IO 操作,执行 IO 操作的时间较长,由于线程并不是一直在运行,这时 CPU 利用率不高,可以增加线程池的大小,比如 CPU
2*core+1
。
线程池的监控
如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状
态,当出现问题的时候可以快速定位到问题。我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控
看这些名称和定义都知道,这是让子类来实现的,可以在线程执行前、后、终止状态执行自定义逻辑。
总结
线程池这东西说简单也简单,说难也难,简单是因为用起来简单,难是难在要知道它的底层的源码,它是如何调度线程的,说两点吧,第一是本文中用了大量的流程图,当我们在阅读源码或者做复杂业务开发的时候,一定要静下心来先画个图,否则会被绕晕或者被别人打断后,又得从头到尾的看一边,第二是阅读源码,刚毕业的小伙伴可能只要会用行了,但是如果你工作五年了,还是只会用,那你比刚毕业的优势在哪里,凭什么工资要的高。感谢大家观看~
本文转载自: 掘金