一、异步任务的异常处理
- 如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将传入异常处理。
- 如果在第一个thenApply任务中出现异常,第二个 thenApply 和最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。
1、exceptionally
exceptionally 用于处理回调链上的异常, 回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。
1 | java复制代码CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) |
因为 exceptionally 只处理一次异常,所以常常用在回调链的末端。
2、handle
CompletableFuture API 还提供了一种更通用的方法handle() 表示从异常中恢复 handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可进一步向下传递。
1 | java复制代码CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) |
1 | java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException { |
异步任务不管是否发生异常,handle方法都会执行。所以,handle核心作用在于对上一步异步任务进行现场修复。
案例:对回调链中的一次异常进行恢复处理
1 | java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException { |
和以往一样,为了提供并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本:
1 | java复制代码CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) |
二、异步任务的交互
异步任务的交互是指在异步任务获取结果的速度相比较中,按一定的规则(先到先得)进行下一步处理。
1、applyToEither
applyToEither() 把两个异步任务做比较,异步任务先得到结果的,就对其获得的结果进行下一步操作。
1 | java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException { |
以下是applyToEither 和其对应的异步回调版本:
1 | java复制代码CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) |
2、acceptEither
acceptEither()把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作(消费使用)。
1 | java复制代码public static void main(String[] args) throws ExecutionException, InterruptedException { |
以下是acceptEither和其对应的异步回调版本:
1 | java复制代码CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
3、runAfterEither
如果不关心最先到达的结果,只想在有一个异步任务完成时得到完成的通知,可以使用 runAfterEither()。
1 | java复制代码CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) |
1 | java复制代码public static void main(String[] args) { |
三、get()和join()的区别
get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。
使用时,我们发现,get() 抛出检查时异常,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以, join()更适合用在流式编程中。
四、ParallelStream VS CompletableFuture
1 | java复制代码public class MyTask { |
1、使用串行流执行并统计总耗时
1 | java复制代码 public static void main(String[] args) { |
2、使用并行流执行并统计总耗时
1 | java复制代码public static void main(String[] args) { |
3、使用CompletableFutre执行并统计总耗时
1 | java复制代码public static void main(String[] args) { |
4、使用串行流和CompletableFutre组合执行并统计总耗时(优化:指定线程数量)
1 | java复制代码public static void main(String[] args) { |
5、合理配置线程池中的线程数
正如我们看到的,CompletableFuture 可以更好的控制线程池的数量,而 parallelStream 不能。
问题1:如何选用 CompletableFuture 和 ParallelStream?
如果你的任务是IO密集型,你应该使用 CompletableFuture。
如果你的任务是CPU密集型,使用比处理器更多的线程是没有意义的,所以选择 ParallelSteam,因为它不需要创建线程池,更容易使用。
问题2:IO密集型任务和CPU密集型任务的区别?
CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间可以完成,而CPU还有许多运算要处理,CPU使用率很高。比如计算1+2+3…+10万亿、天文计算、圆周率后几十位等,都属于CPU密集型程序。
CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短
IO密集型指大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,但CPU的使用率不高。
简单的说,就是需要大量的输入输出,例如读写文件、传输文件,网络请求。 IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。
问题3:既然要控制线程池的数量,多少合适呢?
如果是CPU密集型任务,就需要尽量压榨CPU,参数值可以设为 Ncpu + 1。
如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中 Ncpu 表示核心数。
五、大数据商品比价
1、需求描述和分析
实现一个大数据比价服务,价格数据可以从京东、天猫、拼多多等平台去获取指定商品的价格、优惠金额,然后计算出实际付款金额(商品价格 -优惠金额),最终返回价格最优的平台与价格信息。
2、构建工具类和实体类
1 | java复制代码public class PriceResult { |
1 | java复制代码//获取当前时间 |
3、构建 HttpRequest
HttpRequest 用于模拟网络请求(耗时的操作)。
1 | java复制代码package com.zxh.base.concurrency; |
4、使用串行的方式操作商品比价
1 | java复制代码riceResult priceResult; |
5、使用Future+线程池增加并行
1 | java复制代码public PriceResult getCheapestPlatformPrice2(String productName) { |
6、使用CompletableFuture进一步增强并行
1 | java复制代码public PriceResult getCheapestPlatformPrice3(String productName) { |
7、需求变更:同一个平台比较同款产品(iPhone15)不同色系的价格
1 | java复制代码//需求变更:同一个平台比较同款产品(iPhone15)不同色系的价格 |
本文转载自: 掘金