netty(十四)Netty提升 - 粘包与半包 一、现象分

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

一、现象分析

1.1 粘包

通过代码的方式演示下粘包的现象:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
scss复制代码public class HalfPackageServer {

public static void main(String[] args) {

NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//设置服务器接收端缓冲区大小为10
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此处打印输出,看看收到的内容是10次16字节,还是一次160字节
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

static void printBuf(ByteBuf byteBuf){
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i< byteBuf.writerIndex();i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}

stringBuilder.append("| 长度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字节");
System.out.println(stringBuilder);
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public class HalfPackageClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立连接成功后,会触发active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//循环发送,每次16字节,共10次
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//释放连接
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//释放EventLoopGroup
worker.shutdownGracefully();
}
}
}

结果:

1
复制代码0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | 长度:160字节

如上所示,发送了10次的16个字节,接收到了一个160字节,而不是10个16字节。这就是粘包现象。

1.2 半包

半包现象我们仍然使用前面的代码,但是服务端需要多设置一个属性,即修改服务端接收缓冲区的buffer大小,我们这里修改为10个字节。

serverBootstrap.option(ChannelOption.SO_RCVBUF,10);

这行代码起初运行完我有点不理解,明明设置是10,但是接收到的内容确是40,可见这个设置的值,不是10个字节,通过源码跟踪,我发现这个数值ChannelOption的泛型是个Ingteger,估计是进行了类型的计算,一个Integer是4个字节,所以有了设置是10,最终却接收40个字节。

如果同学们觉得这个问题说的不对的话,可以帮我指正一下。感谢!!

public static final ChannelOption SO_RCVBUF = valueOf(“SO_RCVBUF”);

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
scss复制代码public class HalfPackageServer {

public static void main(String[] args) {

NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//设置服务器接收端缓冲区大小为10
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此处打印输出,看看收到的内容是10次16字节,还是一次160字节
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

static void printBuf(ByteBuf byteBuf){
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i< byteBuf.writerIndex();i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}

stringBuilder.append("| 长度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字节");
System.out.println(stringBuilder);
}
}

客户端与前面的粘包相同。

结果:

1
2
3
4
5
复制代码0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 长度:36字节
4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 长度:40字节
12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 | 长度:40字节
4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 0 1 2 3 4 5 6 7 8 9 10 11 | 长度:40字节
12 13 14 15 | 长度:4字节

如上所示,总共160字节,总共发送了五次,每一次都含有不完整16字节的数据。这就是半包,其实里面也包含的粘包。

至于为什么第一个只有36,这里没有具体分析,但是我们可以猜想下,每次连接的首次应该是有表示占用了4个字节。

二、粘包、半包分析

产生粘包和半包的本质:TCP是流式协议,消息是无边界的

2.1 滑动窗口

TCP是一种可靠地传出协议,每发送一个段就需要进行一次确认应答(ack)处理。如何没收到ack,则会再次发送。

但是如果对于一个客户端来说,每次发送一个请求,都要等到另一个客户端应该的话,才能发送下一个请求,那么整个构成就成了串行化的过程,大大降低了连接间的传输效率。

TCP如何解决效率地下的问题?

引入滑动窗口。窗口大小即决定了无需等待应答而可以继续发送的数据最大值。

简易滑动过程如下所示:

image.png

窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用。

  • 只有窗口内的数据才允许被发送(绿色),当应答(蓝色)未到达前,窗口必须停止滑动
  • 如果 0-100 这个段的数据 ack 回来了,窗口就可以向下滑动,新的数据段会被加入进来进行发送。
  • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

2.2 粘包现象分析

  • 现象,发送了10次的16个字节,接收到了一个160字节,而不是10个16字节。
  • 产生原因:
