并发编程的扩展-Future&CompletableFutu

Future模式是多线程并发编程的扩展,这种方式支持返回结果,使用get()方法阻塞当前线程。它的核心思想是异步调用,当我们执行某个函数时,它可能很慢,但是我们又不着急要结果。因此,我们可以让它立即返回,让它异步去执行这个请求,当我们需要结果时,阻塞调用线程获取结果。

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

商品查询

一个简单的生产实践例子,在维护促销活动时需要查询商品信息(包括商品基本信息、商品价格、商品库存、商品图片、商品销售状态等)。这些信息分布在不同的业务中心,由不同的系统提供服务。假设一个接口需要50ms,那么一个商品查询下来就需要200ms-300ms,这对于我们来说时不满意的。如果使用Future改造则需要的就是最长耗时服务的接口,也就是50ms左右。

Future商品查询.png

当然这里并不能解决的是一个接口服务突然很慢的问题,如果要解决这个问题,需要辅助其他组件(流控,降级等)。
伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
java复制代码public class FutureTest {
static class T1Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T1:查询商品基本信息...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品基本信息查询成功";
}
}

static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2:查询商品价格...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品价格查询成功";
}
}

static class T3Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T3:查询商品库存...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品库存查询成功";
}
}

static class T4Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T4:查询商品图片...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品图片查询成功";
}
}

static class T5Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T5:查询商品销售状态...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品销售状态查询成功";
}
}

public static void main(String[] args) throws InterruptedException {
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
FutureTask<String> ft3 = new FutureTask<>(new T3Task());
FutureTask<String> ft4 = new FutureTask<>(new T4Task());
FutureTask<String> ft5 = new FutureTask<>(new T5Task());
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(ft1);
executorService.submit(ft2);
executorService.submit(ft3);
executorService.submit(ft4);
executorService.submit(ft5);
// 创建阻塞队列
BlockingQueue<String> bq = new LinkedBlockingQueue<>();
System.out.println(System.currentTimeMillis());
executorService.execute(() -> {
try {
bq.put(ft1.get());
bq.put(ft2.get());
bq.put(ft3.get());
bq.put(ft4.get());
bq.put(ft4.get());
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println(System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
System.out.println(bq.take());
}
executorService.shutdown();
}
}

工作原理

Future.png

增强的Future:CompletableFuture

CompletableFuture时Java8新增的一个超大型工具类。它不仅实现了Future接口,还实现了CompletionStage接口,该接口总共拥有40多种方法(为了函数式编程中的流程调用准备的)。

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

CompletableFuture针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(
CompletableFutureDemo::fetchPrice);
// 如果执行成功:
cf.thenAccept((result) -> System.out.println("price: " + result));
// 如果执行异常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}

static Double fetchPrice() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

1
2
3
java复制代码public interface Supplier<T> {
T get();
}

这里我们用lambda语法简化了一下,直接传入CompletableFutureDemo::fetchPrice,因为CompletableFutureDemo.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

1
2
3
java复制代码public interface Consumer<T> {
void accept(T t);
}

异常时,CompletableFuture会调用:

1
2
3
java复制代码public interface Function<T, R> {
R apply(T t);
}

可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

除此之外,CompletableFuture还允许将多个CompletableFuture进行组合。

CompletionStage接口

描述and汇聚关系:

  1. thenCombine:任务合并,有返回值
  2. thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
  3. runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。

描述or汇聚关系

  1. applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
  2. acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
  3. runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。

CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行

重写商品查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public class CompletableFutureTest {

public static void main(String[] args) {


List<String> queryList = Lists.newArrayList("商品基本信息", "商品价格", "商品库存", "商品图片", "商品销售状态");

List<CompletableFuture<String>> futureList = queryList.stream()
.map(v -> CompletableFuture.supplyAsync(() -> doQuery(v)))
.collect(Collectors.toList());

CompletableFuture<Void> allCompletableFuture = CompletableFuture
.allOf(futureList.toArray(new CompletableFuture[0]));

List<String> resultList = allCompletableFuture.thenApply(e ->
futureList.stream().map(CompletableFuture::join).collect(Collectors.toList())).join();
System.out.println(resultList);

}

private static String doQuery(String type) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 省略代码了
return type + "查询成功";
}
}

CompletableFuture还有很多花样,毕竟有那么多接口方法,留待读者们自己去尝试吧。感谢阅读!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%