『Netty核心』Netty心跳机制

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」。

点赞再看,养成习惯👏👏

心跳检测机制

所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。

心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。

在 Netty 中,本身也提供了 IdleStateHandler 用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。

看下它的构造器:

1
2
3
java复制代码public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

解释下三个参数的含义:

  • eaderIdleTimeSeconds 读超时:当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLEIdleStateEvent 事件。
  • riterIdleTimeSeconds 写超时:即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLEIdleStateEvent 事件。
  • llIdleTimeSeconds 读/写超时:即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLEIdleStateEvent 事件。

注:这三个参数默认的时间单位是。若需要指定其他时间单位,可以使用另一个构造方法:

1
2
3
java复制代码public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

要实现 Netty 服务端心跳检测机制需要在服务器端的 ChannelInitializer 中加入如下的代码:

1
java复制代码pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:

红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让 channelPipe 中的下一个 handler 处理channelRead 方法

我们再看看 channelActive 方法:

这里有个 initialize 的方法,这是 IdleStateHandler 的精髓,接着探究:

image.png

这边会触发一个 TaskReaderIdleTimeoutTask,这个task里的 run 方法源码是这样的:

image.png

第一个红框代码是用当前时间减去最后一次 channelRead 方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead 已经是6s之前的事情了,你设置的是5s,那么 nextDelay 则为-1,说明超时了,那么第二个红框代码则会触发下一个 handler 的 userEventTriggered 方法:

如果没有超时则不触发 userEventTriggered 方法。

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class HeartBeatServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
channelPipeline.addLast(new HeartBeatServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

继承 SimpleChannelInboundHandler 重写 channelRead 方法和 userEventTriggered 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

int readIdleTimes = 0;

@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)){
ctx.channel().writeAndFlush("ok");
}else{
System.out.println(" 其他信息处理...");
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
readIdleTimes++; // 读空闲的计数加1
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
if (readIdleTimes > 3){
System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public class HeartBeatClient {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new HeartBeatClientHandler());
}
});
System.out.println("netty client start。。");
Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while(channel.isActive()){
int num = random.nextInt(8);
Thread.sleep(num*1000);
channel.writeAndFlush(text);
}
}catch (Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}

static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if (msg != null && msg.equals("idle close")) {
System.out.println(" 服务端关闭连接,客户端也关闭");
ctx.channel().closeFuture();
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%