Dubbo30 Filter 过滤链

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

上一篇看完了 Dubbo 3.0 的 Server 端接收 , 这一篇来看一下 Dubbo 的过滤链 . 过滤链也是整个流程中非常重要的一环 .

二 . 处理的起点

1
2
3
4
java复制代码// 调用逻辑
C- ChannelEventRunnable # run : 调用的起点 , 此处接受到消息
C- ExchangeHandler # reply : 发起 Invoke 处理流程
C- FilterChainBuilder # invoke : 开启 Invoke Filter 链构建流程

2.1 Invoke 发起流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 上一篇文章已经看过了 , 通过构建一个 ExchangeHandlerAdapter 发起 reply 调用
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// 只保留核心逻辑
Invocation inv = (Invocation) message;
// 此处实际调用 FilterChainBuilder
Invoker<?> invoker = getInvoker(channel, inv);
//...........
Result result = invoker.invoke(inv);

}
}

2.2 Filter 链构建流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码C- FilterChainBuilder
// 这里构建 Filter 链 , 发起调用操作
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// filter 链的处理
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
// 省略 ,主要是进行 Listerner 的监听 , 又在玩异步
throw e;
} finally {

}
return asyncResult.whenCompleteWithContext((r, t) -> {
// 处理完成后 ,还是要通知监听器
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {

if (listener != null) {
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}

} else if (filter instanceof FILTER.Listener) {
// 对 BaseFilter.Listener 接口进行处理
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
});
}

2.3 Filter 链的调用结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码// 来看一下 FilterChainBuilder , 他的核心对象是其中的一个内部类
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
// 源 Invoke 类型
TYPE originalInvoker;
// 下一个 invoke 节点
Invoker<T> nextNode;
// 当前的 filter 对象
FILTER filter;
}

// 下面看一下Filter 的构建的地方 -> DefaultFilterChainBuilder
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
// 加载外部 Extension
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// 可以看到 , 这里循环创建了 FilterChainNode , 这里是责任链模式
last = new FilterChainNode<>(originalInvoker, next, filter);
}
}

return last;
}


// 这里是SPI 的加载方式 , 对应的 SPI 类为
dubbo-rpc-api/META-INF.dubbo.internal -> org.apache.dubbo.rpc.Filter

// 其中包含如下内容
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter


// PS : Dubbo SPI 的加载方式类似 , 以后有时间再仔细看

SPI 可以看一下 @ juejin.cn/post/698945…

2.4 Dubbo 3.0 对比 Dubbo 2.0 的区别

2 者最大的区别在于 Filter 的机构变动 ,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// Step 1 : Filter 结构
@SPI
public interface Filter extends BaseFilter {
}


// Step 2 : BaseFilter 结构
public interface BaseFilter {

// 始终调用实现中的invoke.invoke(),将请求移交给下一个筛选器节点
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

interface Listener {
// 当调用结束后 ,根据情况选择是否调用
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}

// 这里接口里面套了一个接口 , 虽然知道可以这么用 ,但是从来没做过 , 原来还能这么用
至于实现的时候 , 和内部类使用方式一样 :
public class ExceptionFilter implements Filter, Filter.Listener

三 . Filter 链

Filter 体系结构

Filter-ListenableFilter.png

主要的 Filter 的列表如下 :

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • ExceptionFilter
  • MonitorFilter
  • TimeoutFilter
  • TraceFilter

3.1 EchoFilter

ECHO 表示回声 , 该类的作用主要是为了检测服务是否可用

1
2
3
4
5
6
7
8
java复制代码public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// String $ECHO = "$echo";
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
// 这里直接构建了一个异步 Result 返回 , 回声服务只为了检测服务是否可用
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}

3.2 ClassLoaderFilter

该 Filter 主要为了对 ClassLoader 进行二次处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 将当前执行线程类 ClassLoader 设置为服务接口的类 ClassLoader
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// 对 ClassLoader 进行切换
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
// 这里重新放入最初的 ClassLoad
Thread.currentThread().setContextClassLoader(ocl);
}
}

// PS : 这里进行切换的原因猜测应该和双亲委派有关 , 整个体系的 classLoader 不能处理外部 jar 加载的类
// 当出现这种多 ClassLoader 时, 又因为双亲委派的机制 , 父加载类没有加载成功 , 又最终的 classLoader 加载了 , 这里就需要进行相关的切换

3.3 GenericFilter