+ 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
+ 滑动窗口(TCP):假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当接收方滑动窗口中缓冲了多个报文就会粘包。
+ [Nagle 算法(TCP)](https://baike.baidu.com/item/Nagle%E7%AE%97%E6%B3%95/5645172?fr=aladdin):会造成粘包

2.3 半包现象分析

  • 现象,前面的半包示例代码发送了10次的16个字节,接收到的数据有部分是被阶段的,不是完整的16字节。
  • 产生原因
+ 应用层:接收方 ByteBuf 小于实际发送数据量
+ 滑动窗口(TCP):假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
+ MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包

2.4 引申:Nagle 算法 和 MSS限制

2.4.1 Nagle算法

TCP中为了提高网络的利用率,经常使用一个叫做Nagle的算法。

该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送的一种处理机制。

如果以下两个条件都不满足时,则进行一段时间延迟后,才进行发送:

  • 已发送的数据都已经收到确认应答时
  • 可以发送最大段长度(MSS)的数据时

根据这个算法虽然网络利用率可以提高,但是可能会发生某种程度的延迟。对于时间要求准确的系统,往往需要关闭该算法。

在Netty中如下的条件:

  • 如果 SO_SNDBUF 的数据达到 MSS(maximum segment size),则需要发送。
  • 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭。
  • 如果 TCP_NODELAY = true,则直接发送。
  • 已发送的数据都收到 ack 时,则需要发送。
  • 上述条件不满足,但发生超时(一般为 200ms)则需要发送。
  • 除上述情况,延迟发送

2.4.2 MSS限制

MSS 限制

数据链路层对一次能够发送的最大数据有限制,每种数据链路的最大传输单元(MTU)都不尽相同。

  • 以太网的 MTU 是 1500
  • FDDI(光纤分布式数据接口)的 MTU 是 4352
  • 本地回环地址的 MTU 是 65535 - 本地测试不走网卡

MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数

  • ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
  • TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
  • MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS

三、粘包和半包的解决方案

3.1 短连接

每当客户端。发送一条消息后,就与服务器断开连接。

我们需要对客户端的代码进行改造,其实就是,将内部发送10次消息的循环抽出来,一次连接只发送一个消息,需要建立10次连接,这个我就不演示了,不用想都能得到结果:

  • 客户端发送一定不会产生粘包问题。
  • 服务端只要接收缓冲区足够大,一定不会产生半包的问题,但是不能完全保证。

此方案的缺点:

1)建立大量的链接,效率低。

2)不能解决半包问题。

3.2 固定长度消息

Netty针对固定长度消息提供了一个入站处理器FixedLengthFrameDecoder,能够制定接收消息的固定长度。

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
scss复制代码public class FixedLengthServer {

public static void main(String[] args) {

NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//此处打印输出,看看收到的内容是10次16字节,还是一次160字节
printBuf((ByteBuf) msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

static void printBuf(ByteBuf byteBuf) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < byteBuf.writerIndex(); i++) {
stringBuilder.append(byteBuf.getByte(i));
stringBuilder.append(" ");
}

stringBuilder.append("| 长度:");
stringBuilder.append(byteBuf.writerIndex());
stringBuilder.append("字节");
System.out.println(stringBuilder);
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
ini复制代码public class FixedLengthClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立连接成功后,会触发active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送内容随机的数据包
Random r = new Random();
char c = 1;
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//释放连接
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//释放EventLoopGroup
worker.shutdownGracefully();
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
复制代码1 1 0 0 0 0 0 0 | 长度:8字节
2 2 2 2 0 0 0 0 | 长度:8字节
3 0 0 0 0 0 0 0 | 长度:8字节
4 4 4 0 0 0 0 0 | 长度:8字节
5 5 5 5 0 0 0 0 | 长度:8字节
6 0 0 0 0 0 0 0 | 长度:8字节
7 0 0 0 0 0 0 0 | 长度:8字节
8 8 0 0 0 0 0 0 | 长度:8字节
9 9 9 0 0 0 0 0 | 长度:8字节
10 10 10 10 10 0 0 0 | 长度:8字节

使用固定长度也有一定的缺点,消息长度不好确定:

1)过大,造成空间浪费。

2)过小,对于某些数据包可能不够。

3.3 分隔符

1)Netty提供了LineBasedFrameDecoder(换行符帧解码器)。

默认以 \n 或 \r\n 作为分隔符,需要指定最大长度,如果超出指定长度仍未出现分隔符,则抛出异常。

2)Netty提供了DelimiterBasedFrameDecoder(自定义分隔符帧解码器)。
需要自己指定一个ByteBuf类型的分隔符,且需要指定最大长度。

LineBasedFrameDecoder示例代码:

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码public class LineBasedServer {

public static void main(String[] args) {

NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public class LineBasedClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立连接成功后,会触发active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送带有分隔符的数据包
ByteBuf buffer = ctx.alloc().buffer();
String str = "hello world\nhello world\n\rhello world\nhello world";
buffer.writeBytes(str.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//释放连接
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//释放EventLoopGroup
worker.shutdownGracefully();
}
}
}

结果:

1
2
3
复制代码hello world
hello world
hello world

DelimiterBasedFrameDecoder代码示例,大体与前一种相同,下面只给出不同位置的代码:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf buffer = ch.alloc().buffer();
buffer.writeBytes("||".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buffer));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}

客户端:

1
ini复制代码String str = "hello world||hello world||hello world||hello world";

