初识NIO之Java小Demo

Java中的IO、NIO、AIO:

BIO:在Java1.4之前,我们建立网络连接均使用BIO,属于同步阻塞IO。默认情况下,当有一条请求接入就有一条线程专门接待。所以,在客户端向服务端请求时,会询问是否有空闲线程进行接待,如若没有则一直等待或拒接。当并发量小时还可以接受,当请求量一多起来则会有许多线程生成,在Java中,多线程的上下文切换会消耗计算机有限的资源和性能,造成资源浪费。

NIO:NIO的出现是为了解决再BIO下的大并发量问题。其特点是能用一条线程管理所有连接。如下图所示:

图片来自网络

NIO是同步非阻塞模型,通过一条线程控制选择器(Selector)来管理多个Channel,减少创建线程和上下文切换的浪费。当线程通过选择器向某条Channel请求数据但其没有数据时,是不会阻塞的,直接返回,继续干其他事。而一旦某Channel就绪,线程就能去调用请求数据等操作。当该线程对某条Channel进行写操作时同样不会被阻塞,所以该线程能够对多个Channel进行管理。

NIO是面向缓冲流的,即数据写入写出都是通过 Channel —— Buffer 这一途径。(双向流通)

AIO:与之前两个IO模型不同的是,AIO属于异步非阻塞模型。当进行读写操作时只须调用api的read方法和write方法,这两种方法均是异步。对于读方法来说,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。换言之就是当调用完api后,操作系统完成后会调用回调函数。

总结:一般IO分为同步阻塞模型(BIO),同步非阻塞模型(NIO),异步阻塞模型,异步非阻塞模型(AIO)

同步阻塞模型指的是当调用io操作时必须等到其io操作结束

同步非阻塞模型指当调用io操作时不必等待可以继续干其他事,但必须不断询问io操作是否完成。

异步阻塞模型指应用调用io操作后,由操作系统完成io操作,但应用必须等待或去询问操作系统是否完成。

异步非阻塞指应用调用io操作后,由操作系统完成io操作并调用回调函数,应用完成放手不管。

NIO的小Demo之服务端

首先,先看下服务端的大体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码public class ServerHandle implements Runnable{
//带参数构造函数
public ServerHandle(int port){

}
//停止方法
public void shop(){

}
//写方法
private void write(SocketChannel socketChannel, String response)throws IOException{

}
//当有连接进来时的处理方法
private void handleInput(SelectionKey key) throws IOException{

}

//服务端运行主体方法
@Override
public void run() {

}
}

首先我们先看看该服务端的构造函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码public ServerHandle(int port){
try {
//创建选择器
selector = Selector.open();
//打开监听通道
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//传入端口,并设定连接队列最大为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端请求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记启动标志
started = true;
System.out.println("服务器已启动,端口号为:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}

在这里创建了选择器和监听通道,并将该监听通道注册到选择器上并选择其感兴趣的事件(accept)。后续其他接入的连接都将通过该 监听通道 传入。

然后就是写方法的实现:

1
2
3
4
5
6
7
8
9
复制代码    private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
//将写模式改为读模式
wirteBuffer.flip();
//写入管道
channel.write(wirteBuffer);
}

其次是当由事件传入时,即对连接进来的链接的处理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
复制代码    private void handleInput(SelectionKey key) throws IOException{
//当该键可用时
if (key.isValid()){
if (key.isAcceptable()){
//返回该密钥创建的通道。
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
通过该通道获取链接进来的通道
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
//返回该密钥创建的通道。
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务器收到的信息:" + expression);
//此处是为了区别打印在工作台上的数据是由客户端产生还是服务端产生
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if(readBytes == 0){
//无数据,忽略
}else if (readBytes < 0){
//资源关闭
key.cancel();
socketChannel.close();
}
}
}
}

这里要说明的是,只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。
如在构造方法中有一通道serverSocketChannel注册了accept事件。当其就绪时就可以通过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道。

压轴方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码    @Override
public void run() {
//循环遍历
while (started) {
try {
//当没有就绪事件时阻塞
selector.select();
//返回就绪通道的键
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
//获取后必须移除,否则会陷入死循环
iterator.remove();
try {
//对就绪通道的处理方法,上述有描述
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}

此方法为服务端的主体方法。大致流程如下:

  1. 打开ServerSocketChannel,监听客户端连接
  2. 绑定监听端口,设置连接为非阻塞模式(阻塞模式下不能注册到选择器)
  3. 创建Reactor线程,创建选择器并启动线程
  4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  5. Selector轮询准备就绪的key
  6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
  7. 设置客户端链路为非阻塞模式
  8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
    异步读取客户端消息到缓冲区
  9. 调用write将消息异步发送给客户端

NIO的小Demo之客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码public class ClientHandle implements Runnable{
//构造函数,构造时顺便绑定
public ClientHandle(String ip, int port){

}
//处理就绪通道
private void handleInput(SelectionKey key) throws IOException{

}
//写方法(与服务端的写方法一致)
private void doWrite(SocketChannel channel,String request) throws IOException{

}
//连接到服务端
private void doConnect() throws IOException{

}
//发送信息
public void sendMsg(String msg) throws Exception{

}
}

首先先看构造函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码    public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}

接下来看对就绪通道的处理办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
复制代码    private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
//这里的作用将在后面的代码(doConnect方法)说明
if(sc.finishConnect()){
System.out.println("已连接事件");
}
else{
System.exit(1);
}
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1k的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}lse if(readBytes==0){
//忽略
}else if(readBytes<0){
//链路已经关闭,释放资源
key.cancel();
sc.close();
}
}
}
}