GenericFilter 同样实现了2个接口 : Filter, Filter.Listener , 这就意味着会有 Response 处理 ,

TODO : GenericFilter 这个类主要用途还没搞清楚 ,而且这个类太长了, 后面搞清楚了再开一个单章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码GenericFilter implements Filter, Filter.Listener

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// String $INVOKE = "$invoke";
// String $INVOKE_ASYNC = "$invokeAsync";
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
//................
}
return invoker.invoke(inv);
}


@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) {
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {

//................
}
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}

3.4 ContextFilter

ContextFilter 主要是对上下文进行处理 , 上下文中存放的是当前调用过程中所需的环境信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
java复制代码static {
UNLOADING_KEYS = new HashSet<>(128);
UNLOADING_KEYS.add(PATH_KEY);
UNLOADING_KEYS.add(INTERFACE_KEY);
UNLOADING_KEYS.add(GROUP_KEY);
UNLOADING_KEYS.add(VERSION_KEY);
UNLOADING_KEYS.add(DUBBO_VERSION_KEY);
UNLOADING_KEYS.add(TOKEN_KEY);
UNLOADING_KEYS.add(TIMEOUT_KEY);
UNLOADING_KEYS.add(TIMEOUT_ATTACHMENT_KEY);

// 删除async属性以避免传递给以下调用链
UNLOADING_KEYS.add(ASYNC_KEY);
UNLOADING_KEYS.add(TAG_KEY);
UNLOADING_KEYS.add(FORCE_USE_TAG);
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, Object> attachments = invocation.getObjectAttachments();
if (attachments != null) {
Map<String, Object> newAttach = new HashMap<>(attachments.size());
// attachments 中主要为请求的元数据信息
// {"input":"265","dubbo":"2.0.2","path":"org.apache.dubbo.metadata.MetadataService","version":"1.0.0","group":"dubbo-demo-annotation-provider"}
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
String key = entry.getKey();
// ["path","_TO","group","dubbo.tag","version","dubbo.force.tag","dubbo","interface","timeout","token","async"]
if (!UNLOADING_KEYS.contains(key)) {
// 收集在排除之外的属性
newAttach.put(key, entry.getValue());
}
}
attachments = newAttach;
}

// 设置 invoke 到 ServiceContext5 中
RpcContext.getServiceContext().setInvoker(invoker)
.setInvocation(invocation);

RpcContext context = RpcContext.getServerAttachment();

context.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
if (StringUtils.isNotEmpty(remoteApplication)) {
RpcContext.getServiceContext().setRemoteApplicationName(remoteApplication);
} else {
RpcContext.getServiceContext().setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
}

// 调用超时时间 , 未配置为 -1
long timeout = RpcUtils.getTimeout(invocation, -1);
if (timeout != -1) {
// pass to next hop
RpcContext.getClientAttachment().setObjectAttachment(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
}

// 可能已经在这个过滤器之前添加了一些附件到RpcContext
if (attachments != null) {
if (context.getObjectAttachments() != null) {
// 此处会报前文的额外属性进行设置
context.getObjectAttachments().putAll(attachments);
} else {
context.setObjectAttachments(attachments);
}
}

if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}

try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
RpcContext.removeServerAttachment();
RpcContext.removeServiceContext();
// 对于异步场景,我们必须从当前线程中删除上下文,因此我们总是为同一线程的下一次调用创建一个新的RpcContext
RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
RpcContext.removeServerContext();
}
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// 将附件传递到结果
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}

3.5 ExceptionFilter

ExceptionFilter 主要是针对 response 返回中的异常进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}


@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();

// 如果它被检查异常,直接抛出
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// 如果异常出现在签名中,直接抛出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClasses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClasses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}

// .. 省略 : 对于在方法签名中未找到的异常,在服务器日志中打印ERROR消息


// 如果异常类和接口类在同一个jar文件中,则直接抛出
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// 如果是JDK异常,直接抛出
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// 如果是 Dubbo 异常,直接抛出
if (exception instanceof RpcException) {
return;
}

// 否则,用RuntimeException包装并返回给客户机
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {

}
}
}

3.6 MonitorFilter

MonitorFilter 用于监控操作 ,他会 调用截取器,并将收集关于此调用的调用数据并将其发送到监视中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码// Invoke 流程
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
// private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
// private static final String MONITOR_REMOTE_HOST_STORE = "monitor_remote_host_store";
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
return invoker.invoke(invocation); // proceed invocation chain
}

