Spring Cloud Gateway Source Code Analysis - 09 - Integration

Example

SCG Configuration

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true # disabled by default
          url-expression: "'lb://'+serviceId" # expression for the route target URI; this value is also the default
    # Nacos registry configuration
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 # registry address

Suppose a service named user-service is already registered in Nacos and exposes an endpoint /api/hello. A request sent to SCG at /user-service/api/hello is then forwarded to user-service.
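The mapping in this example can be sketched in plain Java: the first path segment names the service and the rest is the path sent downstream. This is only an illustration of the convention; the class and method names are hypothetical, and SCG itself does this with a Path predicate and a RewritePath filter rather than string splitting.

```java
class DiscoveryPathExample {

    /** Splits /<serviceId>/<rest> into {serviceId, "/<rest>"}. Hypothetical helper, not SCG code. */
    static String[] split(String gatewayPath) {
        int second = gatewayPath.indexOf('/', 1);
        if (!gatewayPath.startsWith("/") || second < 0) {
            throw new IllegalArgumentException("expected /<serviceId>/<path>: " + gatewayPath);
        }
        String serviceId = gatewayPath.substring(1, second);
        String downstreamPath = gatewayPath.substring(second);
        return new String[] {serviceId, downstreamPath};
    }

    public static void main(String[] args) {
        String[] parts = split("/user-service/api/hello");
        // The gateway would look up parts[0] in the registry and call parts[1] on it.
        System.out.println(parts[0] + " -> " + parts[1]);
    }
}
```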

How It Works

Core classes: assembling the RouteDefinition

The class diagram from the earlier articles has been extended.
[Figure: extended class diagram, image not available]

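The red area in the upper-right corner of the diagram contains the classes involved in registry integration, and the diagram shows how they relate. DiscoveryLocatorProperties plays the same role as GatewayProperties: it reads configuration, in this case the discovery-related settings. DiscoveryClientRouteDefinitionLocator is assembled from DiscoveryLocatorProperties. It is an implementation of RouteDefinitionLocator, so it also supplies RouteDefinitions, and like PropertiesRouteDefinitionLocator it ends up composed into CompositeRouteDefinitionLocator.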

Both DiscoveryLocatorProperties and DiscoveryClientRouteDefinitionLocator are assembled in GatewayDiscoveryClientAutoConfiguration.
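The composition described above can be sketched as follows. The `Locator` interface is a hypothetical, simplified stand-in for RouteDefinitionLocator (the real interface returns a Flux of RouteDefinition, not a list of strings), and the route IDs are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CompositeLocatorSketch {

    /** Hypothetical stand-in for RouteDefinitionLocator, reduced to route IDs. */
    interface Locator {
        List<String> routeIds();
    }

    /** Concatenates the routes of every delegate, in order, like a composite locator would. */
    static Locator composite(final List<Locator> delegates) {
        return () -> {
            List<String> all = new ArrayList<String>();
            for (Locator delegate : delegates) {
                all.addAll(delegate.routeIds());
            }
            return all;
        };
    }

    static List<String> demo() {
        // One locator fed by static configuration, one fed by service discovery.
        Locator fromProperties = () -> Arrays.asList("order-route");
        Locator fromDiscovery = () -> Arrays.asList("discovery_user-service");
        return composite(Arrays.asList(fromProperties, fromDiscovery)).routeIds();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Callers only see the composite, so they do not need to know whether a route came from configuration or from the registry.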

DiscoveryLocatorProperties

Every RouteDefinition generated from service discovery uses PathRoutePredicateFactory and RewritePathGatewayFilterFactory to match on the path and to rewrite the request path.

@ConfigurationProperties("spring.cloud.gateway.discovery.locator")
public class DiscoveryLocatorProperties {

    // Enable flag; disabled by default
    private boolean enabled = false;

    /**
     * Route ID prefix. When unset, the locator falls back to the simple class name of the
     * DiscoveryClient plus "_" {@link org.springframework.cloud.client.discovery.DiscoveryClient}
     */
    private String routeIdPrefix;

    // SpEL expression evaluated per service to decide whether it gets a route; "true" includes every service
    private String includeExpression = "true";

    // SpEL expression for the route URI; it evaluates to something like uri=lb://user-service and can be overridden
    private String urlExpression = "'lb://'+serviceId";

    /**
     * Option to lower case serviceId in predicates and filters, defaults to false. Useful
     * with eureka when it automatically uppercases serviceId. so MYSERIVCE, would match
     * /myservice/**
     */
    private boolean lowerCaseServiceId = false;

    private List<PredicateDefinition> predicates = new ArrayList<>();

    private List<FilterDefinition> filters = new ArrayList<>();

    // getters and setters omitted
}

GatewayDiscoveryClientAutoConfiguration

public static List<PredicateDefinition> initPredicates() {
    ArrayList<PredicateDefinition> definitions = new ArrayList<>();
    // add a predicate that matches the url at /serviceId/**
    PredicateDefinition predicate = new PredicateDefinition();
    // Set the predicate name to Path, so discovery routes use PathRoutePredicateFactory
    predicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class));
    // Set the Path argument; serviceId is replaced with the registered service name
    // (e.g. user-service) in DiscoveryClientRouteDefinitionLocator#getRouteDefinitions
    predicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'");
    definitions.add(predicate);
    return definitions;
}

public static List<FilterDefinition> initFilters() {
    ArrayList<FilterDefinition> definitions = new ArrayList<>();

    // add a filter that removes /serviceId by default
    FilterDefinition filter = new FilterDefinition();
    // Use RewritePathGatewayFilterFactory, because the request path is rewritten later
    filter.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class));
    // As with the predicate, serviceId is replaced with the registered service name in
    // DiscoveryClientRouteDefinitionLocator#getRouteDefinitions, e.g. /user-service/(?<remaining>.*)
    String regex = "'/' + serviceId + '/(?<remaining>.*)'";
    String replacement = "'/${remaining}'";
    filter.addArg(REGEXP_KEY, regex);
    filter.addArg(REPLACEMENT_KEY, replacement);
    definitions.add(filter);

    return definitions;
}

@Bean
public DiscoveryLocatorProperties discoveryLocatorProperties() {
    DiscoveryLocatorProperties properties = new DiscoveryLocatorProperties();
    // Set the default predicates
    properties.setPredicates(initPredicates());
    // Set the default gateway filters
    properties.setFilters(initFilters());
    return properties;
}
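To make the defaults concrete, the sketch below shows what those expressions evaluate to for a given service ID. It uses plain string concatenation as a stand-in for SpEL evaluation, so it is an approximation of the result rather than the real mechanism; the class and method names are hypothetical.

```java
class DefaultDiscoveryArgs {

    // Result of the Path predicate expression "'/'+serviceId+'/**'"
    static String pathPattern(String serviceId) {
        return "/" + serviceId + "/**";
    }

    // Result of the RewritePath regexp expression "'/' + serviceId + '/(?<remaining>.*)'"
    static String rewriteRegexp(String serviceId) {
        return "/" + serviceId + "/(?<remaining>.*)";
    }

    // Result of the RewritePath replacement expression "'/${remaining}'"
    static String rewriteReplacement() {
        return "/${remaining}";
    }

    // Result of the default url-expression "'lb://'+serviceId"
    static String routeUri(String serviceId) {
        return "lb://" + serviceId;
    }

    public static void main(String[] args) {
        String id = "user-service";
        System.out.println(pathPattern(id));
        System.out.println(rewriteRegexp(id) + " -> " + rewriteReplacement());
        System.out.println(routeUri(id));
    }
}
```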

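Registry integration can work with two kinds of client: the original blocking DiscoveryClient and the ReactiveDiscoveryClient. Each registry ships its own implementations, for example Nacos's NacosReactiveDiscoveryClient. The property spring.cloud.discovery.reactive.enabled selects the mode: reactive is used when it is true or not set at all (the condition uses matchIfMissing = true), and the deprecated blocking variant is used only when it is set to false.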

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled",
        matchIfMissing = true)
public static class ReactiveDiscoveryClientRouteDefinitionLocatorConfiguration {

    /**
     * @param discoveryClient the reactive implementation; with Nacos, the injected bean is
     * {@link com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClient}
     */
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
    public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
            ReactiveDiscoveryClient discoveryClient,
            DiscoveryLocatorProperties properties) {
        return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
    }

}

/**
 * @deprecated In favor of the native reactive service discovery capability.
 */
@Configuration(proxyBeanMethods = false)
@Deprecated
@ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled",
        havingValue = "false")
public static class BlockingDiscoveryClientRouteDefinitionLocatorConfiguration {

    @Bean
    @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
    public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
            DiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) {
        return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
    }

}
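The way the two conditions combine can be sketched as a small decision function. It models the @ConditionalOnProperty rules as understood here (a property without havingValue matches when present and not "false"; matchIfMissing = true also matches when absent); the class and method names are hypothetical and no Spring code is involved.

```java
import java.util.HashMap;
import java.util.Map;

class DiscoveryModeSketch {

    static final String LOCATOR_KEY = "spring.cloud.gateway.discovery.locator.enabled";
    static final String REACTIVE_KEY = "spring.cloud.discovery.reactive.enabled";

    /** Returns "none", "reactive" or "blocking" for the given property values. */
    static String chooseLocator(Map<String, String> props) {
        String locator = props.get(LOCATOR_KEY);
        // The locator bean needs the property to be present and not "false".
        if (locator == null || "false".equalsIgnoreCase(locator)) {
            return "none";
        }
        String reactive = props.get(REACTIVE_KEY);
        // matchIfMissing = true: an absent property selects the reactive configuration.
        if (reactive == null || !"false".equalsIgnoreCase(reactive)) {
            return "reactive";
        }
        // havingValue = "false": only an explicit false selects the blocking configuration.
        return "blocking";
    }

    static String demo(String locatorEnabled, String reactiveEnabled) {
        Map<String, String> props = new HashMap<String, String>();
        if (locatorEnabled != null) {
            props.put(LOCATOR_KEY, locatorEnabled);
        }
        if (reactiveEnabled != null) {
            props.put(REACTIVE_KEY, reactiveEnabled);
        }
        return chooseLocator(props);
    }

    public static void main(String[] args) {
        System.out.println(demo("true", null));
        System.out.println(demo("true", "false"));
        System.out.println(demo(null, "true"));
    }
}
```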

DiscoveryClientRouteDefinitionLocator

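The main job of DiscoveryClientRouteDefinitionLocator is to fetch all service instances from the registry and, from that service information, build the PredicateDefinitions, the FilterDefinitions and finally the RouteDefinition, which CompositeRouteDefinitionLocator then retrieves.
One RouteDefinition is generated per service.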

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {

    private final DiscoveryLocatorProperties properties;

    private final String routeIdPrefix;

    private final SimpleEvaluationContext evalCtxt;

    private Flux<List<ServiceInstance>> serviceInstances;

    public DiscoveryClientRouteDefinitionLocator(ReactiveDiscoveryClient discoveryClient,
            DiscoveryLocatorProperties properties) {
        this(discoveryClient.getClass().getSimpleName(), properties);
        // Fetch the instances of every service through the registry's discoveryClient
        serviceInstances = discoveryClient.getServices()
                .flatMap(service -> discoveryClient.getInstances(service).collectList());
    }

    private DiscoveryClientRouteDefinitionLocator(String discoveryClientName,
            DiscoveryLocatorProperties properties) {
        this.properties = properties;
        // Use the configured route ID prefix if there is one;
        // otherwise fall back to the discovery client's class name plus "_"
        if (StringUtils.hasText(properties.getRouteIdPrefix())) {
            routeIdPrefix = properties.getRouteIdPrefix();
        }
        else {
            routeIdPrefix = discoveryClientName + "_";
        }
        evalCtxt = SimpleEvaluationContext.forReadOnlyDataBinding().withInstanceMethods()
                .build();
    }

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {

        // Excerpt: parser, urlExpr and includePredicate are not defined here. In the full
        // source they are set up at the start of this method from includeExpression and urlExpression.
        return serviceInstances.filter(instances -> !instances.isEmpty())
                .map(instances -> instances.get(0)).filter(includePredicate)
                .map(instance -> {
                    // Create the RouteDefinition
                    RouteDefinition routeDefinition = buildRouteDefinition(urlExpr,
                            instance);

                    final ServiceInstance instanceForEval = new DelegatingServiceInstance(
                            instance, properties);

                    for (PredicateDefinition original : this.properties.getPredicates()) {
                        // Rebuild the PredicateDefinition for this particular service
                        PredicateDefinition predicate = new PredicateDefinition();
                        predicate.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs()
                                .entrySet()) {
                            // Evaluate the argument so that serviceId becomes the service name, e.g. /user-service/**
                            String value = getValueFromExpr(evalCtxt, parser,
                                    instanceForEval, entry);
                            predicate.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getPredicates().add(predicate);
                    }

                    for (FilterDefinition original : this.properties.getFilters()) {
                        FilterDefinition filter = new FilterDefinition();
                        filter.setName(original.getName());
                        for (Map.Entry<String, String> entry : original.getArgs()
                                .entrySet()) {
                            // Evaluate the filter regex '/' + serviceId + '/(?<remaining>.*)'
                            // so that serviceId becomes the service ID, e.g. user-service
                            String value = getValueFromExpr(evalCtxt, parser,
                                    instanceForEval, entry);
                            filter.addArg(entry.getKey(), value);
                        }
                        routeDefinition.getFilters().add(filter);
                    }

                    return routeDefinition;
                });
    }

    protected RouteDefinition buildRouteDefinition(Expression urlExpr,
            ServiceInstance serviceInstance) {
        // Get the service ID as reported by the registry
        String serviceId = serviceInstance.getServiceId();
        RouteDefinition routeDefinition = new RouteDefinition();
        // Set the route ID
        routeDefinition.setId(this.routeIdPrefix + serviceId);
        // Evaluate the SpEL expression to produce the route URI
        String uri = urlExpr.getValue(this.evalCtxt, serviceInstance, String.class);
        routeDefinition.setUri(URI.create(uri));
        // Copy the instance metadata, such as weight and health status
        routeDefinition.setMetadata(new LinkedHashMap<>(serviceInstance.getMetadata()));
        return routeDefinition;
    }
}
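Stripped of Reactor and SpEL, the per-service logic above can be sketched roughly like this: skip services without instances and emit one route per remaining service. Routes are flattened into strings to keep the sketch self-contained; the "demo_" prefix, the addresses and the class name are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DiscoveryRoutesSketch {

    /** Builds one route description per service that has at least one instance. */
    static List<String> buildRoutes(String routeIdPrefix,
            Map<String, List<String>> instancesByService) {
        List<String> routes = new ArrayList<String>();
        for (Map.Entry<String, List<String>> entry : instancesByService.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue; // mirrors filter(instances -> !instances.isEmpty())
            }
            String serviceId = entry.getKey();
            routes.add(routeIdPrefix + serviceId
                    + " | uri=lb://" + serviceId
                    + " | Path=/" + serviceId + "/**");
        }
        return routes;
    }

    static List<String> demo() {
        Map<String, List<String>> registry = new LinkedHashMap<String, List<String>>();
        registry.put("user-service", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        registry.put("empty-service", Collections.<String>emptyList());
        registry.put("order-service", Arrays.asList("10.0.0.3:8080"));
        return buildRoutes("demo_", registry);
    }

    public static void main(String[] args) {
        for (String route : demo()) {
            System.out.println(route);
        }
    }
}
```

Note that the number of instances does not change the number of routes: the route points at lb://serviceId, and picking a concrete instance is left to the load balancer at request time.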

Request Handling

RewritePathGatewayFilterFactory

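As described above, when integrated with a registry SCG adds PathRoutePredicateFactory and RewritePathGatewayFilterFactory to every route. PathRoutePredicateFactory decides whether a request matches the current route. RewritePathGatewayFilterFactory rewrites the request path using regexp=/user-service/(?<remaining>.*) and replacement=/${remaining}; for example, the request path /user-service/api/hello is rewritten to /api/hello.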

public class RewritePathGatewayFilterFactory
        extends AbstractGatewayFilterFactory<RewritePathGatewayFilterFactory.Config> {

    /**
     * Regexp key.
     */
    public static final String REGEXP_KEY = "regexp";

    /**
     * Replacement key.
     */
    public static final String REPLACEMENT_KEY = "replacement";

    public RewritePathGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList(REGEXP_KEY, REPLACEMENT_KEY);
    }

    @Override
    public GatewayFilter apply(Config config) {
        String replacement = config.replacement.replace("$\\", "$");
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange,
                    GatewayFilterChain chain) {
                ServerHttpRequest req = exchange.getRequest();
                // Before every rewrite, record the original request URI in the exchange attributes
                addOriginalRequestUrl(exchange, req.getURI());
                String path = req.getURI().getRawPath();
                // Replace according to the configured regular expression, e.g.
                // regexp=/user-service/(?<remaining>.*), replacement=/${remaining}:
                // the path /user-service/api/hello becomes /api/hello
                String newPath = path.replaceAll(config.regexp, replacement);
                // Build a new request from the rewritten path
                ServerHttpRequest request = req.mutate().path(newPath).build();
                // Store the new request URI in the exchange attributes for later filters
                exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());

                return chain.filter(exchange.mutate().request(request).build());
            }

            @Override
            public String toString() {
                return filterToStringCreator(RewritePathGatewayFilterFactory.this)
                        .append(config.getRegexp(), replacement).toString();
            }
        };
    }

    // Config class (regexp, replacement) omitted
}
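The rewrite itself is ordinary java.util.regex behaviour and can be tried in isolation. The sketch below repeats the two string operations from the filter (normalising `$\` to `$`, then replaceAll with a named group); the class and method names are hypothetical, and the `$\{remaining}` form is included on the assumption that it is how the replacement is typically written in YAML configuration.

```java
class RewritePathSketch {

    /** Turns "$\" into "$", as the filter does before using the replacement. */
    static String normalizeReplacement(String configured) {
        return configured.replace("$\\", "$");
    }

    /** Applies the regexp and replacement to a request path. */
    static String rewrite(String path, String regexp, String replacement) {
        return path.replaceAll(regexp, normalizeReplacement(replacement));
    }

    public static void main(String[] args) {
        String regexp = "/user-service/(?<remaining>.*)";
        // ${remaining} refers to the named capturing group in the regexp.
        System.out.println(rewrite("/user-service/api/hello", regexp, "/${remaining}"));
        // Same result when the replacement was configured with the escaped form.
        System.out.println(rewrite("/user-service/api/hello", regexp, "/$\\{remaining}"));
        // A path that does not match the regexp is left unchanged.
        System.out.println(rewrite("/other/api/hello", regexp, "/${remaining}"));
    }
}
```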

RouteToRequestUrlFilter

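After RewritePathGatewayFilterFactory has rewritten the request path, the GlobalFilter RouteToRequestUrlFilter runs. In the registry scenario its main job is to replace the scheme, host and port of the rewritten request with those of the route URI, which uses the lb scheme. For example, if RewritePathGatewayFilterFactory produced the request URI http://localhost:8080/api/hello, RouteToRequestUrlFilter changes it to lb://user-service/api/hello for LoadBalancerClientFilter to use.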

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    // Check whether GATEWAY_ROUTE_ATTR is present in the exchange attributes;
    // it is put there by RoutePredicateHandlerMapping. If it is missing, do nothing.
    if (route == null) {
        return chain.filter(exchange);
    }
    log.trace("RouteToRequestUrlFilter start");
    // Get the request URI
    URI uri = exchange.getRequest().getURI();
    // Check whether it contains encoded parts, such as %
    boolean encoded = containsEncodedParts(uri);
    // Get the route's URI
    URI routeUri = route.getUri();

    // Check whether the route URI carries a second scheme after its prefix
    if (hasAnotherScheme(routeUri)) {
        // this is a special url, save scheme to special attribute
        // replace routeUri with schemeSpecificPart
        // Store the route URI's scheme prefix in the exchange attributes
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
                routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }
    // A route URI with the lb scheme must have a host
    if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
        // Load balanced URIs should always have a host. If the host is null it is
        // most
        // likely because the host name was invalid (for example included an
        // underscore)
        throw new IllegalStateException("Invalid host: " + routeUri.toString());
    }
    // Build the request URL and store it in the exchange attributes.
    // The route URI's scheme is used, so with lb the mergedUrl is lb://xxxxxx
    URI mergedUrl = UriComponentsBuilder.fromUri(uri)
            // .uri(routeUri)
            .scheme(routeUri.getScheme()).host(routeUri.getHost())
            .port(routeUri.getPort()).build(encoded).toUri();
    // Store the new URL in the exchange attributes
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
    return chain.filter(exchange);
}
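The merge step can be approximated with java.net.URI alone. This is a simplified sketch, not the filter's implementation: it ignores user info, fragments and encoding, and the class and method names are hypothetical. It also shows why the host check exists, since java.net.URI reports a null host for a name containing an underscore.

```java
import java.net.URI;
import java.net.URISyntaxException;

class RouteToRequestUrlSketch {

    /** Keeps path and query of the request, takes scheme, host and port from the route. */
    static URI merge(URI requestUri, URI routeUri) {
        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            throw new IllegalStateException("Invalid host: " + routeUri);
        }
        try {
            return new URI(routeUri.getScheme(), null, routeUri.getHost(),
                    routeUri.getPort(), requestUri.getPath(), requestUri.getQuery(), null);
        }
        catch (URISyntaxException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    /** Returns true when the route URI is rejected because its host cannot be parsed. */
    static boolean rejects(String routeUri) {
        try {
            merge(URI.create("http://localhost:8080/api/hello"), URI.create(routeUri));
            return false;
        }
        catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        URI merged = merge(URI.create("http://localhost:8080/api/hello"),
                URI.create("lb://user-service"));
        System.out.println(merged);
        System.out.println(rejects("lb://user_service"));
    }
}
```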

LoadBalancerClientFilter

Up to this point the gateway still has not resolved the actual service instance to call. That is the job of LoadBalancerClientFilter.

@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    // Skip requests that are not lb requests
    if (url == null
            || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
        return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);

    if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url before: " + url);
    }
    // Let the load balancer pick the actual service instance
    final ServiceInstance instance = choose(exchange);

    if (instance == null) {
        throw NotFoundException.create(properties.isUse404(),
                "Unable to find instance for " + url.getHost());
    }

    URI uri = exchange.getRequest().getURI();

    // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
    // if the loadbalancer doesn't provide one.
    String overrideScheme = instance.isSecure() ? "https" : "http";
    if (schemePrefix != null) {
        overrideScheme = url.getScheme();
    }
    // Build the URI from the chosen instance's information
    URI requestUrl = loadBalancer.reconstructURI(
            new DelegatingServiceInstance(instance, overrideScheme), uri);

    if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
    }
    // Store the request URI in the exchange attributes for NettyRoutingFilter
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}

protected ServiceInstance choose(ServerWebExchange exchange) {
    // Delegates to the Ribbon load balancer to pick the actual service instance
    return loadBalancer.choose(
            ((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}
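The choose-then-reconstruct flow can be sketched with a toy load balancer. The round-robin strategy here is an assumption chosen for simplicity, not a description of Ribbon's rules, and the class name, addresses and fixed http scheme are made up for illustration.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalancerSketch {

    // serviceId -> "host:port" of each registered instance
    private final Map<String, List<String>> instances;
    private final AtomicInteger counter = new AtomicInteger();

    LoadBalancerSketch(Map<String, List<String>> instances) {
        this.instances = instances;
    }

    /** Picks the next instance in round-robin order, or null if none is registered. */
    String choose(String serviceId) {
        List<String> candidates = instances.get(serviceId);
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }

    /** Replaces lb://serviceId with the address of a concrete instance. */
    URI resolve(URI lbUrl) {
        String instance = choose(lbUrl.getHost());
        if (instance == null) {
            throw new IllegalStateException("Unable to find instance for " + lbUrl.getHost());
        }
        return URI.create("http://" + instance + lbUrl.getRawPath());
    }

    static List<String> demo() {
        Map<String, List<String>> registry = new HashMap<String, List<String>>();
        registry.put("user-service", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        LoadBalancerSketch balancer = new LoadBalancerSketch(registry);
        URI lbUrl = URI.create("lb://user-service/api/hello");
        List<String> resolved = new ArrayList<String>();
        resolved.add(balancer.resolve(lbUrl).toString());
        resolved.add(balancer.resolve(lbUrl).toString());
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```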

Dynamic Route Refresh

Route refresh is driven by events.
SCG's RouteRefreshListener listens for refresh-related events. Nacos, for example, uses NacosWatch to publish a HeartbeatEvent.

public void nacosServicesWatch() {

    // nacos doesn't support watch now , publish an event every 30 seconds.
    this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));

}

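When RouteRefreshListener receives a HeartbeatEvent it publishes a RefreshRoutesEvent. CachingRouteLocator listens for that event, which in turn triggers DiscoveryClientRouteDefinitionLocator#getRouteDefinitions to fetch the service information from the registry again and regenerate the RouteDefinitions.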
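The event chain can be sketched without Spring as two plain methods. All names here are hypothetical, and the rule that a refresh happens only when the heartbeat value changes is an assumption about how the listener deduplicates heartbeats; the article does not show that code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class RouteRefreshSketch {

    // Stand-in for the registry: the service names currently registered.
    final List<String> registry = new ArrayList<String>();

    // Stand-in for the routes cached by the gateway.
    List<String> cachedRoutes = new ArrayList<String>();

    private final AtomicReference<Object> lastHeartbeat = new AtomicReference<Object>();

    /** Heartbeat listener: triggers a refresh only when the heartbeat value changed (assumption). */
    boolean onHeartbeat(Object value) {
        Object previous = lastHeartbeat.get();
        if (value == null || value.equals(previous)) {
            return false;
        }
        lastHeartbeat.set(value);
        onRefreshRoutes();
        return true;
    }

    /** Refresh listener: rebuilds the cached routes from the registry's current contents. */
    void onRefreshRoutes() {
        List<String> rebuilt = new ArrayList<String>();
        for (String serviceId : registry) {
            rebuilt.add("lb://" + serviceId);
        }
        cachedRoutes = rebuilt;
    }

    /** Returns the cached route count after each of three heartbeats. */
    static List<Integer> demo() {
        RouteRefreshSketch gateway = new RouteRefreshSketch();
        List<Integer> sizes = new ArrayList<Integer>();
        gateway.registry.add("user-service");
        gateway.onHeartbeat(0L); // first heartbeat: routes are built
        sizes.add(gateway.cachedRoutes.size());
        gateway.registry.add("order-service");
        gateway.onHeartbeat(0L); // same value again: ignored, cache is stale
        sizes.add(gateway.cachedRoutes.size());
        gateway.onHeartbeat(1L); // new value: routes are rebuilt
        sizes.add(gateway.cachedRoutes.size());
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because NacosWatch increments its index on every publish, each of its heartbeats carries a new value, so under this assumption every heartbeat would lead to a refresh.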

This article is reposted from: Juejin (掘金)