netty(四)nio之网络编程 一、阻塞与非阻塞 二、多路

「这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战」。

一、阻塞与非阻塞

1.1 阻塞

1.1.1 阻塞模式会存在哪些问题?

1)在阻塞模式下,以下的方法都会导致线程暂停

  • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
  • SocketChannel.read 会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程处于闲置状态

2)单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

3)多线程下,有新的问题,体现在以下方面

  • 32 位 jvm 一个线程最大堆栈是 320k,64 位 jvm 一个线程 最大堆栈是1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低。
  • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive(不活跃),会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

1.1.2 测试代码:

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
csharp复制代码public class BioServerTest {

public static void main(String[] args) throws IOException {
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
System.out.println("connecting...");
// 阻塞方法,线程停止运行
SocketChannel sc = ssc.accept();
System.out.println("connected... " + sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
System.out.println("before read..."+channel);
try {
// 阻塞方法,线程停止运行
channel.read(buffer);
} catch (IOException e) {
}
buffer.flip();
System.out.println(print(buffer));
buffer.clear();
System.out.println("after read..."+channel);
}
}
}

static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class SocketClientTest{

public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
//模拟长连接一直存在
while (true) {
}
}
}

启动服务端看结果,一直在connecting,此时线程阻塞了:

1
erlang复制代码connecting...

启动客户端看结果,此时连接成功,又阻塞到收到消息之前:

1
2
3
arduino复制代码connecting...
connected... java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:64800]
before read...java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:64800]

1.2 非阻塞

1.2.1 相比阻塞改变了什么?

非阻塞模式下,相关方法都不会让线程暂停

  • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
  • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
  • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去

1.2.2 非阻塞模型存在哪些问题?

1)但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu

2)数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

1.2.3 测试代码

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
arduino复制代码public class NonIoServerTest {

public static void main(String[] args) throws IOException {
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 非阻塞模式
ssc.configureBlocking(false);
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
// 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
SocketChannel sc = ssc.accept();
if (sc != null) {
System.out.println("connected... " + sc);
// 非阻塞模式
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
System.out.println(print(buffer));
buffer.clear();
System.out.println("after read..."+channel);
}
}
// 用于查非阻塞状态,连接客户端时可关闭,便于观看结果
System.out.println("wait connecting...");
}
}

static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}
启动服务端结果如下,不断刷新:
1
2
3
4
5
6
7
8
9
10
erlang复制代码wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
... ...

服务端与前面测试阻塞时一样,我们将服务端的System.out.println(“wait connecting…”);这行代码注释掉,方便看结果。

启动客户端,看服务端结果:

1
arduino复制代码connected... java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:61254]

二、多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用。

  • 多路复用仅针对网络 IO,普通文件 IO 没法利用多路复用
  • 如果使用非阻塞模式,而不使用selector,则线程大部分时间都在做无用功,使用Selector 能够保证以下三点:
+ 有可连接事件时才去连接
+ 有可读事件才去读取
+ 有可写事件才去写入( 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件)

2.1 Selector

image.png

上述方案的好处:

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功。
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

2.1.1 如何使用Selector?

如下代码及注释描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码public class SelectorTest {

public static void main(String[] args) throws IOException {
// 创建Selector
Selector selector = Selector.open();

// 绑定channel事件(SelectionKey当中有四种事件:OP_ACCEPT,OP_CONNECT, OP_READ, OP_WRITE)
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT);

// 监听channel事件,返回值是发生事件的channel数
// 监听方法1,阻塞直到绑定事件发生
int count1 = selector.select();
// 监听方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count2 = selector.select(1000);
// 监听方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count3 = selector.selectNow();
}
}

上述代码当中,在selector进行channel时间监听时,会发生阻塞,直到时间发生,那么有哪些情况会使线程变成不阻塞状态呢?如下所示:

1)事件发生时(SelectionKey当中有四种事件:OP_ACCEPT,OP_CONNECT, OP_READ, OP_WRITE)

  • 客户端发起连接请求,会触发 accept 事件
  • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
  • channel 可写,会触发 write 事件
  • 在 linux 下 nio bug 发生时

2)调用 selector.wakeup()

3)调用 selector.close()

4)selector 所在线程 interrupt

2.2 处理accept事件