// 主要的 Monitor 流程是以下流程
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, result,
(String) invocation.get(MONITOR_REMOTE_HOST_STORE),
(long) invocation.get(MONITOR_FILTER_START_TIME), false);

getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, null, (String) invocation.get(MONITOR_REMOTE_HOST_STORE), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
Object monitorUrl;
// 获取上文构建的 MONITOR_KEY
monitorUrl = invoker.getUrl().getAttribute(MONITOR_KEY);
if(monitorUrl instanceof URL) {
//
Monitor monitor = monitorFactory.getMonitor((URL) monitorUrl);
if (monitor == null) {
return;
}
// 创建数据的url
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
// 收集监控数据
monitor.collect(statisticsURL.toSerializableURL());
}
} catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}

// PS : 这里涉及到 MonitorService 对象 , 这里先不深入 ,只是过一下流程
C- MonitorService
- void collect(URL statistics) : 收集监测数据
- List<URL> lookup(URL query)

3.7 TimeoutFilter

对超时情况进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// String TIME_COUNTDOWN_KEY = "timeout-countdown";
Object obj = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
if (obj != null) {
TimeoutCountDown countDown = (TimeoutCountDown) obj;
//
if (countDown.isExpired()) {
// // 在超时的情况下清除响应
((AppResponse) appResponse).clear();
// 这里仅仅是打印一个 log
if (logger.isWarnEnabled()) {
logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
", invoke elapsed " + countDown.elapsedMillis() + " ms.");
}
}
}
}

3.8 TraceFilter

这个 Filter 主要对请求进行跟踪 , 该对象中存在一个集合用于维护 TRACES Key 和 Channel 的列表 , 在 invoke 调用完成后 , 会将日志发送到相关的 Channel 中

主要是监听 某个接口的任意方法或者某个方法 n次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
java复制代码// 这里涉及到一个主要的集合
private static final ConcurrentMap<String, Set<Channel>> TRACERS = new ConcurrentHashMap<>();


@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

// 常见的获取前后调用时间的方式
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
long end = System.currentTimeMillis();

// 如果 trace 存在
if (TRACERS.size() > 0) {

// 构建 trace key , 为代理接口.代理方法
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
Set<Channel> channels = TRACERS.get(key);
if (channels == null || channels.isEmpty()) {
key = invoker.getInterface().getName();
channels = TRACERS.get(key);
}

// 从集合中获取对应的 channel 集合
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : new ArrayList<>(channels)) {
// 如果 channel 为连接或者出现异常 , 都会从集合中移除该 channel
if (channel.isConnected()) {
try {
int max = 1;
Integer m = (Integer) channel.getAttribute(TRACE_MAX);
if (m != null) {
max = m;
}
int count = 0;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
if (c == null) {
c = new AtomicInteger();
channel.setAttribute(TRACE_COUNT, c);
}
count = c.getAndIncrement();
// 此处涉及到2个重要的参数 , 他们分别表示监听的最大次数和当前监听的次数
// private static final String TRACE_MAX = "trace.max";
// private static final String TRACE_COUNT = "trace.count";
if (count < max) {
String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
channel.send("\r\n" + RpcContext.getServiceContext().getRemoteAddress() + " -> "
+ invoker.getInterface().getName()
+ "." + invocation.getMethodName()
+ "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
+ "\r\nelapsed: " + (end - start) + " ms."
+ "\r\n\r\n" + prompt);
}
if (count >= max - 1) {
channels.remove(channel);
}
} catch (Throwable e) {
channels.remove(channel);
}
} else {
channels.remove(channel);
}
}
}
}
return result;
}

// PS : 此处补充一下 Trace 的添加流程 , TraceFilter 中提供了2个方法对 Trace Channel 进行管理
public static void addTracer(Class<?> type, String method, Channel channel, int max)
- String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
?- 可用注意到 , tracer key j

public static void removeTracer(Class<?> type, String method, Channel channel)

// 而 Trace 主要的管理和处理类为 TraceTelnetHandler
public class TraceTelnetHandler implements TelnetHandler {

}

总结

  • Filter 链通过 SPI 加载
  • Filter 链通过 FilterChainBuilder 构建
  • Filter 中通过 invocation.getObjectAttachments() 获取属性

这篇文章深度不大 , 主要是由于笔者还在表层学习 , 第二由于这篇文章是一个概括性的文章 , 后续要慢慢的深入 Dubbo 3.0 相关

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%