netty(九)初识Netty-Future & Promi

「这是我参与11月更文挑战的第14天,活动详情查看:2021最后一次更文挑战」。

一、简介

在netty当中,我们需要进行异步处理的时候,经常会调用以下的两个方法:

Future & Promise

其实在我们使用JDK的时候,就知道有一个Future接口,用于异步时接收任务结果。

在Netty当中,基于JDK当中的Future接口,进行了扩展;后面又在Netty的Future基础之上,增加了Promise接口。

关于三者的关系请看以下的类图:

image.png

  • JDK Future:只能同步等待任务结束(无论成功还是失败)才能得到结果。
  • Netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,前提是任务必须要结束。
  • Netty Promise:不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。

二、扩展了哪些主要能力?

在这一章节,主要分析前面提到的netty新增的两个接口都新增了哪些功能。

功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

三、使用示例

下面针对Promise我们重点学习,看看针对不同场景下的使用。

例1 同步处理任务成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
csharp复制代码    public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +", set success: " + 10);
promise.setSuccess(10);
});

System.out.println(Thread.currentThread().getName() +", start...");
//未产生结果,不阻塞,返回null
System.out.println(Thread.currentThread().getName() +", " + promise.getNow());
//阻塞等待结果
System.out.println(Thread.currentThread().getName() +", " + promise.get());
}

结果:

1
2
3
4
less复制代码main, start...
main, null
defaultEventLoop-1-1, set success: 10
main, 10

例2 异步处理任务成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
arduino复制代码    public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +", set success: " + 10);
promise.setSuccess(10);
});

System.out.println(Thread.currentThread().getName() +", start...");

//添加异步监听,当有结果时调用getNow获取结果
promise.addListener(future->{
System.out.println(Thread.currentThread().getName() +", " + future.getNow());
});
}

结果:

1
2
3
arduino复制代码main, start...
defaultEventLoop-1-1, set success: 10
defaultEventLoop-1-1, 10

例3 同步处理任务失败 sync & get

sync或者get,区别是get会对异常信息再包一层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码    public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +", set failure");
promise.setFailure(new RuntimeException());
});

System.out.println(Thread.currentThread().getName() +", start...");
//未产生结果,不阻塞,返回null
System.out.println(Thread.currentThread().getName() +", " + promise.getNow());
//阻塞等待结果
System.out.println(Thread.currentThread().getName() +", " + promise.get());
//System.out.println(Thread.currentThread().getName() +", " + promise.get());
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
less复制代码main, start...
main, null
defaultEventLoop-1-1, set failure
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException
at io.netty.util.concurrent.DefaultPromise.get(DefaultPromise.java:349)
at com.cloud.bssp.netty.promise.Test3.main(Test3.java:34)
Caused by: java.lang.RuntimeException
at com.cloud.bssp.netty.promise.Test3.lambda$main$0(Test3.java:27)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

例4 同步处理任务失败 await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码    public static void main(String[] args) throws ExecutionException, InterruptedException {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +", set failure");
promise.setFailure(new RuntimeException());
});

System.out.println(Thread.currentThread().getName() +", start...");
//未产生结果,不阻塞,返回null
System.out.println(Thread.currentThread().getName() +", " + promise.getNow());
//阻塞等待结果
promise.await();
//isSuccess判断任务是否成功
System.out.println(Thread.currentThread().getName() +", " + promise.isSuccess());
}

结果:

1
2
3
4
csharp复制代码main, start...
main, null
defaultEventLoop-1-1, set failure
main, false

例5 异步处理任务失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
scss复制代码    public static void main(String[] args) {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ", set failure");
promise.setFailure(new RuntimeException());
});

System.out.println(Thread.currentThread().getName() + ", start...");

//添加异步监听,当有结果时调用getNow获取结果
promise.addListener(future -> {
if (!promise.isSuccess()) {
//失败查看结果
System.out.println(Thread.currentThread().getName() + ", " + promise.cause());
}
System.out.println(Thread.currentThread().getName() + ", " + future.getNow());
});
}

结果:

1
2
3
4
csharp复制代码main, start...
defaultEventLoop-1-1, set failure
defaultEventLoop-1-1, java.lang.RuntimeException
defaultEventLoop-1-1, null

例6 await 死锁检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
csharp复制代码    public static void main(String[] args) {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.submit(()->{
System.out.println("1");
try {
promise.await();
// 注意不能仅捕获 InterruptedException 异常
// 否则 死锁检查抛出的 BlockingOperationException 会继续向上传播
// 而提交的任务会被包装为 PromiseTask,它的 run 方法中会 catch 所有异常然后设置为 Promise 的失败结果而不会抛出
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("2");
});
eventExecutors.submit(()->{
System.out.println("3");
try {
promise.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("4");
});
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
less复制代码1
2
3
4
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6a77ffc2(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:461)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:246)
at com.cloud.bssp.netty.promise.Test6.lambda$main$0(Test6.java:21)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6a77ffc2(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:461)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:246)
at com.cloud.bssp.netty.promise.Test6.lambda$main$1(Test6.java:33)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%