把Java多学一点 ------Java异步编程工具:Exe

同步、异步

同步:Synchronous

异步:Asynchronous

大家可以多看几遍这两个单词,熟记于心,因为不管是阅读异步框架代码,调用异步API,或者说编写异步UI程序、异步服务器程序,或多或少都会碰到这两个单词或者变体。

接下来我们简单了解一下这两个概念:

我们把处理器所要处理的计算称之为任务。

同步任务,即以同步方式执行任务,其任务的发起与任务的执行是同一条时间线上进行的,也就是说任务的发起与任务的执行是串行的。

异步任务,即以异步方式执行任务,其任务的发起和任务的执行是在不同的时间线上执行的,也就是说任务的发起与任务的执行是并发的。

再来说一下同步、异步任务的表现:

同步任务的发起线程在其发起该任务之后必须等待该任务执行结束才能执行其他操作,这种等待往往意味着阻塞。

异步任务的发起线程在其发起该任务之后不必等待该任务结束便可以继续执行其他操作,所以异步任务可以使发起线程不必因等待其执行结束而被阻塞,异步任务往往伴随着非阻塞。

💡这里说一个问题:同步任务一定表现为阻塞?异步任务一定表现为非阻塞?

答案是不对的。阻塞、非阻塞只是任务执行方式的一种属性,与任务的执行方式没有必然的联系。

同步任务也可能是非阻塞的:Polling。轮询指的是任务的发起线程在发起任务之后并不是等待任务执行结束再去执行后续的操作,而是不断地检查其发起的任务是否执行结束,如果任务执行结束则继续执行后续的操作,如果没有执行结束,则继续检查。所以说轮询状态下,发起线程的状态仍然是Runnable,并非阻塞状态下Blocked或Waiting状态,只是此时发起线程主要做的工作是检查其发起任务是否执行结束。

异步任务也可能伴随着阻塞。如果向线程池提交一个任务之后便马上获取任务的执行结果,而此时线程池中的工作线程并没有将任务执行完成,就会导致主线程阻塞等待。

💡再说一个问题:同步与异步取决于什么?

任务是同步执行还是异步执行是取决于任务的执行方式。

如果直接调用task的run方法执行任务,则该任务肯定是同步执行。

如果创建一个专门的线程来执行该任务:new Thread(Task).start(),或者说将该任务提交给线程池执行,那么该任务是异步执行。

另外:任务是同步还是异步还取决于我们的观察角度。

📎推荐阅读:

异步编程

unix5种IO模型

Executor框架

上面介绍同步异步提到了任务是同步还是异步与任务本身无关,而是由任务的执行方式决定的。

任务本身处理逻辑由Runnable和Callable接口进行抽象,表现为具体的方法签名:Runnable.run() Callable.call()

Executor接口则是对任务的执行方式进行了抽象,将任务的提交与任务执行细节进行解耦,可以屏蔽同步任务与异步任务执行的差异。任务的执行细节对任务的提交方来说是透明的,不管是同步执行还是异步执行,这也使得任务的提交方可以轻松更改任务的执行方式。

下面是JUC中Executor的实现类图:

Executor实现类图

可以看到,线程池ThreadPoolExecutor也是Executor的实现。

下面我们分析Executor接口是如何实现任务的执行方式与任务的处理逻辑解耦:

1
2
3
csharp复制代码public interface Executor {
   void execute(Runnable command);
}

可以看到,Executor接口很简单,内部只提供了一个execute方法。command参数代表要执行的任务,这个任务在将来某个时间会被执行,该任务可以在线程池、新线程和执行线程中执行。

Executor接口使得任务提交方只需要调用executor.execute方法便可以使得指定的任务被执行,而无需关心任务将如何运行,包括线程使用,调用机制等。

如何使用Executor的execute方法?任务的执行都有哪几种方式?

  1. Executor并没有严格规定任务的执行必须是异步的,所以可以直接在调用线程中执行任务。
1
2
3
4
5
typescript复制代码class DirectExecutor implements Executor {
 public void execute(Runnable r) {
   r.run();
}
}}
  1. 使用新线程执行任务,下面的Executor为每个任务生成一个新线程来执行任务。
