Dubbo 30 RPC Server 接收消息的处理

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

Dubbo 3.0 在去年就已经发布了 , 整体来说使用没有太大的区别 , 之前一直没有好好读一下 Dubbo 的源码 , 这里把这些补上 , 顺便来看看和 2.0 有什么区别.

Dubbo 2.0 中把 Dubbo 分成了 10 个层次 , 相对 3.0 同样如此 , 这一篇只要对应 Proxy , Exchange , Transport 三个部分

1.1 补充一 : RPC 简介

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

一个基本的RPC架构里面应该至少包含以下4个组件:

1、客户端(Client):服务调用方(服务消费者)

2、客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端

3、服务端存根(Server Stub):接收客户端发送过来的请求消息并进行解包,然后再调用本地服务进行处理

4、服务端(Server):服务的真正提供者

Dubbo3 的核心内容之一就是定义了下一代 RPC 协议 , 除了通信功能 , 还具有以下的功能 :

  • 统一的跨语言二进制格式
  • 支持Streaming 和应用层全双工调用模型
  • 易于扩展
  • 能够被各层设备识别

二 . RPC Server 端

2.1 Service 服务类案例

以官方案例为例 , 先看一下 Service 端有什么关键点 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@DubboService
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getServiceContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getServiceContext().getLocalAddress();
}

@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return null;
}

}


// 核心主要是 @DubboService
// 实现接口并不是Dubbo 流程上的必需点 ,只是为了保证规范 , Client 端的 @Reference 如果与此接口不匹配 , 会相对抛出异常

2.2 Server 端创建代理

忽略 @DubboService 的扫描逻辑 , 我们只是来看一下是怎么扫描和创建 Server 端的代理的

2.3 Server 端被调流程

该环节中将属性映射到对应的方法 , 主要看一下 Server 端的反射流程 :

  • C- ChannelEventRunnable # run : 对 channel 监听和处理
  • C- DecodeHandler # received : 接受消息
  • C- HeaderExchangeHandler # received : Exchange 接收
  • C- HeaderExchangeHandler # handleRequest : 处理请求
  • C- DubboProtocol # requestHandler : 反射到指定的方法
  • C- xxxFilter.invoke : Filter 拦截链处理
  • C- InvokerWrapper # invoke
  • C- AbstractProxyInvoker # invoke
  • C- JavassistProxyFactory # doInvoke

这里可以看到 , 主体还是经过 Exchange - Protocol - Proxy 进行处理的

2.3.1 接收消息

在从 received 看之前 , 要先看一下 Dubbo Remote 远程调用的相关逻辑 , 可以看懂 ,入口类为ChannelEventRunnable :

补充 : ChannelEventRunnable 的注册

1
2
3
4
5
6
7
8
9
java复制代码// ChannelEventRunnable 的注册是在调用时进行 , 首先构建了一个 NettyServerHandler 进行 Netty 服务器的创建
C- NettyServerHandler # channelActive
C- AbstractServer # connected
C- AllChannelHandler # connected : Channel 连接处理
C- ChannelEventRunnable # ChannelEventRunnable : 构造器加载 ChannelEventRunnable

// PS : 更之前通过 DubboBootstrap # exportServices 发起的
// 经过 DubboProtocol # createServer 逻辑
// 最终创建了 NettyServer , 这里先不深入

正式处理 : 看看 ChannelEventRunnable 如何实现监听分类

可以看到 , ChannelEventRunnable 中会针对不同的 state 去调用不同的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 接收到数据
- ChannelState.RECEIVED <---> handler.received(channel, message)
// Channel 连接
- ChannelState.CONNECTED <---> handler.connected(channel)
// Channel 连接失败
- ChannelState.DISCONNECTED <---> handler.disconnected(channel)
// Channel 发送状态
- ChannelState.SENT <---> handler.sent(channel, message)
// 出现异常
- ChannelState.CAUGHT <---> handler.caught(channel, exception)

// Step 1 : 调用 CONNECTED 建立连接
channel -> [id: 0xb761cccc, L:/192.168.181.2:20880 - R:/192.168.181.2:52575]
url : dubbo://192.168.181.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=192.168.181.2&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=5676&release=&side=provider&threadname=DubboServerHandler-192.168.181.2:20880&timestamp=1627140229352

// Step 2 : 接收消息
handler.received(channel, message)

此处只关注 received , 代码和接收的数据如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
JAVA复制代码// C- DecodeHandler # received : 接受消息
public void received(Channel channel, Object message) throws RemotingException {
// Step 1 : 省略对 message 进行 decode 转换逻辑 ...

// Step 2 : 调用 Handler 处理
handler.received(channel, message);
}

