Netty之HelloWorld 〇 准备工作 一 Se

〇. 准备工作

  • 导包
1
2
3
4
5
xml复制代码<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>

一. Server端

  • server从宏观上来看,会处理两方面的事件:客户端连接请求、业务相关的事件(包括server端接受client端的消息、client端断开连接等等)。
  • 针对这两类事件,netty分别采用了两个线程组来实现。当有连接请求过来的时候,使用BossGroup进行处理;当连接成功之后,将该channel传递给WorkerGroup,WorkerGroup管理该channel的请求。
1
2
java复制代码EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

其中,值得说明的是,NioEventLoopGroup构造方法传递的参数,表示该线程组里面的线程池个数。上述表示,bossGroup有一个线程池负责处理连接请求。如果不提供参数,就默认为CPU核心数*2。

  • 之后就可以初始化netty server的启动类ServerBootstrap
  • 首先,将上面初始化的两个EventLoopGroup作为参数放入。
  • 设置通道类型为NioServerSocketChannel
  • 设置已连接队列的大小,这个参数与TCP建立连接有关。详细可看:blog.csdn.net/weixin_4473…
  • 将连接设置为keep-alive,避免短时间内重复多次TCP握手。详细可看:www.cnblogs.com/caoweixiong…
  • 设置childHandler(实际上处理事件的是处理器链,后面再详细描述)。childHandler就是处理WorkerGroup的事件。在这首先添加一个初始化器,在初始化器里面给处理链添加了一个自定义的真正处理类的类MessageHandler
1
2
3
4
5
6
7
8
9
10
11
java复制代码ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MessageHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
  • 一般处理器都会实现ChannelInboundHandlerAdapterSimpleChannelInboundHandlerSimpleChannelInboundHandlerChannelInboundHandlerAdapter的子类,会帮忙释放直接内存,避免内存泄露,因此继承SimpleChannelInboundHandler会比较方便。
  • channelRead0方法就是真正处理消息的方法
  • 指定client端发送ByteBuf类型的数据,因此接收的时候直接获取即可。(也可以指定不同类型的数据,需要指定不同的编解码器)
  • ChannelHandlerContext是该通道处理器相关的上下文对象,携带了通道本身、pipline等上下文。详细内容之后再分析。在本次代码中,通过上下文获取到channel本身,再通过channel获取到client端的ip地址。
  • 最好重写exceptionCaught,及时捕获异常并且进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class MessageHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
System.out.println("receive message from " + ctx.channel().remoteAddress() + ", message: " + byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
  • 完成启动类的初始化之后,还没完成。因为server端没有正式启动。
  • bind方法表示服务在PORT端口正式启动,并且调用sync方法,表示同步阻塞等待启动完成。
1
java复制代码ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
  • 获取到channelFuture对象之后,就可以同步阻塞地监听关闭事件了。
1
java复制代码channelFuture.channel().closeFuture().sync();
  • 为了在关闭服务的时候,及时将线程池关闭,需要在finally时,关闭所有线程池
1
2
java复制代码bossGroup.shutdownGracefully(); 
workerGroup.shutdownGracefully();
  • 至此,最简单的一个netty server已完成。
  • 完整代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public class HelloWorldServer {
public static final int PORT = 9999;

public static void main(String[] args) {
// set one thread for listening on port
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// depend on the number of core
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MessageHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);

System.out.println("finish initialize the parameter for boostrap.");
System.out.println("begin to start netty server");

ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("shutdown bossGroup and workerGroup gracefully.");
}
}
}

二、Client端

  • 对于client端来说,会比server端少一个处理连接请求的EventLoopGroup,只有一个处理基本业务的EventLoopGroup
1
java复制代码EventLoopGroup eventExecutors = new NioEventLoopGroup();
  • client的启动类为Bootstrap
  • 设置线程组为eventExecutors
  • 设置通道类型为NioSocketChannel(注意,与server端的是不一样的)
  • 因为只有线程组,因此handler也只有一组,因此在handler方法里面放一个初始化器就好。在初始化器里面再添加事件的真正处理类。
1
2
3
4
5
6
7
8
9
java复制代码Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(eventExecutors)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SendMessageHandler());
}
});
  • 这个处理器重写了channelActive方法,表示当通道创建成功之后就运行的方法。该方法用于接收用户输入的消息,并通过通道发送给server。
  • 其中,writeAndFlush方法可以实现写入缓存并发送给server。Unpooled.copiedBuffer可以将Sring类型的字符串变成netty自定义的一个二进制缓存类ByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class SendMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.print("please input you message: ");
Scanner scan = new Scanner(System.in);
while (scan.hasNextLine()) {
String line = scan.nextLine();
ctx.channel().writeAndFlush(Unpooled.copiedBuffer(line, CharsetUtil.UTF_8));
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
  • 启动类通过连接指定的地址、端口号实现连接,并且调用sync()方法实现同步阻塞等待。
  • 并且同步阻塞地等待关闭通道事件。
1
2
java复制代码ChannelFuture channelFuture = bootstrap.connect(ADDRESS, PORT).sync();
channelFuture.channel().closeFuture().sync();
  • 最后,需要保证在关闭client端之后,优雅地关闭线程组
1
java复制代码eventExecutors.shutdownGracefully();
  • 至此,完成了最简单的netty client端实现。
  • 以下是完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class HelloWorldClient {
public static final String ADDRESS = "localhost";
public static final int PORT = 9999;

public static void main(String[] args) {
EventLoopGroup eventExecutors = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(eventExecutors)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SendMessageHandler());
}
});

System.out.println("finish initialize netty client");
ChannelFuture channelFuture = bootstrap.connect(ADDRESS, PORT).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
System.out.println("shutdown eventExecutors gracefully.");
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%