本文带着以下问题
1 | scss复制代码❶ execute方法里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢? |
理解ThreadPoolExecutor,甚至任何其他的类或组件,我觉得从两个点出发会更顺滑:他的数据结构和结构中的属性变化
而ThreadPoolExecutor的数据结构为
1 | arduino复制代码private final HashSet<Worker> workers; |
线程池定义了一个worker对象封装了一个 thread 线程和一个 task 任务。可以看出一个关系:一个线程对应一个worker。而 workers 集合显而易见装的就是worker了,即worker创建了加入集合;worker的任务执行完了,worker从集合中删除。
workQueue一个阻塞队列,存放超出核心数量的任务。阻塞和队列都是很有讲究的用意。
ctl是一个巧妙的设计,既表示 workerCount 又表示 runState
本文将ThreadPoolExecutor高深的位运算转换为二进制,以便更直观的理解方法和属性的使用。对加入线程池,执行worker的线程,释放worker的线程,终止线程池等进行细致的理解,以求轻便理解。
状态
NOTE: 代码中的位运算不好直观,我们学习时可以将他们转成十进制和二进制,直观便于理解。使用以下方法操作进制间的转换
1 | rust复制代码1、二进制 -> 十进制 Integer.parseInt("00111100", 2) |
ctl即是又是
既是workerCount:表示有效的线程数
又是runState: 表示线程池的状态
Tips => 强调 ctl即是workerCount:当其值为正数时表示有效的线程数 又是runState:当其值为负数时表示线程的状态
1 | arduino复制代码/** |
Tips => 再次强调:runState的几个常量仅是状态的边界值。换句话说,每个状态其实是一个范围
正是由于每个状态其实是一个范围,状态常量仅是状态范围的边界。于是,状态方法的使用就简单明了了
状态方法
1 | markdown复制代码/** |
线程池的执行过程,这个网上说的很明白了
Tips => 正式开始前,再次强调 ctl其值为正数时表示线程数,其值为负数时表示线程状态
添加任务方法 - execute
1 | scss复制代码public void execute(Runnable command) { |
这里抛出一个问题(问题A):这里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢?先想想,文末一起说说
添加任务方法 - addWorker
记住一个前提:进入这个方法的条件是当前线程数<=核心线程数,或队列已满&当前线程数<=最大线程数。有了这个前提就好理解多了
1 | ini复制代码private boolean addWorker(Runnable firstTask, boolean core) { |
addWorker方法一共做了两件事:1.ctl递增;2.Worker对象加入workers集合并start Worker.thread线程(即线程池中的线程)。再聚合点说,这个方法就是进行start Worker.thread线程。既然是这样,那么有两个疑问:
- 问题B: 传入的Runnable类型的任务紧接着就进行start了,那么为什么还要workers.add(w)放入集合呢,workers集合存在的意义是什么呢?自己先想想,文末给出答案
- 请注意一个细节: addWorker方法在什么线程里执行的?这有助于问题A的理解
再问个问题(问题E):线程池(ThreadPoolExecutor)中存放的是线程吗?不是,是一堆的Worker对象,Worker既不是thread线程也不是要执行的任务。那么它是做啥的呢
我们来看下Worker的构造方法
1 | scss复制代码Worker(Runnable firstTask) { |
从构造方法知道,我们要执行的任务成为Worker的一个字段,同时Worker还有一个thread字段,看Worker的(3)处代码,我觉得这行很关键,我改下它的同义写法:this.thread.target = this,即worker作为他自身的thread字段的值,从Worker的定义知道,Worker本身也是Runnable的。所以,当执行addWorker方法的(1)处t.start()时,我们的任务也跟着执行了,这个流程如图
明白了这里,Worker.run()和runWorker(Worker)怎么触发的就很容易理解了
执行任务方法 - runWorker
1 | ini复制代码Worker的thread字段值,执行thread.start()方法,触发了此方法的执行。 |
- 请注意一个细节: runWorker方法是在什么线程里执行?这有助于问题D的理解
runWorker方法的目的就是执行任务(即task.run())。它首先执行Worker的firstTask,然后再从workQueue队列里取task继续执行。简单来说,就是取任务 执行,取任务 执行,取任务 执行。在这其中,怎么取,执行前中后会做什么事情,如ctl判断,线程中断检测,w.unlock()和w.lock()等都很重要,一个一个说吧
- 怎么取:这是getTask()方法的事情,稍后说
- ctl判断:runStateAtLeast(ctl.get(), STOP) => 是否 ctl.get() > STOP,当线程被中断了,这个方法才返回true
- w.unlock()和w.lock(),worker的lock相关用于区分线程是否空闲。结合shutdown()方法一起理解,见下文shutdown的部分
从while条件可知:null值对于runWorker()来说有特殊用途:通知获取任务的工作线程结束并退出,所以getTask方法返回null时是很特殊的
执行任务方法 - getTask
1 | java复制代码private Runnable getTask() { |
我们来分析下getTask方法的(1)处代码:
如果核心线程可超时(allowCoreThreadTimeOut=true),那么在keepAliveTime时间内,核心线程一直是活着的,所以队列一有任务来就执行,否则它就阻塞等着,时刻等着任务来;
或者当前线程数超过核心线程数,那么在keepAliveTime时间内,大于核心线程数的那些线程一直是活着的,所以也是队列一有任务来就执行,否则就阻塞等着,时刻等着任务来;
这里有个疑问(问题D):为什么要用阻塞的方法呢,不阻塞的方法不行吗,答案文末给出
要想更好的理解execute方法,addWorker方法,runWorker方法,getTask方法中的特别是条件判断的逻辑,通过shutdown,shutdownNow,tryTerminate,awaitTermination相结合着更清晰些,下面我们就看下这些方法
以下是关闭相关的方法
shutdown和shutdownNow方法比较相似,类比图如下
shutdown
调用shutdown()方法会进入 SHUTDOWN 状态。在 SHUTDOWN 状态下,线程池不接受新的任务,但是会继续执行任务队列中已有的任务。
怎么证明它此时不接收新的任务了呢,场景 do it by practise
通过调用shutdown()关闭的线程池,关闭以后表现的行为就是不能再提交任务给线程池,但是在关闭前已经提交的任务仍旧会被执行。等到任务队列空了以后线程池才会进入关闭流程
1 | scss复制代码public void shutdown() { |
shutdown方法核心由三部分组成:
- advanceRunState(SHUTDOWN):改线程池状态为SHUTDOWN
- interruptIdleWorkers():中断线程池中空闲线程
- tryTerminate():Transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)
advanceRunState
转换线程池状态为入参值,入参值只能是SHUTDOWN or STOP
如果是SHUTDOWN,那么执行完advanceRunState方法后ctl的值>=0,即>=SHUTDOWN。假如当前线程数是3,那么ctl就是3
如果是STOP,那么执行完advanceRunState方法后ctl的值>=536870912,即>=STOP。假如当前线程数是3,那么ctl就是536870912+3
所以,这也证实了线程池的状态是一个范围,而不是一个值,这个范围正如文档开头处所述
1 | csharp复制代码private void advanceRunState(int targetState) { |
interruptIdleWorkers
方法很明确,就是将workers对应的线程中断。从方法的名称就可以知道功能是对空闲的线程中断。那怎么知道哪些work的线程是空闲的呢
1 | java复制代码private void interruptIdleWorkers() { |
注意这点:w.tryLock(),为啥要试图加锁呢。这时候就要看看runWorker方法了,runWorker执行时是要对worker加锁的(即调用lock)。
所有工作中的线程都需要对Worker加锁,所以在这里通过Worker.tryLock()来判断被检查的工作线程是否是空闲状态,如果是空闲状态则表示可以加锁,然后发送interrupt()命令。在发送中断命令的过程中由于工作线程是处于加锁状态的,所以被中断线程将不能被同时用于执行任务。
tryTerminate
1 | scss复制代码final void tryTerminate() { |
shutdownNow
调用shutdownNow()会进入 STOP 状态。在 STOP 状态下线程池既不接受新的任务,也不处理已经在队列中的任务。对于还在执行任务的工作线程,线程池会发起中断请求来中断正在执行的任务,同时会清空任务队列中还未被执行的任务。
1 | ini复制代码public List<Runnable> shutdownNow() { |
shutdownNow方法核心由四部分组成:
- advanceRunState(STOP):改线程池状态为STOP,与advanceRunState(SHUTDOWN)逻辑相同
- interruptWorkers():中断线程池中所有线程,这个与interruptIdleWorkers的区别细细体会,这个方法中断所有已经启动的工作线程,即进行中的任务(执行了w.lock,但还没执行w.unlock),这些线程中断可能成功也可能不成功
- tryTerminate():前面已说完
- drainQueue():从任务队列中取出所有未被执行的任务,未被执行的任务列表会被作为返回值返回给应用程序
interruptWorkers
1 | csharp复制代码// Interrupts all threads, even if active |
有个疑问:interruptWorkers是中断线程池中所有的线程(空闲的和执行中的总和),但interruptIfStarted()方法只是中断执行中的线程。如果你有这个疑惑的话,咱们一起看下getState() >= 0这个判断,我们看下runWorker方法,先执行了w.unlock(),再执行w.lock(),在执行w.unlock(),unlock是把state设置为0,lock把state设置为1,又只要执行了runWorker,那么state的值就是>=等于0的了,所以不管空闲与否,state总是>=0,所以interruptWorkers方法这时候执行interruptIfStarted方法,中断的就是所有的线程
drainQueue
将workerQueue队列里的worker返回
1 | ini复制代码private List<Runnable> drainQueue() { |
问答
1 | arduino复制代码❶ execute方法里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢? |
小结
对ThreadPoolExecutor的理解每次都会有新的收获,看似不经意的一行代码,一个判断,实践懂了之后都感叹和震撼作者的编程能力,每次领悟之后,都感觉作者的实现技巧都开启了我以前没见过的窗。
ThreadPoolExecutor
ThreadPoolExecutor属性源码和自己debug实践的例子:ThreadPoolExecutorTest0
原文地址:ThreadPoolExecutor系列说之由浅入深源码解说
本文转载自: 掘金