channel -> NettyChannel [channel=[id: 0x665df7bf, L:/192.168.181.2:20880 - R:/192.168.181.2:61537]]
message -> Request
[id=1, version=2.0.2, twoway=true, event=false, broken=false, data=RpcInvocation
[methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world],
attachments=
{
input=272, dubbo=2.0.2, path=org.apache.dubbo.demo.DemoService, version=0.0.0,
remote.application=dubbo-demo-annotation-consumer,
interface=org.apache.dubbo.demo.DemoService
}
]
]

2.3.2 received 处理 request

先来看一下 ExchangeHandler 体系结构

dubbo-system-ExchangeHandler.png

这里在 DecodeHandler 中就先进行了一遍处理 , 分别通过 message 的类型 , 进行了一次 decode 处理 :

  • Decodeable –> decode(message);
  • Request –> decode(((Request) message).getData())
  • Response –> decode(((Response) message).getResult());

然后才是 HeaderExchangeHandlerChannel 的请求的处理 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码C- HeaderExchangeHandler
M- received :
- HeaderExchangeChannel.getOrAddChannel(channel)
1- handlerEvent(channel, request)
2- handleRequest(exchangeChannel, request)
3- handler.received(exchangeChannel, request.getData())
4- handleResponse(channel, (Response) message)
5- channel.send(echo)


public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
// 通过具体的类型进行分别的处理 , 前面的 message 已经进行过 Decode
// 这里分开做可以看到很明显的解耦想法
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
// 针对事件或者双向标识(是否 response ,ack )进行分别处理 , 默认直接转向下层 Handler
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// 单纯的异常处理 ,说明什么类型都不是
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}

2.3.3 处理 Request , 构建 Response

因为 isTwoWay 为 true ,说明需要通过 response 告知调用者已经接收到消息

这里准备了 Response , 并且进行返回 , 同时调用对应方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {

// 准备 Response 进行返回
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
// ... 此处是出现异常 , 省略构建异常的相关逻辑

// channle 返回 response
channel.send(res);
return;
}
// 从 messsage 中获取 InvokeMehod
Object msg = req.getData();
try {
// 此处构建了一个 CompletableFuture , 用于发起异步处理 -> DubboProtocol
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 处理完成后直接返回 , 舒服
channel.send(res);
} catch (RemotingException e) {

}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}

2.3.4 获取反射到的方法

上面那个环节构建了一个 CompletableFuture , 此对象由 DubboProtocol # requestHandler 完成构建 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
java复制代码C- DubboProtocol
P- requestHandler : 注意 ,这是一个属性 , 在创建 DubboProtocol 时完成创建的


private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

// 这才是流程中调用的方法
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

if (!(message instanceof Invocation)) {
throw new RemotingException(....);
}

// 构建代理类
Invocation inv = (Invocation) message;
// Step 2 : 核心逻辑 , 获取 Invoke 代理类
Invoker<?> invoker = getInvoker(channel, inv);
// 如果是回调,需要考虑向后兼容性
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}

// 配置容器消息 : RemoteAddress
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());

// Step 3 : 注意 . 此处开始的是拦截器链 -> 2.4 Filter 体系
// invoker : FilterChainBuilder$FilterChainNode
Result result = invoker.invoke(inv);

// 处理完成 ,Future 返回
return result.thenApply(Function.identity());
}


@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);

} else {
super.received(channel, message);
}
}

// 在第一步连接的时候 ,就会调用该方法 , 此时不会做 invoke 处理
// 但是, 我认为这里应该有定制的途径 ,后面有机会看一下
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, ON_DISCONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
// 创建代理 ,代理不为空 ,才会继续逻辑
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}

// channel.getUrl()总是绑定到一个固定的服务,并且这个服务是随机的
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
// 此处会从 url 获取对应的属性 , 常量有 : onconnect , ondisconnect , 此时不会创建代理
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}

// 通过
RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getGroup());
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getVersion());
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}

return invocation;
}
};


// Step 2 : 具体的方法调用 , 该方法中解析出 ServiceKey , 并且查询出 Invoke
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;

// Step 2-1 : 准备标识数据
// 拿到端口 : 20880
int port = channel.getLocalAddress().getPort();
// 拿到需要的配置类型 : org.apache.dubbo.metadata.MetadataService
String path = (String) inv.getObjectAttachments().get(PATH_KEY);

// Step 2-2 : 判断它是否为客户端的回调服务
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}

// 是否有回调代理
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}

// Step 2-3 : 获取 ServiceKey
// dubbo-demo-annotation-provider/org.apache.dubbo.metadata.MetadataService:1.0.0:20880
String serviceKey = serviceKey(
port,
path,
(String) inv.getObjectAttachments().get(VERSION_KEY),
(String) inv.getObjectAttachments().get(GROUP_KEY)
);

// Step 2-4 : 查询对应的 DubboExporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null) {
throw new RemotingException(....;
}

