并发编程神器CompletableFuture高级用法与实战

一、异步任务的异常处理

  • 如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将传入异常处理。
  • 如果在第一个thenApply任务中出现异常,第二个 thenApply 和最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。

1、exceptionally

exceptionally 用于处理回调链上的异常, 回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。

1
java复制代码CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

image.png
因为 exceptionally 只处理一次异常,所以常常用在回调链的末端。

2、handle

CompletableFuture API 还提供了一种更通用的方法handle() 表示从异常中恢复 handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可进一步向下传递。

1
java复制代码CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> handle = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "result";
}).handle((s, e) -> {
if (Objects.nonNull(e)) {
System.out.println("出现异常:" + e.getMessage());
return "unKnown";
}
return s;
});
CommonUtils.printTheadLog("main continue");
String ret = handle.get();
CommonUtils.printTheadLog("ret = " + ret);
CommonUtils.printTheadLog("main end");
}

异步任务不管是否发生异常,handle方法都会执行。所以,handle核心作用在于对上一步异步任务进行现场修复。
案例:对回调链中的一次异常进行恢复处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> handle = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "result1";
}).handle((s, e) -> {
if (Objects.nonNull(e)) {
System.out.println("出现异常:" + e.getMessage());
return "unKnown1";
}
return s;
}).thenApply(reuslt -> {
String str = null;
int length = str.length();
return reuslt + "result2";
}).handle((s, throwable) -> {
if (Objects.nonNull(throwable)) {
System.out.println("出现异常" + throwable.getMessage());
return "unknown2";
}
return s;
}).thenApply(s -> s + "result3");
CommonUtils.printTheadLog("main continue");
String ret = handle.get();
CommonUtils.printTheadLog("ret = " + ret);
CommonUtils.printTheadLog("main end");
}

和以往一样,为了提供并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本:

1
2
3
4
5
6
7
java复制代码CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) // jdk17+
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) // jdk17+

CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

二、异步任务的交互

异步任务的交互是指在异步任务获取结果的速度相比较中,按一定的规则(先到先得)进行下一步处理。

1、applyToEither

applyToEither() 把两个异步任务做比较,异步任务先得到结果的,就对其获得的结果进行下一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步任务1
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSecond(x);
CommonUtils.printTheadLog("任务1耗时" + x + "秒");
return x;
});

//异步任务2
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSecond(y);
CommonUtils.printTheadLog("任务2耗时" + y + "秒");
return y;
});

//哪个异步任务结果先到达,使用哪个异步任务的结果
CompletableFuture<Integer> future3 = future1.applyToEither(future2, result -> {
CommonUtils.printTheadLog("最先到达的是" + result);
return result;
});
CommonUtils.sleepSecond(4);
Integer ret = future3.get();
CommonUtils.printTheadLog("ret ="+ret);
}

以下是applyToEither 和其对应的异步回调版本:

1
2
3
java复制代码CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

2、acceptEither

acceptEither()把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作(消费使用)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步任务1
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSecond(x);
CommonUtils.printTheadLog("任务1耗时" + x + "秒");
return x;
});

//异步任务2
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSecond(y);
CommonUtils.printTheadLog("任务2耗时" + y + "秒");
return y;
});

//哪个异步任务结果先到达,使用哪个异步任务的结果
future1.acceptEither(future2, result -> {
CommonUtils.printTheadLog("最先到达的是" + result);
});
CommonUtils.sleepSecond(4);
}

以下是acceptEither和其对应的异步回调版本:

1
2
3
java复制代码CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)

3、runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务完成时得到完成的通知,可以使用 runAfterEither()。

1
2
3
java复制代码CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public static void main(String[] args) {
//异步任务1
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSecond(x);
CommonUtils.printTheadLog("任务1耗时" + x + "秒");
return x;
});
//异步任务2
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSecond(y);
CommonUtils.printTheadLog("任务2耗时" + y + "秒");
return y;
});
future1.runAfterEither(future2, () -> {
CommonUtils.printTheadLog("有一个异步任务执行完成");
});
CommonUtils.sleepSecond(4);
}

三、get()和join()的区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。
使用时,我们发现,get() 抛出检查时异常,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以, join()更适合用在流式编程中。

四、ParallelStream VS CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class MyTask {

private int duration;

public MyTask(int duration) {
this.duration = duration;
}

// 模拟耗时的长任务
public int doWork() {
CommonUtils.printTheadLog("doWork");
CommonUtils.sleepSecond(duration);
return duration;
}
}

1、使用串行流执行并统计总耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码 public static void main(String[] args) {
//需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时
//方案一:在主线程中使用串行执行
//step 1: 创建1日个MyTask对象,每个任务持续1s,存入List集合
IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
return new MyTask(1);
}).collect(Collectors.toList());