1
2
3
4
5
typescript复制代码class ThreadPerTaskExecutor implements Executor {
 public void execute(Runnable r) {
   new Thread(r).start();
}
}}
  1. 许多Executor对任务的调度方式和时间加了限制,例如ThreadPoolExecutor。下面是将任务的提交序列化到另一个Executor,是一个复合Executor。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码 class SerialExecutor implements Executor {
  final Queue<Runnable> tasks = new ArrayDeque<>();
  final Executor executor;
  Runnable active;
 
  SerialExecutor(Executor executor) {
    this.executor = executor;
  }
 
  public synchronized void execute(Runnable r) {
    tasks.add(() -> {
      try {
        r.run();
      } finally {
        scheduleNext();
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
}
}}

ExecutorService

Executor接口很简单,仅提供了execute方法供客户端调用,并且不能返回任务执行结果给客户端,其次,Executor接口实现类内部一般需要一些工作者线程来执行任务,当没有任务提交需要执行时,则需要主动关闭这些工作者线程,释放其资源。ExecutorService实现了Executor接口,主要为Executor补充了这两个功能。

ExecutorService提供了关闭的方法,关闭之后拒绝新任务提交。

1
2
3
4
5
6
7
8
9
10
csharp复制代码void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
       throws InterruptedException;

提供了两个方法来关闭Executor:

shutdown方法是有序关闭,对先前提交的任务进行执行,但是不会再接受新的任务。

💡shutdown方法不会等待先前提交的任务执行完成,不是阻塞方法,使用awaitTermination方法可以做到这一点。

shutdownNow方法会尝试停止正在执行的任务,并取消等待任务的执行,返回等待任务列表。

💡shutdownNow方法通常是通过Thread.interrupt方法来取消正在执行的任务,所以说任何不能响应interrupt方法的任务都不会被取消,同样,shutdownNow方法也不会等待正在执行的任务终止,使用awaitTermination方法可以做到这一点。

另外ExecutorService提供了awaitTermination方法,该方法是阻塞方法,阻塞等待直到所有的任务在关闭请求后执行完成,或者超时,或者当前线程执行被中断。

ExecutorService还提供了两个方法来判断状态:isShutdown方法判断Executor是否已经关闭,isTerminated方法判断shutdown之后是否所有任务都已完成。

以下示例分两个阶段关闭ExecutorService,首先调用shutdown方法拒绝传入任务,然后在必要时候调用shutdownNow取消延时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码void shutdownAndAwaitTermination(ExecutorService pool) {
 pool.shutdown(); // Disable new tasks from being submitted
 try {
   // Wait a while for existing tasks to terminate
   if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     pool.shutdownNow(); // Cancel currently executing tasks
     // Wait a while for tasks to respond to being cancelled
     if (!pool.awaitTermination(60, TimeUnit.SECONDS))
         System.err.println("Pool did not terminate");
  }
} catch (InterruptedException ie) {
   // (Re-)Cancel if current thread also interrupted
   pool.shutdownNow();
   // Preserve interrupt status
   Thread.currentThread().interrupt();
}
}}

ExecutorService提供了生成Future跟踪异步任务进度的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
swift复制代码<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                 long timeout, TimeUnit unit)
       throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                   long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;

submit方法跟踪一个任务进度,invoke方法可以跟踪多个异步任务进度。

submit方法返回Future对象,泛型取决于提供的任务执行方法Runnable.run或者Callable.call方法的返回类型。任务完成后可以通过Future.get方法获取任务执行结果。

💡与Executor的execute方法对比,ExecutorService的submit方法最终也是通过执行execute方法来执行,但是submit可以捕获提交的任务执行过程抛出的异常,当调用Future.get获取任务执行结果时,会抛出RejectedExecutionException异常,而execute方法不能捕获异常。

ThreadPoolExecutor是ExecutorService的默认实现类。

工具类:Executors

Executors类是一个实用的工具类,为Executor、ExecutorService、ScheduledExecutorService、ThreadFactory、Callable都提供了工厂方法和一些实用方法。

