《DUBBO系列》源码解析-dubbo协议

网络协议

网络协议(应用层)是计算机网络中双方共同确定的交流语义,因为在通信双方都是以字节流的形式进行交换信息,所以需要一种约定对信息进行编解码操作。

常见的协议模式

既然协议是一种双方的约定,那如何制定这种通信的规范呢?在TCP协议中是以字节流的形式进行传输数据,并且为了提高数据传输效率,会对消息进行缓冲再一起发送。这就导致一个问题,每次发送的内容不一定是一个具有完整意义的报文。基于TCP协议之上的应用层协议需要知道一个完整的数据从何开始,从何结束,这就是所谓粘包和拆包。因此有以下常见处理方式:

  • 定长
    固定长度的协议报文,每次读取固定长度的字节进行解析。缺点是真实场景中报文长度都是不固定的,灵活性太差。
  • 使用特殊字符
    使用某个特殊字符作为结束标志,如果需要传的内容和特殊字符一样就很蛋疼。
  • 协议头+payload
    使用固定长度的消息头加可变长度的payload,在消息头中标识payload的长度,这种方式就比较好一些,也是常用的方式。

dubbo协议

dubbo协议是开源RPC框架DUBBO中自定义的私有化协议,使用协议头+payload的方式。

image.png
duboo协议大概就长这样,接下来我们来看一下DUBBO框架在消息的编解码这块是怎么处理的。DUBBO的模块划分很清晰,因此可以先从这个入口开始看Netty服务的启动。
org.apache.dubbo.remoting.transport.netty.NettyServer#doOpen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ini复制代码@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 适配器设计模式
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder()); // 解码器
pipeline.addLast("encoder", adapter.getEncoder()); // 编码器
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}

可以看出这里对Netty的编解码接口进行了适配,NettyCodecAdapter 类中组合了一个重要的接口 Codec2,这是DUBBO中编解码抽象的一个接口。还有两个内部类InternalEncoder 和 InternalDecoder 实现了Netty中的编解码的接口,接口方法中会调用Codec2的方法从而实现接口适配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
ini复制代码private class InternalEncoder extends OneToOneEncoder {

@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
org.apache.dubbo.remoting.buffer.ChannelBuffer buffer =
org.apache.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg); // 真正进行编码
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}

private class InternalDecoder extends SimpleChannelUpstreamHandler {

private org.apache.dubbo.remoting.buffer.ChannelBuffer buffer =
org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object o = event.getMessage();
if (!(o instanceof ChannelBuffer)) {
ctx.sendUpstream(event);
return;
}

ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();
if (readable <= 0) {
return;
}

org.apache.dubbo.remoting.buffer.ChannelBuffer message;
if (buffer.readable()) {
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else {
int size = buffer.readableBytes() + input.readableBytes();
message = org.apache.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else {
message = org.apache.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;

try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message); // 真正进行解码
} catch (IOException e) {
buffer = org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
buffer = org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
...// 以下省略

了解了接口的适配,接下来看看Codec2 接口

image.png
可以看到Codec2使用了SPI注解是一个扩展点,但是框架本身只写了dubbo协议一种编解码的实现,和dubbo协议相关的类有

  • AbstractCodec // 抽象类,定义了一些静态方法
  • ExchangeCodec // duboo协议编解码主要逻辑
  • DubboCodec // 解析payload主要逻辑
  • DubboCountCodec // 包装类,添加额外功能
    接下详细看 org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#decode(),解码过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
ini复制代码protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number. 这里需要找到魔数的位置进行保存然后继续读取,逻辑对应org.apache.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder#messageReceived 中
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length. HEADER_LENGTH即消息头的长度16个字节
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// get data length. 16-12=4个字节记录payload的大小
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header); // 解析payload
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}

接下来开始解析payload,会根据header中的信息使用不同的对象序列化等策略,此时应该对应默认dubbo协议
org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scss复制代码protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // 由此可知header中第三个字节保存的是序列化方式
// get request id.
long id = Bytes.bytes2long(header, 4); // 由此可知从第5个到12个共8个字节记录 请求Id(异步转同步很重要的参数,下回分析)
if ((flag & FLAG_REQUEST) == 0) { // 这里在第3个字节中同时记录了是请求还是响应,是twoway类型(双方有来有回)还是event类型(不需要回应),没理解为啥要这样,这应该是后面加的功能
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
byte status = header[3]; // 第4个字节记录响应的状态
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); // 对象序列化
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
} else {
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
}
...省略
}

通过源码的追踪,我们了解了dubboo 协议大致如下

1
2
3
4
5
6
7
lua复制代码header--16个字节
--1,2 魔数dabb
--3 序列化标记
--4 status
--5~12 requestId
--12~16 date length
payload-- Bytes.bytes2int(header, 12);

dubbo 协议看起来比较紧凑,在header中没有预留多余的字段,然后其实RPC 调用中还有很多参数是放在payload里的需要进行反序列化才能解析得到。通过阅读源码我们发现了一个问题,ExchangeCodec类从名字来看可以支持不同协议的转换,但是魔数和请求头的校验确固定了,并且其中的 decodeBody方法和子类DubboCodec中是一样的…总之能根据自己的业务场景设计出一个自定义协议还是挺有挑战性的。关于DUBBO3 中的主推的Triple 协议请听下回分解。

Triple 协议是 Dubbo3 推出的主力协议。Triple 意为第三代,通过 Dubbo1.0/ Dubbo2.0 两代协议的演进,以及云原生带来的技术标准化浪潮,Dubbo3 新协议 Triple 应运而生。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%