//step 2: 执行10个MyTask,统计总耗时
long start = System.currentTimeMillis();
List<Integer> results = tasks.stream().map(myTask -> {
return myTask.doWork();
}).collect(Collectors.toList());

long end = System.currentTimeMillis();

double costTime = (end - start) / 1000.0;
System.out.printf("processed %d tasks %.2f second", tasks.size(), costTime);
}

2、使用并行流执行并统计总耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public static void main(String[] args) {

//需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时
//方案二:使用并行流
//step 1: 创建1日个MyTask对象,每个任务持续1s,存入List集合
IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
return new MyTask(1);
}).collect(Collectors.toList());

//step 2: 执行10个MyTask,统计总耗时
long start = System.currentTimeMillis();
List<Integer> results = tasks.parallelStream().map(myTask -> {
return myTask.doWork();
}).collect(Collectors.toList());

long end = System.currentTimeMillis();

double costTime = (end - start) / 1000.0;
System.out.printf("processed %d tasks %.2f second", tasks.size(), costTime);
}

3、使用CompletableFutre执行并统计总耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public static void main(String[] args) {

//需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时
//方案三:使用并行流和CompletableFutre组合使用
//step 1: 创建1日个MyTask对象,每个任务持续1s,存入List集合
IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
return new MyTask(1);
}).collect(Collectors.toList());

//step 2: 根据MyTask对象构建10个耗时的异步任务
long start = System.currentTimeMillis();
// List<CompletableFuture<Integer>> futures = tasks.parallelStream().map(myTask -> {
// return CompletableFuture.supplyAsync(() -> {
// return myTask.doWork();
// });
// }).collect(Collectors.toList());

List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
return CompletableFuture.supplyAsync(() -> {
return myTask.doWork();
});
}).collect(Collectors.toList());


//step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入L1st集合中
List<Integer> results = futures.stream().map(future -> {
return future.join();
}).collect(Collectors.toList());
long end = System.currentTimeMillis();

double costTime = (end - start) / 1000.0;
System.out.printf("processed %d tasks %.2f second", tasks.size(), costTime);

}

4、使用串行流和CompletableFutre组合执行并统计总耗时(优化:指定线程数量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码public static void main(String[] args) {

// CompletableFuture 在流式操作中的优势
// 需求: 创建10个 MyTask 耗时的任务, 统计它们执行完的总耗时
// 方案四:使用CompletableFuture(指定线程数量)

// step 1: 创建10个MyTask对象,每个任务持续1s, 存入List集合
IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
return new MyTask(1);
}).collect(Collectors.toList());

// 准备线程池
int N_CPU = Runtime.getRuntime().availableProcessors();
// 设置线程池中的线程的数量至少为10
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(),N_CPU * 2));

// step 2: 根据MyTask对象构建10个异步任务
List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
return CompletableFuture.supplyAsync(()-> {
return myTask.doWork();
},executor);
}).collect(Collectors.toList());

// step 3: 执行异步任务,执行完成后,获取异步任务的结果,存入List集合中,统计总耗时
long start = System.currentTimeMillis();
List<Integer> results = futures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long end = System.currentTimeMillis();

double costTime = (end - start) / 1000.0;
System.out.printf("processed %d tasks %.2f second", tasks.size(), costTime);

// 关闭线程池
executor.shutdown();

/**
* 总结
* CompLetabLeFuture可以控制更多的线程数量,而ParalLelstream不能
*/
}

5、合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好的控制线程池的数量,而 parallelStream 不能。
问题1:如何选用 CompletableFuture 和 ParallelStream?
如果你的任务是IO密集型,你应该使用 CompletableFuture。
如果你的任务是CPU密集型,使用比处理器更多的线程是没有意义的,所以选择 ParallelSteam,因为它不需要创建线程池,更容易使用。
问题2:IO密集型任务和CPU密集型任务的区别?
CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间可以完成,而CPU还有许多运算要处理,CPU使用率很高。比如计算1+2+3…+10万亿、天文计算、圆周率后几十位等,都属于CPU密集型程序。
CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短
IO密集型指大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,但CPU的使用率不高。
简单的说,就是需要大量的输入输出,例如读写文件、传输文件,网络请求。 IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。
问题3:既然要控制线程池的数量,多少合适呢?
如果是CPU密集型任务,就需要尽量压榨CPU,参数值可以设为 Ncpu + 1。
如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中 Ncpu 表示核心数。

五、大数据商品比价

1、需求描述和分析

实现一个大数据比价服务,价格数据可以从京东、天猫、拼多多等平台去获取指定商品的价格、优惠金额,然后计算出实际付款金额(商品价格 -优惠金额),最终返回价格最优的平台与价格信息。