在run方法之前需先看下此方法的实现:

1
2
3
4
5
6
7
8
9
10
复制代码    private void doConnect() throws IOException{

if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}

当SocketChannel工作于非阻塞模式下时,调用connect()时会立即返回:
如果连接建立成功则返回的是true(比如连接localhost时,能立即建立起连接),否则返回false。

在非阻塞模式下,返回false后,必须要在随后的某个地方调用finishConnect()方法完成连接。
当SocketChannel处于阻塞模式下时,调用connect()时会进入阻塞,直至连接建立成功或者发生IO错误时,才从阻塞状态中退出。

所以该代码在connect服务端后返回false(但还是有作用的),并在else语句将该通道注册在选择器上并选择connect事件。

客户端的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码    @Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}

}

发送信息到服务端的方法:

1
2
3
4
5
复制代码    public void sendMsg(String msg) throws Exception{
//覆盖其之前感兴趣的事件(connect),将其更改为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}

完整代码:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码/**
* Created by innoyiya on 2018/8/20.
*/
public class Service {
private static int DEFAULT_POST = 12345;
private static ServerHandle serverHandle;
public static void start(){
start(DEFAULT_POST);
}

public static synchronized void start(int post) {
if (serverHandle != null){
serverHandle.shop();
}
serverHandle = new ServerHandle(post);
new Thread(serverHandle,"server").start();
}
}

服务端主体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
复制代码import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* Created by innoyiya on 2018/8/20.
*/
public class ServerHandle implements Runnable{

private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean started;

public ServerHandle(int port){
try {
//创建选择器
selector = Selector.open();
//打开监听通道
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//判定端口,并设定连接队列最大为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端请求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记启动标志
started = true;
System.out.println("服务器已启动,端口号为:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void shop(){
started = false;
}

private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
wirteBuffer.flip();
channel.write(wirteBuffer);
}

private void handleInput(SelectionKey key) throws IOException{
if (key.isValid()){
if (key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务器收到的信息:" + expression);
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if (readBytes < 0){
key.cancel();
socketChannel.close();
}
}
}
}

@Override
public void run() {
//循环遍历
while (started) {
try {
selector.select();
//System.out.println(selector.select());
Set<SelectionKey> keys = selector.selectedKeys();
//System.out.println(keys.size());
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
复制代码/**
* Created by innoyiya on 2018/8/20.
*/
public class Client {
private static String DEFAULT_HOST = "localhost";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
private static final String EXIT = "exit";

public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}

public static synchronized void start(String ip, int port) {
if (clientHandle != null){
clientHandle.stop();
}
clientHandle = new ClientHandle(ip, port);
new Thread(clientHandle, "Server").start();
}

//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception {
if (msg.equals(EXIT)){
return false;
}
clientHandle.sendMsg(msg);
return true;
}

}

客户端主体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
复制代码import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* Created by innoyiya on 2018/8/20.
*/

public class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;

public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}

private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
System.out.println("已连接事件");
}
else{
System.exit(1);
}
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
} else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request) throws IOException{
byte[] bytes = request.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);

}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}
public void sendMsg(String msg) throws Exception{
//覆盖其之前感兴趣的事件,将其更改为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}

@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
selector.select();

Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}

}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码import java.util.Scanner;

/**
* Created by innoyiya on 2018/8/20.
*/
public class Test {
public static void main(String[] args) throws Exception {
Service.start();
Thread.sleep(1000);
Client.start();
while(Client.sendMsg(new Scanner(System.in).nextLine()));
}
}

控制台打印:

1
2
3
4
5
6
7
8
9
复制代码服务器已启动,端口号为:12345
register
已连接事件
1234
服务器收到的信息:1234
客户端收到消息:+++++1234+++++
5678
服务器收到的信息:5678
客户端收到消息:+++++5678+++++

如有不妥之处,请告诉我。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%