ThreadPoolExecutor 的参数含义及源码执行流

「这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战

线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度。但如果要说线程池的话一定离不开**ThreadPoolExecutor

说明:Executors 返回的线程池对象的弊端如下:

  • 1)FixedThreadPoolSingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM(Out Of Memory)
  • 2)CachedThreadPoolScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

其实当我们去看**Executors的源码会发现,Executors.newFixedThreadPool()** 、Executors.newSingleThreadExecutor()Executors.newCachedThreadPool() 等方法的底层都是通过 ThreadPoolExecutor 实现的。

过程

ThreadPoolExecutor 的核心参数指的是它在构建时需要传递的参数,其构造方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码 public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
       if (corePoolSize < 0 ||
           //maximumPoolSize必须大于0,且必须大于corePoolSize
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.acc = System.getSecurityManager() == null ?
               null :
               AccessController.getContext();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
  }
  • 第1个参数:**corePoolSize**表示线程池的常驻核心线程数。如果设置为0,则表示在没有任何任务时,销毁线程池;如果大于0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程;如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。
  • 第2个参数:maximumPoolSize表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于0,也必须大于等于corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。
  • 第3个参数:keepAliveTime表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于corePoolSize为止,如果maximumPoolSize等于corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。
  • 第4个参数:unit 表示存活时间的单位,它是配合**keepAliveTime**参数共同使用的。
  • 第5个参数:**workQueue**表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。
  • 第6个参数:**threadFactory**表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程,源代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码  public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory) {
       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
  }

public static ThreadFactory defaultThreadFactory() {
       return new DefaultThreadFactory();
  }

// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {
       private static final AtomicInteger poolNumber = new AtomicInteger(1);
       private final ThreadGroup group;
       private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final String namePrefix;

       DefaultThreadFactory() {
           SecurityManager s = System.getSecurityManager();
           group = (s != null) ? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
           namePrefix = "pool-" +
                         poolNumber.getAndIncrement() +
                        "-thread-";
      }

  // 创建线程
       public Thread newThread(Runnable r) {
           Thread t = new Thread(group, r,
                                 namePrefix + threadNumber.getAndIncrement(),
                                 0);
           if (t.isDaemon())
               //创建一个非守护线程
               t.setDaemon(false);
           if (t.getPriority() != Thread.NORM_PRIORITY)
               //线程优先级设置为默认值
               t.setPriority(Thread.NORM_PRIORITY);
           return t;
      }
  }

我们也可以自定义一个线程工厂,通过实现**ThreadFactory**接口来完成,这样就可以自定义线程的名称或线程执行的优先级了。

  • 第7个参数:RejectedExecutionHandler 表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列**workQueue中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。线程池的工作流程要从它的执行方法execute()** 说起,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       /*
          *分3步进行:
          *
          * 1.如果少于正在运行的corePoolSize线程,请尝试
          *以给定的命令作为第一个线程启动一个新线程
          *任务。 对addWorker的调用自动检查runState和
          * workerCount,因此可以防止假警报的增加
          *通过返回false返回不应该执行的线程。
          *
          * 2.如果任务可以成功排队,那么我们仍然需要
          *仔细检查我们是否应该添加线程
          *(因为现有的自上次检查后死亡)或
          *自从进入此方法以来,该池已关闭。 所以我们
          *重新检查状态,并在必要时回退排队
          *停止,如果没有,则启动一个新线程。
          *
          * 3.如果我们无法将任务排队,那么我们尝试添加一个新的
          *线程。 如果失败,我们知道我们已经关闭或饱和
          *并因此拒绝任务。
          */
       int c = ctl.get();
  // 当前工作的线程小于核心线程数
       if (workerCountOf(c) < corePoolSize) {
           //创建新的线程执行次任务
           if (addWorker(command, true))
               return;
           c = ctl.get();
      }
  // 检查线程池是否处于可运行状态,如果是则把任务添加到队列
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           //再次检查线程池是否处于可运行状态,防止第一次校验通过后线程池关闭
           //如果是非运行状态,则把刚加入队列的任务移除
           if (! isRunning(recheck) && remove(command))
               reject(command);
           //如果线程池的线程数为0时(当corePoolSize设置为0时会发生)
           else if (workerCountOf(recheck) == 0)
               //新建线程执行任务
               addWorker(null, false);
      }
  //核心线程都在忙且队列都已爆满,尝试新启动一个线程执行失败
       else if (!addWorker(command, false))
           //执行拒绝策略
           reject(command);
  }

其中**addWorker(Runnable firstTask,boolean core)** 方法参数说明:

+ **`firstTask`**:线程首先执行的任务,如果没有则设置为null
+ **core**:判断是否可以创建线程的阈值(最大值),如果等于true则表示使用\*\*`corePoolSize`**作为阈值,false则表示使用**`maximumPoolSize`\*\*作为阈值

扩展

execute()和submit()

execute()和submit()都是用来执行线程任务的,他们最主要的区别是,submit()能接受线程池执行的返回值,execute()不能接受返回值。

两个方法的具体使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException {
       ThreadPoolExecutor execute = new ThreadPoolExecutor(2,10,10L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(20));
       //execute使用
       execute.execute(new Runnable() {
           @Override
           public void run() {
               System.out.println("Hello,execute");
          }
      });
       //submit使用
       Future<String> future = execute.submit(new Callable<String>() {
           @Override
           public String call() throws Exception {
               System.out.println("Hello,submit");
               return "Success";
          }
      });
       System.out.println(future.get());
  }