2、构建工具类和实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码public class PriceResult {

private int price;
private int discount;
private int realPrice;
private String platform;

public PriceResult() {
}

public PriceResult(String platform) {
this.platform = platform;
}

public PriceResult(int price, int discount, int realPrice, String platform) {
this.price = price;
this.discount = discount;
this.realPrice = realPrice;
this.platform = platform;
}

public int getPrice() {
return price;
}

public void setPrice(int price) {
this.price = price;
}

public int getDiscount() {
return discount;
}

public void setDiscount(int discount) {
this.discount = discount;
}

public int getRealPrice() {
return realPrice;
}

public void setRealPrice(int realPrice) {
this.realPrice = realPrice;
}

public String getPlatform() {
return platform;
}

public void setPlatform(String platform) {
this.platform = platform;
}

@Override
public String toString() {
return "PriceResult{" +
"平台='" + platform + '\'' +
", 平台价=" + price +
", 优惠价=" + discount +
", 最终价=" + realPrice +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码//获取当前时间
private static String getCurrentTime() {
LocalTime now = LocalTime.now();
return now.format(DateTimeFormatter.ofPattern("[HH:mm::ss.SS"));
}

// 打印输出带线程信息的日志
public static void printTheadLog1(String message) {
// 当前时间 | 线程id | 线程名 | 日志信息
String result = new StringJoiner(" | ")
.add(getCurrentTime())
.add(String.format("%2d", Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(message)
.toString();
System.out.println(result);
}

3、构建 HttpRequest

HttpRequest 用于模拟网络请求(耗时的操作)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.zxh.base.concurrency;

import com.zxh.base.concurrency.domin.PriceResult;
import com.zxh.base.utils.CommonUtils;

public class HttpRequest {

private static void mockCostTimeOperation() {
CommonUtils.sleepSecond(1);
}

// 获取淘宝平台的商品价格
public static PriceResult getTaobaoPrice(String productName) {
CommonUtils.printTheadLog("获取淘宝上" + productName + "价格");
mockCostTimeOperation();
PriceResult priceResult = new PriceResult("淘宝");
priceResult.setPrice(5199);
CommonUtils.printTheadLog("获取淘宝上" + productName + "价格完成:5199");
return priceResult;
}

// 获取淘宝平台的优惠
public static int getTaoBaoDiscount(String productName) {
CommonUtils.printTheadLog("获取淘宝上" + productName + "优惠");
mockCostTimeOperation();
CommonUtils.printTheadLog("获取淘宝上" + productName + "优惠完成:-200");
return 200;
}

// 获取京东平台的商品价格
public static PriceResult getJDongPrice(String productName) {
CommonUtils.printTheadLog1("获取京东上" + productName + "价格");
mockCostTimeOperation();
PriceResult priceResult = new PriceResult("淘宝");
priceResult.setPrice(5299);
CommonUtils.printTheadLog1("获取京东上" + productName + "价格完成:5299");
return priceResult;
}

// 获取京东平台的优惠
public static int getJDongDiscount(String productName) {
CommonUtils.printTheadLog1("获取京东上" + productName + "优惠");
mockCostTimeOperation();
CommonUtils.printTheadLog1("获取京东上" + productName + "优惠完成:-150");
return 150;
}

// 获取拼多多平台的商品价格
public static PriceResult getPDDPrice(String productName) {
CommonUtils.printTheadLog1("获取拼多多上" + productName + "价格");
mockCostTimeOperation();
PriceResult priceResult = new PriceResult("拼多多");
priceResult.setPrice(5399);
CommonUtils.printTheadLog1("获取拼多多上" + productName + "价格完成:5399");
return priceResult;
}

// 获取拼多多平台的优惠
public static int getPDDDiscount(String productName) {
CommonUtils.printTheadLog1("获取拼多多上" + productName + "优惠");
mockCostTimeOperation();
CommonUtils.printTheadLog1("获取拼多多上" + productName + "优惠完成:-5300");
return 5300;
}

}

4、使用串行的方式操作商品比价

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码riceResult priceResult;
int discount;

// 获取淘宝平台的商品价格和优惠
priceResult = HttpRequest.getTaobaoPrice(productName);
discount = HttpRequest.getTaoBaoDiscount(productName);
PriceResult taoBaoPriceResult = this.computeRealPrice(priceResult, discount);

// 获取京东平台的商品价格和优惠
priceResult = HttpRequest.getJDongPrice(productName);
discount = HttpRequest.getJDongDiscount(productName);
PriceResult jDongPriceResult = this.computeRealPrice(priceResult, discount);

// 获取拼多多平台的商品价格和优惠
priceResult = HttpRequest.getPDDPrice(productName);
discount = HttpRequest.getPDDDiscount(productName);
PriceResult pddPriceResult = this.computeRealPrice(priceResult, discount);

// 计算最优的平台和价格
Stream<PriceResult> stream = Stream.of(taoBaoPriceResult, jDongPriceResult, pddPriceResult);
Optional<PriceResult> minOpt = stream.min(Comparator.comparing(priceRes -> {
return priceRes.getRealPrice();
}));
PriceResult result = minOpt.get();
return result;

5、使用Future+线程池增加并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码public PriceResult getCheapestPlatformPrice2(String productName) {
// 线程池
ExecutorService executor = Executors.newFixedThreadPool(4);

// 获取淘宝平台的商品价格和优惠
Future<PriceResult> taobaoFuture = executor.submit(() -> {
PriceResult priceResult = HttpRequest.getTaobaoPrice(productName);
int discount = HttpRequest.getTaoBaoDiscount(productName);
return this.computeRealPrice(priceResult, discount);
});

// 获取京东平台的商品价格和优惠
Future<PriceResult> jdFuture = executor.submit(() -> {
PriceResult priceResult = HttpRequest.getJDongPrice(productName);
int discount = HttpRequest.getJDongDiscount(productName);
return this.computeRealPrice(priceResult, discount);
});


// 获取拼多多平台的商品价格和优惠
Future<PriceResult> pddFuture = executor.submit(() -> {
PriceResult priceResult = HttpRequest.getPDDPrice(productName);
int discount = HttpRequest.getPDDDiscount(productName);
return this.computeRealPrice(priceResult, discount);
});

// 计算最优的平台和价格
PriceResult priceResult = Stream.of(taobaoFuture, jdFuture, pddFuture)
.map(item -> {
try {
//假设延时5s后,就不要它的结果,所以返回一个空
return item.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
return null;
}finally {
executor.shutdown();
}
}).filter(Objects::nonNull)
.min(Comparator.comparing(PriceResult::getRealPrice)).get();

return priceResult;
}

6、使用CompletableFuture进一步增强并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public PriceResult getCheapestPlatformPrice3(String productName) {

// 获取淘宝平台的商品价格和优惠
CompletableFuture<PriceResult> taobaofuture = CompletableFuture.supplyAsync(() -> HttpRequest.getTaobaoPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoDiscount(productName)), (priceRsult, discount) -> {
return this.computeRealPrice(priceRsult, discount);
});


// 获取京东平台的商品价格和优惠
CompletableFuture<PriceResult> jdfuture = CompletableFuture.supplyAsync(() -> HttpRequest.getJDongPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getJDongDiscount(productName)), (priceRsult, discount) -> {
return this.computeRealPrice(priceRsult, discount);
});


// 获取拼多多平台的商品价格和优惠
CompletableFuture<PriceResult> pddfuture = CompletableFuture.supplyAsync(() -> HttpRequest.getPDDPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getPDDDiscount(productName)), (priceRsult, discount) -> {
return this.computeRealPrice(priceRsult, discount);
});


// 计算最优的平台和价格
PriceResult priceResult = Stream.of(taobaofuture, jdfuture, pddfuture)
.map(future -> future.join())
.min(Comparator.comparing(item -> item.getRealPrice()))
.get();
return priceResult;
}

7、需求变更:同一个平台比较同款产品(iPhone15)不同色系的价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码//需求变更:同一个平台比较同款产品(iPhone15)不同色系的价格
public PriceResult batchComparePrice(List<String> products) {
// step 1:遍历每个商品的名字, 根据商品名称开启异步任务获取最终价, 归集到List集合中
List<CompletableFuture<PriceResult>> futureList = products.stream()
.map(productName -> {
return CompletableFuture.supplyAsync(() -> HttpRequest.getTaobaoPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoDiscount(productName)), (priceRsult, discount) -> {
return this.computeRealPrice(priceRsult, discount);
});
}).collect(Collectors.toList());

// step 2: 把多个商品的最终价进行排序获取最小值
PriceResult priceResult = futureList.stream()
.map(future -> future.join())
.sorted(Comparator.comparing(item -> item.getRealPrice()))
.findFirst()
.get();
return priceResult;
}

public static void main(String[] args) {
// 异步任务的批量操作
// 测试在一个平台比较同款产品(iPhone15)不同色系的价格
ComparePriceService service = new ComparePriceService();
long start = System.currentTimeMillis();
PriceResult priceResult = service.batchComparePrice(Arrays.asList("iphone15午夜黑","iphone15白色","iphone15淡青"));
long end = System.currentTimeMillis();
double costTime = (end - start)/1000.0;
System.out.printf("cost %.2f second processed\n",costTime);
System.out.println("priceResult = " + priceResult);
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%