「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」
所谓的协议,是由语法、语义、时序这三个要素组成的一种规范,通信双方按照该协议规范来实现网络数据传输,这样通信双方才能实现数据正常通信和解析。
由于不同的中间件在功能方面有一定差异,所以其实应该是没有一种标准化协议来满足不同差异化需求,因此很多中间件都会定义自己的通信协议,另外通信协议可以解决粘包和拆包问题。
在本篇文章中,我们来实现一个自定义消息协议。
自定义协议的要素
自定义协议,那这个协议必须要有组成的元素,
- 魔数: 用来判断数据包的有效性
- 版本号: 可以支持协议升级
- 序列化算法: 消息正文采用什么样的序列化和反序列化方式,比如json、protobuf、hessian等
- 指令类型:也就是当前发送的是一个什么类型的消息,像zookeeper中,它传递了一个Type
- 请求序号: 基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理
- 消息长度
- 消息正文
协议定义
1 | txt复制代码sessionId | reqType | Content-Length | Content | |
其中Version
,Content-Length
,SessionId
就是Header信息,Content
就是交互的主体。
定义项目结构以及引入包
1 | xml复制代码<dependency> |
项目结构如图4-1所示:
- netty-message-mic : 表示协议模块。
- netty-message-server :表示nettyserver。
图4-1
- 引入log4j.properties
在nettyMessage-mic中,包的结构如下。
定义Header
表示消息头
1 | java复制代码@Data |
定义MessageRecord
表示消息体
1 | java复制代码@Data |
OpCode
定义操作类型
1 | java复制代码public enum OpCode { |
定义编解码器
分别定义对该消息协议的编解码器
MessageRecordEncoder
1 | java复制代码@Slf4j |
MessageRecordDecode
1 | java复制代码@Slf4j |
测试协议的解析和编码
EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的
1 | java复制代码public class CodesMainTest { |
编码包分析
运行上述代码后,会得到下面的一个信息
1 | txt复制代码 +-------------------------------------------------+ |
按照协议规范:
- 前面8个字节表示sessionId
- 一个字节表示请求类型
- 4个字节表示长度
- 后面部分内容表示消息体
测试粘包和半包问题
通过slice方法进行拆分,得到两个包。
ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。
1 | java复制代码public class CodesMainTest { |
运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。
1 | txt复制代码2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B |
解决拆包问题
LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。
首先来说明一下该解码器的核心参数
- lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置
- lengthFieldLength,长度字段锁占用的字节数
- lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值
- initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
- lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
1 | java复制代码public class CodesMainTest { |
添加一个长度解码器,就解决了拆包带来的问题。运行结果如下
1 | java复制代码2021-08-31 16:09:35,115 [com.netty.example.codec.MessageRecordDecode]-[INFO] 序列化出来的结果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World) |
基于自定义消息协议通信
下面我们把整个通信过程编写完整,代码结构如图4-2所示.
图4-2
服务端开发
1 | java复制代码@Slf4j |
ServerHandler
1 | java复制代码@Slf4j |
客户端开发
1 | java复制代码public class ProtocolClient { |
ClientHandler
1 | java复制代码@Slf4j |
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注同名微信公众号获取更多技术干货!
本文转载自: 掘金