netty(六)NIO、BIO与AIO 一、BIO与NIO

「这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战」。

一、BIO与NIO

本小节将BIO与NIO放到一起进行分析,主要为了突出其差别。

1.1 对比stream和channel

以前我们写代码,涉及到IO操作,首先想到的必然是一系列的stream,如InputStream等。如今随着java中nio的引入,我们多了一个选择,channel。那么两者相比有哪些不同,channel又有哪些优势呢?

1)stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)。
2)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用。
3)二者均为全双工,即读写可以同时进行。

二、IO模型

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

1)等待数据准备好

2)从内核向用户进程复制数据

阻塞IO模型

image.png

非阻塞IO模型

应用进程不停的轮询内核空间,会造成CPU浪费。

image.png

多路IO复用模型

用户进程首先阻塞于select方法,当内核返回可读状态后,根据事件类型去做调用,将数据复制到用户空间缓冲区,处理区间状态阻塞。

image.png

异步IO模型

AIO是java中IO模型的一种,作为NIO的改进和增强随JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被称作是NIO2.0。AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的文件读写和网络通信。

image.png

三、零拷贝

3.1 原始IO分析

如下伪代码,读取本地文件,通过socket写出:

1
2
3
4
5
6
7
8
9
10
ini复制代码File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
// 包括2次拷贝(DMA(硬件到内核缓冲区),内核缓冲到用户缓冲)
file.read(buf);

Socket socket = ...;
//包括两次拷贝(用户缓冲区到socket缓冲区,DMA(socket缓冲区到网卡))
socket.getOutputStream().write(buf);

其内部实际的工作构成如下所示:

image.png

我们根据代码的过程,结合图上的步骤逐步分析:

1)创建文件类file,定义byte数组,当真正开始执行read方法时,才开始获取数据。

java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(内核)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access,可以理解为硬件单元,解放CPU的同时,完成文件IO)来实现文件读,其间也不会使用 cpu。

此处可以算作第一次数据拷贝,但是通过DMA技术解决了IO问题。

2) 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA。

此处是第二次数据拷贝。

3)调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝。

此处是第三次拷贝。

4)接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu。

此处算作第四次拷贝,DMA技术解决IO问题。

总结
通过上面的分析,我们得到java本身不具备物理设备级别的IO读写,而是缓存级别的读写,通过调用操作系统来完成硬件级别的读写。

上述步骤总共经历3次状态切换,4次的数据拷贝。

3.2 NIO优化

3.2.1 使用直接内存

在前面我们学习ByteBuffer时,介绍到了其可以使用直接内存DirectByteBuffer。

ByteBuffer buffer = ByteBuffer.allocateDirect(16);

那么通过这个直接内存,能使我们前面的过程做到哪些优化呢?

image.png

如上图所示,由于直接内存的引入,java 可以使用 DirectByteBuf 将堆外内存(内核缓冲区)映射到 jvm 内存(用户缓冲区)中来直接访问使用。而其他的步骤没有变化。

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写。
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成以下两步:
    1)DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    2)通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

3.2.2 channel的transferTo/transferFrom

底层采用了 linux 2.1
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用transferTo/transferFrom 方法拷贝数据。

其过程如下图所示:

image.png

1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu

2)数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝

3)最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

如上图和过程所示,其中只经历了一次状态切换,数据拷贝仍然是3次。

底层采用了 linux 2.4
linux底层对于整体的效率又有了优化,如下图所示:

image.png

1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu

2)只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗

3)使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程只发生了一次状态切换,实际只通过DMA经过两次数据拷贝。

实际所谓的零拷贝并不是真正的没有拷贝过程,而是不会有数据拷贝到用户态,即jvm内存中的过程。

四、AIO

4.1 简单介绍及使用

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
csharp复制代码public class TestAio {

public static void main(String[] args) throws IOException {
try {
AsynchronousFileChannel s = AsynchronousFileChannel.open(
Paths.get("C:\\Users\\P50\\Desktop\\text.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("read completed..." + result);
buffer.flip();
System.out.println(Thread.currentThread().getName() + ",内容是:" + print(buffer));
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("ead failed...");
}
});

} catch (IOException e) {
e.printStackTrace();
}
System.out.println("do other things...");
System.in.read();
}

static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}

结果,打印内容的并不是主线程,多次尝试,每次都是不同的,并且主线程并没有阻塞:

1
2
3
4
arduino复制代码begin...
do other things...
read completed...10
Thread-7,内容是:helloworld

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。

4.2 网络编程

服务端示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
typescript复制代码public class AioServer {

public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%