使用分隔符的这种方式,也有其缺点:处理字符数据比较合适,但如果内容本身包含了分隔符,那么就会解析错误。逐个字节去比较,相率也不是很好。

3.4 预设长度

LengthFieldBasedFrameDecoder长度字段解码器,允许我们在发送的消息当中,指定消息长度,然后会根据这个长度取读取响应的字节数。

1
2
3
4
5
6
java复制代码public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip)

主要看这个有5个字段的构造器,下面我们分别看每个字段的含义是什么。

字段含义我先列在这:

maxFrameLength:最大长度。

lengthFieldOffset:长度字段偏移量。

lengthFieldLength:长度字段长度。

lengthAdjustment:以长度字段为基准,还有几个字节是内容。

initialBytesToStrip:从头剥离几个字节。

通过Netty提供的几个例子,解释这几个字段的意思。

例子1:

  • lengthFieldOffset = 0
  • lengthFieldLength = 2
  • lengthAdjustment = 0
  • initialBytesToStrip = 0

则发送前,总共14个字节,其中lengthFieldLength占据两个字节,0x000C表示消息长度是12:

Length Actual Content
0x000C “HELLO, WORLD”

接收解析后,仍然是14个字节:

Length Actual Content
0x000C “HELLO, WORLD”

例子2:

  • lengthFieldOffset = 0
  • lengthFieldLength = 2
  • lengthAdjustment = 0
  • initialBytesToStrip = 2

则发送前,总共14个字节,其中lengthFieldLength占据两个字节,0x000C表示消息长度是12,initialBytesToStrip表示从头开始剥离2个字节,则解析后,将长度的字段剥离掉了:

Length Actual Content
0x000C “HELLO, WORLD”

接收解析后,只有12个字节:

Actual Content
“HELLO, WORLD”

例子3:

  • lengthFieldOffset = 2
  • lengthFieldLength = 3
  • lengthAdjustment = 0
  • initialBytesToStrip = 0

则发送前,总共17个字节,其中lengthFieldLength占据3个字节,长度字段偏移量是2,偏移量用来存放魔数Header 1:

Header 1 Length Actual Content
0xCAFE 0x00000C “HELLO, WORLD”

接收解析后,只有17个字节:

Header 1 Length Actual Content
0xCAFE 0x00000C “HELLO, WORLD”

例子4:

  • lengthFieldOffset = 0
  • lengthFieldLength = 3
  • lengthAdjustment = 2
  • initialBytesToStrip = 0

则发送前,总共17个字节,其中lengthFieldLength占据3个字节,lengthAdjustment 表示从长度字段开始,还有2个字节是内容:

Length Header 1 Actual Content
0x00000C 0xCAFE “HELLO, WORLD”

接收解析后,只有17个字节:

Length Header 1 Actual Content
0x00000C 0xCAFE “HELLO, WORLD”

例子5:

  • lengthFieldOffset = 1
  • lengthFieldLength = 2
  • lengthAdjustment = 1
  • initialBytesToStrip = 3

则发送前,总共17个字节,长度字段便宜1个字节,长度字段为2个字节,从长度字段开始,还有一个字节后是内容,忽略前三个字节。

Header 1 Length Header 2 Actual Content
0xCA 0x000C 0xFE “HELLO, WORLD”

接收解析后,只有13个字节:

Header 2 Actual Content
0xFE “HELLO, WORLD”

模拟例子5,写一段实例代码,其中内容略有不同:

示例代码:

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码/**
* @description: TODO
* @author:weirx
* @date:2021/11/12 15:34
* @version:3.0
*/
public class LineBasedServer {

public static void main(String[] args) {

NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,1,4,1,5));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.readByte() + "|" + byteBuf.toString(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待连接
channelFuture.sync();
//阻塞等待释放连接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 释放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
scss复制代码public class LineBasedClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//建立连接成功后,会触发active事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
send(ctx,"hello, world");
send(ctx,"HI!");
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
//释放连接
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
System.out.println("client error :" + e);
} finally {
//释放EventLoopGroup
worker.shutdownGracefully();
}
}

static void send(ChannelHandlerContext ctx,String msg){
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = msg.getBytes();
int length = bytes.length;
//先写Header 1
buffer.writeByte(1);
//再写长度
buffer.writeInt(length);
//再写Header 2
buffer.writeByte(2);
//最后写内容
buffer.writeBytes(bytes);
ctx.writeAndFlush(buffer);
}
}

结果:

1
2
复制代码2|hello, world
2|HI!

关于粘包和半包的介绍就这么多了,有用的话,帮忙点个赞吧~~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%