下面列举看下Executors创建和返回ExecutorService实例的快捷方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                               60L, TimeUnit.SECONDS,
                               new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                               60L, TimeUnit.SECONDS,
                               new SynchronousQueue<Runnable>(),
                               threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
                           0L, TimeUnit.MILLISECONDS,
                           new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
                           0L, TimeUnit.MILLISECONDS,
                           new LinkedBlockingQueue<Runnable>(),
                           threadFactory));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
 return new ThreadPoolExecutor(nThreads, nThreads,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>(),
                               threadFactory);
}

异步计算结果:Future

JUC提供了Future接口,表示异步计算的结果。提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。计算结果只能在计算完成后使用get进行检索,否则阻塞,直到结果准备好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public interface Future<V> {

 // 尝试取消任务
 // 1. 任务已经完成、任务已经取消、任务由于其他原因不能被取消 都将返回失败
 // 2. 在调用cancle时,该任务尚未启动,则不运行任务
 // 3. 如果任务已经启动,则mayInterruptIfRunning参数确定是否应该中断执行该任务的线程来取消任务
 // 4. 该方法返回后,不管是否返回成功,后续调用isDone方法都返回true
 // 5. 该方法返回成功后,后续调用isCancleed方法都返回true
 boolean cancel(boolean mayInterruptIfRunning);
 
 // 判断计算是否在执行完之前被取消了
boolean isCancelled();  
 
 // 判断计算是否已经执行完成
 boolean isDone();
 
 // 阻塞检索计算结果
 V get() throws InterruptedException, ExecutionException;
 
 // 阻塞检索计算结果或者超时后返回
 V get(long timeout, TimeUnit unit)
   throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Runnable和Callbale都是对任务进行抽象,任务的处理逻辑分别表现为Runnable.run()和Callable.call()两个方法签名,对比这两个类分别表示任务:

Runnable来表示任务,任务既可以交给一个专门的线程执行,也可以交给一个线程池或者Executor的任何实现类来执行(Executor的execute方法参数类型为Runnable),但是不能获得任务的执行结果。

Callable来代表任务,虽然可以通过ExecutorService.submit方法获取任务的执行结果,但是Callable只能交给ExecutorService的实现类,比如线程池来执行,而无法交给一个专门的工作线程或者Executor的实现类来执行,这使得Callable表示异步任务会使任务的执行方式大大受限。

JUC提供了FutureTask组合了Runnable和Callable分别代表任务的优势。

FutureTask实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口:

由于继承了Runnable接口,FutureTask代表任务既可以交给专门的工作者线程执行,也可以交给Executor的其他任何实现类来执行。

由于继承了Future接口,FutureTask能够启动和取消异步任务、查看任务是否完成、检索任务结果。

💡当我们将FutureTask类型的任务作为参数传递给Executor.execute(Runnable task)方法的时候也可以获得任务的执行结果,一个工作者线程执行FutureTask的run方法,而另一个线程调用FutureTask的get方法来获取任务的执行结果。

分析ExecutorService的源代码的时候,我们可以看到submit方法参数类型既可以是Runnable也可以是Callable接口,下面我们看AbstractExecutorService.submit(Callable task)的实现:

1
2
3
4
5
6
7
8
9
scss复制代码public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 RunnableFuture<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 return new FutureTask<T>(callable);
}

该方法的返回实例便是FutureTask类型,通过FutureTask的构造方法FutureTask(Callable callable)将callable代表的任务实例转换为Runnable实例,然后提交给Executors.execute方法来执行,返回FutureTask实例,以便后面可以获取任务的执行结果。

FutureTask支持以回调的方式处理任务的执行结果,FutureTask.done方法在任务执行结束后会被调用,该方法是protected方法,所以可以在实现子类中覆盖该方法实现对任务结果的处理。

💡注意:如果在Future.done方法中的代码通过Future.get方法获取任务的处理结果,这个时候由于任务已经执行结束了,所以get方法不会阻塞,但是由于任务的结束任务的执行结束包括正常终止、异常终止、以及任务被取消而导致的终止,所以应该在调用FutureTask.get方法之前调用Future.isCanceld来判断任务是否被取消。​

完结

首先带大家回顾了同步、异步的概念,这两种编程模型的优缺点。然后深入了解了JUC中提供的两大异步编程工具:Executor和Future。这两个组件以及实现是Java异步编程或者说多线程编程的基础。

以上纯属个人阅读和分析所分享,如有不正确的地方欢迎指正。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%