服务端如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ini复制代码public class AcceptEventServerTest {

public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// int count = selector.selectNow();
System.out.println("select count: " + count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
System.out.println(sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typescript复制代码public class ClientTest {

public static void main(String[] args) {
// accept事件
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
// read事件
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务端打印结果:

1
2
3
ini复制代码sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
select count: 1
java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60469]

上述代码中服务端注释掉了使用selector.selectNow()的方法,如果使用该方法,需要自己去判断返回值是否为0。

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发。

2.3 处理read事件

此处仍然使用代码的方式讲解,客户端与前面的客户端相同,只是此处会同时启动两个客户端,其中发送的内容分别是“hello” 和 “world”。

服务端代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
ini复制代码public class ReadEventServerTest {
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
System.out.println("select count:" + count);

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("连接已建立:" + sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
System.out.println(print(buffer));
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}

启动服务端,并先后启动两个客户端,看结果,首先服务端channel自己注册到selector,客户端1发送accept事件,服务端接收到后,继续while循环,监听到read事件,打印内容为“hello”,客户端2步骤相同。

1
2
3
4
5
6
7
8
9
lua复制代码sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
select count:1
连接已建立:java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:61693]
select count:1
hello
select count:1
连接已建立:java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51495]
select count:1
world

注意:最后的iter.remove(),为什么要移除?
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 ssckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

上述代码中cancel有什么作用?
cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

2.3.1 关注消息边界

首先看如下的代码是否有问题,客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
arduino复制代码public class ServerTest {

public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 每次读取4个字节
byte[] arr = new byte[4];
while (true) {
int read = in.read(arr);
// 读取长度是-1,则不读取了
if (read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}

服务端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码public class ClientTest {

public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
out.write("世界".getBytes());
max.close();
}
}

结果:

1
2
3
4
5
6
复制代码hell
owor
ld�
�好
世�
��

为什么会产生上述的问题?
这里面涉及到消息边界的问题。消息的长短是不同的,当我们指定相同长度的ByteBuffer去接收消息时,必然存在不同时间段存在很多种情况,如下所示:

image.png

由于buffer长度固定,必然存在消息被截断的情况,那么如何解决这些问题呢?

1)一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

2)另一种思路是按分隔符拆分,缺点是效率低

3)TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

  • Http 1.1 是 TLV 格式
  • Http 2.0 是 LTV 格式

通过面给出的答案都不是最好的解决方案,重点的问题在于如何分配Bytebuffer的大小?

buffer是给一个channel独立使用的,不能被多个channel共同使用,因为存在粘包、半包的问题。

buffer的大小又不能太大,如果要支持很大的连接数,同时又设置很大的buffer,则必然需要庞大的内存。

所以我们需要设置一个大小可变的ByteBuffer

目前有两种较为简单实现方案,其都有其优缺点:

1)预先分配一个较小的buffer,例如4k,如果发现不能装下全部内容,则创建一个更大的buffer,比如8k,将已写入的4k拷贝到新分配的8kbuffer,将剩下的内容继续写入。

其优点是消息必然是连续的,但是不断的分配和拷贝,必然会对性能造成较大的影响。

2)使用多个数组的形式组成buffer,当一个数组存不下数据内容,就将剩余数据放入下一个数组当中。在netty中的CompositeByteBuf类,就是这种方式。

其缺点是数据不连续,需要再次解析整合,优点是解决了上一个方案的造成性能损耗的问题。

2.4 处理write事件

什么是两阶段策略?

其出现原因有如下两个:

1)在非阻塞模式下,我们无法保证将buffer中的所有数据全部写入channel当中,所以我们需要追踪写入后的返回值,也就是实际写入字节的数值。

1
arduino复制代码int write = channel.write(buffer);

2)我们可以使所有的selector监听channel的可写事件,每个channel都会有一个key用来跟踪buffer,这样会占用过多的内存。(关于此点不太理解的,下面可以通过代码来理解)

鉴于以上问题,出现的两阶段策略:

1)当第一次写入消息时,我们才将channel注册到selector

2)如果第一次没写完,再次添加写事件, 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册(不取消则每次都会出现写事件)。

下面通过代码的方式演示:
服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
scss复制代码public class ServerTest {

public static void main(String[] args) throws IOException {
// 开启一个服务channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置非阻塞
ssc.configureBlocking(false);
// 绑定端口
ssc.bind(new InetSocketAddress(8080));

//初始化selector
Selector selector = Selector.open();
//注册服务端到selector
ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
//监听事件,此处会阻塞
selector.select();

//获取所有事件key,遍历
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
//不管处理成功与否,移除key
iter.remove();
//如果是建立连接事件
if (key.isAcceptable()) {
//处理accept事件
SocketChannel sc = ssc.accept();
//设置非阻塞
sc.configureBlocking(false);
// 此处是第一阶段
//注册一个read事件到selector
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
//字符串转buffer
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//2. 写入数据到客户端channel
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
// 此处是第二阶段
if (buffer.hasRemaining()) {
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
// 检索key中的附件buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 获取客户端channel
SocketChannel sc = (SocketChannel) key.channel();
// 根据上次的position继续写
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
// 如果写完了,需要将绑定的附件buffer去掉,并且去掉写事件
// 如果没写完将会继续while,执行写事件,知道完成为止
if (!buffer.hasRemaining()) {
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码public class ClientTest {

public static void main(String[] args) throws IOException {
// 开启selector
Selector selector = Selector.open();
// 开启客户端channel
SocketChannel sc = SocketChannel.open();
//设置非阻塞
sc.configureBlocking(false);
// 注册一个连接事件和读事件
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
//连接到服务端
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
//此处监测事件,阻塞
selector.select();
//获取时间key集合,并遍历
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
//无论成功与否,都要移除
iter.remove();
//连接
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
//分配内存buffer
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
//读数据到buffer
count += sc.read(buffer);
//清空buffer
buffer.clear();
//打印总字节数
System.out.println(count);
}
}
}
}
}

分别启动服务端和客户端,结果如下:

1
2
3
4
5
6
7
8
9
10
makefile复制代码实际写入字节:3801059
实际写入字节:3014633
实际写入字节:4063201
实际写入字节:4718556
实际写入字节:2490349
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:1426522
1
2
3
4
5
6
7
8
9
10
erlang复制代码true
131071
262142
393213
524284
655355
... ...
29753117
29884188
30000000

兄弟们,看到这了就给个赞呗,感谢

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%