java线程池之ScheduledThreadPoolExe

java中异步周期任务调度有Timer,ScheduledThreadPoolExecutor等实现,目前单机版的定时调度都是使用ScheduledThreadPoolExecutor去实现,那么它是如何实现周期执行任务的呢?其实它还是利用ThreadPoolExecutor线程池去执行任务,这一点从它是继承自ThreadPoolExecutor救可以看的出来,其实关键在于如何实现任务的周期性调度

ScheduledThreadPoolExecutor类以及核心函数

首先ScheduledThreadPoolExecutor是实现ScheduledExecutorService接口,它主要定义了四个方法:

  • 周期调度一个Runnable的对象
  • 周期调度一个Callable的对象
  • 固定周期调度Runnable对象 (不管上一次Runnable执行结束的时间,总是以固定延迟时间执行 即 上一个Runnable执行开始时候 + 延时时间 = 下一个Runnable执行的时间点)
  • 以固定延迟调度unnable对象(当上一个Runnable执行结束后+固定延迟 = 下一个Runnable执行的时间点)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}

其次,ScheduledThreadPoolExecutor是继承ThreadPoolExecutor,所以它是借助线程池的能力去执行任务,然后自身去实现周期性调度。从构造方法调用父类的线程池的构造方法,核心线程数是构造方法传入,这里可以看到最大线程数是Integer的最大值即2147483647, 还有等待队列是DelayedWorkQueue,它是实现延时的关键.

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

scheduleAtFixedRate是实现周期性调度的方法,调度任务就是实现Runnable对象,
以及系统的开始延时时间,周期的调度的间隔时间。

  1. 计算初始触发时间和执行周期,并和传入的Runnable对象作为参数封装成 ScheduledFutureTask,然后调用decorateTask装饰Tas(默认实现为空)。
  2. 设置ScheduledFutureTask对象outerTask为t(默认就是它自己)。
  3. 调用delayedExecute延迟执行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
** TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(init ialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
  1. 判断线程池状态,如果不是处于running状态,则拒绝该任务。
  2. 将该任务加入父类的延迟队列(实际为初始化的DelayedWorkQueue对象)
  3. 再次判断线程池不是处于running状态,并且,判断是否是处于shutdown状态并且continueExistingPeriodicTasksAfterShutdown标志是否是true(默认是false,表示是否线程次处于shutdown状态下是否继续执行周期性任务),若果为true,则从队列删除任务,false,则确保启动线程来执行周期性任务
1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
  1. 获取线程池数量
  2. 如果小于核心线程数,则启动核心线程执行任务,如果线程数为空,则启动非核心线程
1
2
3
4
5
6
7
csharp复制代码void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

ScheduledFutureTask的run函数

  1. 获取是否是周期性任务
  2. 判断是否线程池状态是否可以执行任务,如果为true,则取消任务
    3 如果是非周期性任务,则直接调用父类FutureTask的run方法,
    4 如果是周期性任务,则调用FutureTask的runAndReset函数,
    如果该函数返回为true,则调用setNextRunTime设置下一次运行的时间,
    并且还行reExecutePeriodic再次执行周期性任务。
1
2
3
4
5
6
7
8
9
10
11
scss复制代码public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
  1. 判断线程池是否处于可执行任务的状态,如果为true,则重新将设置下一次运行时间的任务加入父类的等待队列,
  2. 如果线程池处于不可运行任务的状态,则并且从等待队列中移除成功,
    调用任务的取消操作,否则调用ensurePrestart确保启动线程执行任务
1
2
3
4
5
6
7
8
9
scss复制代码void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

DelayedWorkQueue类核心函数

DelayedWorkQueue是继承AbstractQueue,并实现BlockingQueue接口

1
2
scala复制代码static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {

核心字段

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// 初始容量为16
private static final int INITIAL_CAPACITY = 16;
// 等待队列,只能保存RunnableScheduledFuture对象
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 锁
private final ReentrantLock lock = new ReentrantLock();
//对俄大小
private int size = 0;
// leader线程,表示最近需要执行的任务的线程。
private Thread leader = null;
// 条件锁
private final Condition available = lock.newCondition();

offer函数:

  1. 将添加的参数转换成RunnableScheduledFuture对象。
  2. 加全局锁。
  3. 获取当前队列的size,如果等于队列的长度,则嗲用grow扩容,增加50%的数组长度。
  4. size加1。
  5. 如果数组为0,则将加入的对象放在索引为0的位置, 然后设置ScheduledFutureTask的heapIndex的索引(便于后续快速删除)。
  6. 调用siftUp做堆的上浮操作,这里是小根堆的操作。
  7. 如果队列中第一个元素是传入的对象,则将laader设置null
  8. 释放锁
  9. 返回true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

siftUp主要就是做小根堆的上移操作,从if (key.compareTo(e) >= 0) 看出,如果key大于parent索引的元素,则停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

poll函数

  1. 加锁
  2. 获取队列中索引为0的云元素,若果为null或者第一个元素的执行时间戳时间大于当前时间则直接返回null,否则调用finishPoll将第一个元素返回.
  3. 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
csharp复制代码public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
  1. 将队列size 减 1
  2. 获取队列中队列中最后一个元素,并且设置队列最后一个为null
  3. 最后一个元素不为null,则调用sfitdown进行,将最后一个元素设置到索引为0的位置,将下移操作,重新调整小根堆。
  4. ScheduledFutureTask的heapIndex为-1
1
2
3
4
5
6
7
8
9
ini复制代码private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}

ScheduledFutureTask的compareTo函数

ScheduledFutureTask实现compareTo方法逻辑

  1. 首先比较是否是同一个对象
  2. 若果是ScheduledFutureTask对象,则比较time的大小,time是下一次执行的任务的时间戳,如果不是,则比较
    getDelay的时间大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

ScheduledThreadPoolExecutor的take函数就是ThreadPoolExecutor的从任务队列中获取任务,没有任务则一直等待(这里是线程数小于核心线程数的情况)

  1. 加可中断锁
  2. 获取队列中第一个元素的任务,从前面可以知道此任务执行的时间戳最小的任务
  3. 如果第一个任务为空,则再全局的锁的条件锁上等待,
  4. 如果第一个任务不为空,则获取延迟时间,如果延时时间小于0,说明第一个任务已经到时间了,则返回第一个任务。
  5. 如果leader线程不为空,则让线程在全局锁的条件锁上等待
  6. 如果leader为空,则将获取第一个任务的当前线程赋值为leader变量。
  7. 在全局锁的条件锁上等待delay纳秒, 等待结束后,如果当前线程还是等于leader线程,则重置leader为空
  8. 最后判断 leader为空并且第一个任务不为空,则唤醒全局锁上条件锁的等待的线程。
  9. 释放全局锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ini复制代码public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

总结

综合前面所述,线程池从DelayedWorkQueue每次取出的任务就是延迟时间最小的任务, 若果到达时间的任务,则执行任务,否则则用条件锁Conditon的wait进行等待,执行完后,则用signal进行唤醒下一个任务的执行。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%