Spring Cloud Gateway源码解析-11-扩展

由来

Spring Cloud Gateway源码解析-10-自定义Predicate实现黑名单中我们自定义了Predicate来实现黑名单,但发现每次更改黑名单规则都要重启项目来实现,因此需要将路由信息存储在外部数据源中,定时刷新SCG内存中的路由信息。

思路

在这里插入图片描述

Spring Cloud Gateway源码解析-03-RouteDefinitionLocator、RouteLocator解析中我们已经介绍过RouteDefinitionRepository,该接口在SCG中只有一个实现InMemoryRouteDefinitionRepository,并且该接口继承了RouteDefinitionWriterRouteDefinitionWriter中定义了save、delete方法,通过方法名称可以知道是用来保存/添加/删除路由信息。

  1. 因此我们可以实现RouteDefinitionRepository用来保存从Redis中获取到的RouteDefinitionRedisRouteDefinitionRepository,由于RouteDefinitionRepository继承了RouteDefinitionLocator,因此会被CompositeRouteDefinitionLocator组合进去,从而被CachingRouteLocator拿到对应的Redis中的RouteDefinition装换成Route。
  2. 有了地方存储Redis中的定义的RouteDefinition,那是不是要有一个角色用来获取Redis中的数据,并组装成RouteDefinition保存到RedisRouteDefinitionRepository中,因此需要定义RedisRouteDefinitionRepositoryOperator用来从Redis中获取到数据库后生成RouteDefinition。可能我们的路由信息以后会放到MySQL、MongoDB等,因此可以抽象出一个从Repository中获取数据转换为RouteDefinition的接口RouteDefinitionRepositoryOperator
  3. 基于上边这些,我们就实现了当SCG启动时从Redis中获取数据转换为RouteDefinition,并保存到RedisRouteDefinitionRepository中,但是想要实现当修改了Redis中的路由信息后同步SCG更新,还不够,需要有一个类似Nacos的心跳机制,定时通知SCG去重新获取一次Redis中的数据。因此可以模仿Nacos的心跳机制实现RedisRouteDefinitionWatch发送心跳事件,触发CachingRouteLocator重新获取RouteDefinition来重新生成Route。

实现

RouteDefinitionRepositoryOperator

1
2
3
4
5
6
7
8
9
10
11
json复制代码/**
* 定义从不同数据源获取RouteDefinition的抽象
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public interface RouteDefinitionRepositoryOperator {

Flux<RouteDefinition> getRouteDefinitions();

}

RedisRouteDefinitionRepositoryOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
json复制代码/**
* Description:用来获取Redis中的RouteDefinition 并保存到{@link RedisRouteDefinitionRepository}
*
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepositoryOperator implements RouteDefinitionRepositoryOperator {

private final String REDIS_ROUTE_ID_PREFIX = "route-*";

private StringRedisTemplate redisTemplate;

public RedisRouteDefinitionRepositoryOperator(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


@Override
public Flux<RouteDefinition> getRouteDefinitions() {
//获取指定前缀的RedisKey。Redis的数据结构使用Hash,value的结构为predicates和filters,
//predicates数据结构JsonArray,可配置多个
// 由于PredicateDefinition的构造方法支持传入类似Path=/api/hello这种格式的参数,并会自动封装为name和args,因此我们取巧可以在Redis中存储如下结构
// 如:["Path=/api/hello","BlackRemoteAddr=172.17.30.1/18,172.17.31.1/18"],表示PathRoutePredicateFactory和BlackRemoteAddrRoutePredicateFactory
//filters与predicates一样
return Flux.fromStream(redisTemplate.keys(REDIS_ROUTE_ID_PREFIX).parallelStream().map(routeId -> {
RouteDefinition routeDefinition = new RouteDefinition();
//以RedisKey作为RouteID
routeDefinition.setId(routeId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(routeId);
String uri = (String) entries.get("uri");
try {
routeDefinition.setUri(new URI(uri));
} catch (URISyntaxException e) {
e.printStackTrace();
}
//初始化PredicateDefinition,并添加到RouteDefinition中
initPredicate(routeDefinition, entries);

//初始化FilterDefinition,并添加到RouteDefinition中
initFilter(routeDefinition, entries);
return routeDefinition;
}));
}

private void initPredicate(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object predicates = entries.get("predicates");
if (predicates == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) predicates);
predicateArry.parallelStream().forEach(predicate -> {
//遍历predicates,创建RouteDefinition,并添加到RouteDefinition中
PredicateDefinition predicateDefinition = new PredicateDefinition((String) predicate);
routeDefinition.getPredicates().add(predicateDefinition);
});
}

private void initFilter(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object filters = entries.get("filters");
if (filters == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) filters);
predicateArry.parallelStream().forEach(filter -> {
//遍历predicates,创建RouteDefinition,并添加到RouteDefinition中
FilterDefinition filterDefinition = new FilterDefinition((String) filter);
routeDefinition.getFilters().add(filterDefinition);
});
}
}

RedisRouteDefinitionRepository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
json复制代码/**
* Description:基于Redis作为RouteDefinition Repository
*
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository{

private final Map<String, RouteDefinition> routes = synchronizedMap(
new LinkedHashMap<String, RouteDefinition>());

private RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator;

/**
* 将RedisRouteDefinitionRepositoryOperator组装进来
* @param redidRouteDefinitionOperator
*/
public RedisRouteDefinitionRepository(RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator) {
this.redidRouteDefinitionOperator = redidRouteDefinitionOperator;
}

/**
* 在{@link CompositeRouteDefinitionLocator#getRouteDefinitions()}调用时 调用redidRouteDefinitionOperator去Redis中取数据
* @return
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
redidRouteDefinitionOperator.getRouteDefinitions().flatMap(r -> save(Mono.just(r))).subscribe();
return Flux.fromIterable(routes.values());
}

@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap(r -> {
if (StringUtils.isEmpty(r.getId())) {
return Mono.error(new IllegalArgumentException("id may not be empty"));
}
routes.put(r.getId(), r);
return Mono.empty();
});
}

@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(
new NotFoundException("RouteDefinition not found: " + routeId)));
});
}
}

RedisRouteDefinitionWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
json复制代码/**
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionWatch implements ApplicationEventPublisherAware, SmartLifecycle {

private final TaskScheduler taskScheduler = getTaskScheduler();

private final AtomicLong redisWatchIndex = new AtomicLong(0);

private final AtomicBoolean running = new AtomicBoolean(false);

private ApplicationEventPublisher publisher;

private ScheduledFuture<?> watchFuture;

private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Redis-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}


@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::redisServicesWatch, 30000); //启动一个定时,30s执行一次
}
}

/**
* 这里最好是自定义一个事件,因为如果使用了Nacos的话,会冲突,这样的话需要修改SCG的源码,监听自定义的事件
* 我们就不这么做了,感兴趣的可以自行实现
*/
private void redisServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent( //30s发布一次事件,通知SCG重新拉取
new HeartbeatEvent(this, redisWatchIndex.getAndIncrement()));
}

@Override
public void stop() {
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
this.watchFuture.cancel(true);
}
}

@Override
public boolean isRunning() {
return false;
}
}

这样就大功告成了,实现了基于Redis配置路由信息并且可动态刷新的功能。

使用

1、Redis中数据:
在这里插入图片描述

2、将RedisRouteDefinitionWatch、RedisRouteDefinitionRepository、RedisRouteDefinitionRepositoryOperator放到Spring容器中,比如@Bean注入
通过以上两步,即可完成。代码写的比较简陋。

大家可自行验证下,亲测有效。代码仓库地址

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%