程序执行结果:

1
2
3
lua复制代码Hello,execute
Hello,submit
Success

从以上结果可以看出submit()方法可以配合Futrue来接收线程执行的返回值。它们的另一个区别是execute()方法属于Executor接口的方法,而 submit() 方法则是属于 ExecutorService 接口的方法

  • 线程池拒绝策略

当线程池中的任务队列已经被存满,再有任务添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。

Java自带的拒绝策略有四种:

  • AbortPolicy,终止策略,线程池会抛出异常并终止执行,它是默认的拒绝策略;
  • CallerRunsPolicy,把任务交给当前线程来执行
  • DiscardPolicy,忽略此任务(最新的任务)
  • DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)

AbortPolicy拒绝策略示例:

1
2
3
4
5
6
java复制代码ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,10,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy());
       for (int i=0;i<6;i++){
           executor.execute(()->{
               System.out.println(Thread.currentThread().getName());
          });
      }

程序运行结果:

1
2
3
4
5
6
7
8
9
10
arduino复制代码pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task example.ThreadPoolExecutorExample$$Lambda$1/1149319664@4dd8dc3 rejected from java.util.concurrent.ThreadPoolExecutor@6d03e736[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at example.ThreadPoolExecutorExample.main(ThreadPoolExecutorExample.java:34)

可以看出当第6个任务来的时候,线程池则执行了AbortPolicy 拒绝策略,抛出了异常。因为队列最多存储2个任务,最大可以创建3个线程来执行任务(2+3=5),所以当第6个任务来的时候,此线程池就“忙”不过来了。

自定义拒绝策略

自定义拒绝策略只需要新建一个RejectedExecutionHandler对象,然后重写它的rejectedExecution()方法即可,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
               //添加自定义拒绝策略
               new RejectedExecutionHandler() {
           @Override
           public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
               //业务执行方法
               System.out.println("执行自定义拒绝策略");
          }
      });
       for (int i=0;i<6;i++){
           executor.execute(()->{
               System.out.println(Thread.currentThread().getName());
          });
      }

程序执行结果:

1
2
3
4
5
6
arduino复制代码执行自定义拒绝策略
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

可以看出线程池执行了自定义的拒绝策略,我们可以在rejectedExecution中添加自己业务处理的代码。

ThreadPoolExecutor扩展

ThreadPoolExecutor的扩展主要是通过重写它的beforeExecute()afterExecute()方法实现的,我们可以在扩展方法中添加日志或者实现数据统计,比如统计线程的执行时间,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码public class ThreadPoolExtend {
   public static void main(String[] args) {
       //线程池扩展调用
       MyThreadPoolExtend executor = new MyThreadPoolExtend(2,4,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
       for (int i=0;i<3;i++){
           executor.execute(()->{
               System.out.println(Thread.currentThread().getName());
          });
      }
  }

   /**
    * 线程池扩展
    */
   static class MyThreadPoolExtend extends ThreadPoolExecutor{
       //保存线程开始执行的时间
       private final ThreadLocal<Long> localTime = new ThreadLocal<>();
       public MyThreadPoolExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
           super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
      }

       /**
        * 开始执行之前
        * @param t 线程
        * @param r 任务
        */
       @Override
       protected void beforeExecute(Thread t, Runnable r) {
           //开始时间(单位:纳秒)
           Long sTime = System.nanoTime();
           localTime.set(sTime);
           System.out.println(String.format("%s|before|time=%s",t.getName(),sTime));
           super.beforeExecute(t, r);
      }

       /**
        * 执行 完成之后
        * @param r 任务
        * @param t 抛出的遗产
        */
       @Override
       protected void afterExecute(Runnable r, Throwable t) {
           //结束时间(单位:纳秒)
           long eTime = System.nanoTime();
           long totalTime = eTime - localTime.get();
           System.out.println(String.format("%s|after|time=%s|耗时:%s毫秒",Thread.currentThread().getName(),eTime,totalTime/1000000.0));
           super.afterExecute(r, t);
      }
  }
}

程序执行结果:

1
2
3
4
5
6
7
8
9
ini复制代码pool-1-thread-1|before|time=95659085427600
pool-1-thread-1
pool-1-thread-2|before|time=95659085423300
pool-1-thread-2
pool-1-thread-2|after|time=95659113130200|耗时:27.7069毫秒
pool-1-thread-1|after|time=95659112193700|耗时:26.7661毫秒
pool-1-thread-2|before|time=95659117635200
pool-1-thread-2
pool-1-thread-2|after|time=95659117822000|耗时:0.1868毫秒

小结

线程池的使用必须要通过 ThreadPoolExecutor 的方式来创建,这样才可以更加明确线程池的运行规则,规避资源耗尽的风险。同时,也介绍了 ThreadPoolExecutor 的七大核心参数,包括核心线程数和最大线程数之间的区别,当线程池的任务队列没有可用空间且线程池的线程数量已经达到了最大线程数时,则会执行拒绝策略,Java 自动的拒绝策略有 4 种,用户也可以通过重写 rejectedExecution() 来自定义拒绝策略,还可以通过重写 beforeExecute() 和 afterExecute() 来实现 ThreadPoolExecutor 的扩展功能。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%