// Step 2-5 : 返回 invoke 类
return exporter.getInvoker();
}

// 补充 Step 2-3 :
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}

2.4 Filter 体系

之前看到 , 构建 invoke 的时候是构建 FilterChain , 感觉这种写法挺不错 , 以后有机会试试 , 这里的 Filter 主要有 :

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • ExceptionFilter
  • MonitorFilter
  • TimeoutFilter
  • TraceFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码C- FilterChainBuilder

public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// filter 链的处理
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
// 省略 ,主要是进行 Listerner 的监听 , 又在玩异步
throw e;
} finally {

}
return asyncResult.whenCompleteWithContext((r, t) -> {
// 处理完成后 ,还是要通知监听器
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
});
}

2.5 最终的方法调用

以及最后一个Filter 的方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码C- AbstractProxyInvoker

// 反正就是各种异步和 Future , 用的已经炉火纯青了
public Result invoke(Invocation invocation) throws RpcException {
try {
// 调用 JavassistProxyFactory 处理方法
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
//.....
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException(....);
}
}

// PS : AsyncRpcResult 的作用 (有必要后面再看)
这个类表示一个未完成的RPC调用,它将保存这个调用的一些上下文信息,例如RpcContext和Invocation
因此,当调用结束并返回结果时,它可以保证恢复的所有上下文与调用任何回调之前发出调用时相同。



// C- JavassistProxyFactory : 也没啥看头了 , 基本上东西前面都准备好了 , 这里反射处理就行
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

- proxy : 被代理的类
- methodName : 调用的方法
- parameterTypes : 属性类型

2.6 RPC 的 线程调用

RPC 被调流程中 , 创建的是 InternalRunnable , 来看一下是如何调用的

其底层是通过 Netty 进行的交互 , 这里就不详述了 ,以后说 Netty 时再分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class InternalRunnable implements Runnable{
private final Runnable runnable;

public InternalRunnable(Runnable runnable){
this.runnable=runnable;
}

/**
* After the task execution is completed, it will call {@link InternalThreadLocal#removeAll()} to clear
* unnecessary variables in the thread.
*/
@Override
public void run() {
try{
runnable.run();
}finally {
InternalThreadLocal.removeAll();
}
}

}


// 调用的路径
public class NamedInternalThreadFactory extends NamedThreadFactory {
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
InternalThread ret = new InternalThread(mGroup, InternalRunnable.Wrap(runnable), name, 0);
ret.setDaemon(mDaemon);
return ret;
}
}

三 . 深入梳理

3.1 方法的信息来源和校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码// 从 Netty 传入的参数是这样的 :
{
"broken": false,
"data": {
"arguments": ["world"],
"attachments": {
"input": "272",
"path": "org.apache.dubbo.demo.DemoService",
"remote.application": "dubbo-demo-annotation-consumer",
"dubbo": "2.0.2",
"interface": "org.apache.dubbo.demo.DemoService",
"version": "0.0.0"
},
"methodName": "sayHello",
"objectAttachments": {
"input": "272",
"dubbo": "2.0.2",
"path": "org.apache.dubbo.demo.DemoService",
"version": "0.0.0",
"remote.application": "dubbo-demo-annotation-consumer",
"interface": "org.apache.dubbo.demo.DemoService"
},
"parameterTypesDesc": "Ljava/lang/String;",
"targetServiceUniqueName": "org.apache.dubbo.demo.DemoService:0.0.0"
},
"event": false,
"heartbeat": false,
"id": 1,
"twoWay": true,
"version": "2.0.2"
}

// 这里可以看到 , 从Netty 传入的参数中就已经携带了方法信息 , 也就是说方法信息是 Client 端发送的 ,
// 注意 , 这个 methodName 并不是后置校验 , 在项目初始化的时候 , 就进行了校验 , 这个位置以后细说 , 只跟到最后校验的地方

C- ReferenceConfig
protected synchronized void init() {
// 前置流程省略 : DubboBootstrap 初始化和配置初始胡啊

// 校验 Invoke
checkInvokerAvailable();
}



// Step 2 :检验是否正确
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service "......);
}
}

总结

从这篇发现 , Dubbo 的 Method 代理信息是从远程 Client 传递过来的 ,而远程 Client 在启动时会校验 @ReferenceService 是否正确 , 从而保证了准确性

  • Dubbo Server 端首先会创建 NettyService
  • 处理的起点是 ChannelEventRunnable , 通过 DecodeHandler 进行解析
  • 核心的处理逻辑在 DubboProtocol , 这个环节中已经获取具体的 Invoke 类了
  • 最终的调用方为 JavassistProxyFactory , 发起代理的调用

整体来说 , 看 Dubbo 的代码还是更接地气 , 比看 Spring 的代码收获了更多的东西 !!!!!!!!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%