spring cloud gateway 一个请求到转发走过

spring cloud gateway 一个请求到转发走过的路途-源码解析

简介

Spring Cloud Gateway是基于Spring Boot 2.x,Spring WebFlux构建的,是新一代的网关解决方案。目前在打算用gateway网关替换掉原有的zuul网关,利用gateway提供的特性来提升原有网关性能。所以借此机会分析了下网关源码。

工作原理

这里贴一张官网的图
image.png
客户端向Gateway网关发出请求。如果网关处理映射请求与路由匹配,则将其发送到网关处理请求。请求经过网关多个过滤器链(这是涉及一个设计模式:责任链模式)。过滤器由虚线分隔的原因是,过滤器可以在发送请求之前和之后运行逻辑。所有“前置”过滤器逻辑均被执行。然后发出代理请求。发出代理请求后,将运行“后置”过滤器逻辑。

源码解析

首先从GatewayAutoConfiguration看起

负载均衡

GatewayAutoConfiguration注解中的GatewayLoadBalancerClientAutoConfiguration注解中有个LoadBalancerClientFilter是处理负载均衡的关键代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}

final ServiceInstance instance = choose(exchange);

if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}

URI uri = exchange.getRequest().getURI();

// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}

URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}

exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

我们在配置路由转发的时候在配置服务时会写lb://xxx,源码中看到了熟悉的lb。

1
2
3
4
java复制代码if (url == null 
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}

url为null或不是lb则跳过此过滤器,否则进行负载均衡处理。
负载关键代码是choose方法,返回ServiceInstance对象(serviceId,host,port等信息)作为负载均衡后的结果。

1
2
3
4
java复制代码protected ServiceInstance choose(ServerWebExchange exchange) {
return loadBalancer.choose(
((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}

choose方法内部通过serviceId,通过ribbon去nacos里找到服务名对应的实例,并负载均衡选出一台实例服务ip和端口返回。将lb://xxx那部分替换成具体ip+端口后接请求路径,放入到上下文中key为gatewayRequestUrl。后通过NettyRoutingFilter过滤器(可以看到这个过滤器的order顺序是Integer.MAX_VALUE,目的就是为了处在最后的位置发送请求)使用httpclient发送请求到下游服务。

负载均衡流程图

image.png

请求转发

核心代码:DispatcherHandler.handle(ServcerWebExchange exchange),它是org.springframework.web.reactive包下的,所有的请求都会经过这里。webflux暂时没有研究,不过大体能看出关键代码和逻辑。

1
2
3
4
5
6
7
8
9
java复制代码if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));

我们可以看到mapping -> mapping.getHandler(exchange) debug进去发现getHandlerInternal()方法里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}

exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}

它的实现方法在RoutePredicateHandlerMapping类中。开头if判断不用看,核心方法是lookupRoute(exchange)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});

/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}

首先outeLocator.getRoutes()的实现方法RouteDefinitionRouteLocator.getRoutes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
.map(this::convertToRoute);

if (!gatewayProperties.isFailOnRouteDefinitionError()) {
// instead of letting error bubble up, continue
routes = routes.onErrorContinue((error, obj) -> {
if (logger.isWarnEnabled()) {
logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
+ " will be ignored. Definition has invalid configs, "
+ error.getMessage());
}
});
}

return routes.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});

routeDefinitionLocator.getRouteDefinitions()又有很多实现方法。。。分别都是从缓存中获取路由信息,从注册中心获取路由信息,从配置文件中获取路由信息等,总而言之就是获取到路由RouteDefinition对象,通过convertToRoute方法将RouteDefinition对象转换成Route对象(咱们网关配置的谓词和过滤器都放入了Route对象,构建Route对象的时候又涉及一个设计模式:建造者模式)。
再往上看,获取到路由信息后Mono.just(route).filterWhen()大概就是我们请求过来的url对某个路由信息做匹配过滤。将我们在路由里配置的id放入上下文中,key为GATEWAY_PREDICATE_ROUTE_ATTR(id如不指定,则为UUID)
image.png
我这里配置的路由有两个,从图中断点可以看出,第一个路由信息和当前访问的url不匹配,返回为false,第二个路由信息匹配上了,返回为true。
这样我们再一路返回到lookupRoute方法,经过上面的一顿操作,又将route路由放入上下文中key为GATEWAY_ROUTE_ATTR。
再一路往上返回,回到最初的handle
image.png
经过对mapping和路由的一系列前置处理,我们是不是就应该执行真正的过滤等处理逻辑了,下面就是执行处理的关键代码。
关键代码:invokeHandler(exchange, handler)

1
2
3
4
5
6
7
8
9
10
java复制代码private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}

我们进入handlerAdapter.handle(exchange, handler)方法,再经过多个实现
SimpleHandlerAdapter -> FilteringWebHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();

List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);

if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}

return new DefaultGatewayFilterChain(combined).filter(exchange);
}

我们看到之前放入上下文的key为GATEWAY_ROUTE_ATTR的路由现在可以用到了!我们取出Route路由对象,记得之前RouteDefinition对象转Route的时候做了什么吗?是不是放入了过滤器?这里取出之前set进的过滤器集合,然后new DefaultGatewayFilterChain(combined).filter(exchange),执行过滤!(又是个设计模式:责任链模式,设计模式写法不固定,主要是思想哈,它这里的写法是通过index标记改执行哪一个过滤器,然后通过index游标移动来经过过滤器链条)
还记得上面的负载均衡过滤器吗,他的order为int最大值,所以肯定最后要走到LoadBalancerClientFilter.filter,然后执行choose方法通过负载均衡算法选举出服务器实例,再通过httpClient调用下游服务。是不是又和前面负载均衡源码解析的步骤连起来了!

ps

里面有很多细节其实还没有写进去,比如路由信息会放入本地缓存中,路由信息获取也可自定义获取方式,比如从数据库中,从redis中等。大体上的流程就是这样了,有不对的地方欢迎指正!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%