Spring Cloud Gateway 雪崩了,我 TM

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本系列是 我TM人傻了 系列第六期[捂脸],往期精彩回顾:

image

大家好,我又人傻了。这次的经验告诉我们,出来写代码偷的懒,迟早要还的。

问题现象与背景

昨晚我们的网关雪崩了一段时间,现象是:

1.不断有各种微服务报异常:在写 HTTP 响应的时候,连接已经关闭

1
makefile复制代码reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

2.同时还有请求还没读取完,连接已经关闭的异常

1
lua复制代码org.springframework.http.converter.HttpMessageNotReadableException: I/O error while reading input message; nested exception is java.io.IOException: UT000128: Remote peer closed connection before all data could be read

3.前端不断有请求超时的报警,504 Gateway Time-out

4.网关进程不断健康检查失败而被重启

5.重启后的网关进程,立刻请求数量激增,每个实例峰值 2000 qps,闲时每个实例 500 qps,忙时由于有扩容也能保持每个实例在 1000 qps 以内,然后健康检查接口就很长时间没有响应,导致实例不断重启

其中,1 和 2 的问题应该是应为网关不断重启,并且由于某些原因优雅关闭失败导致强制关闭,强制关闭导致连接被强制断开从而有 1 和 2 相关的异常。

我们的网关是基于 Spring Cloud Gateway 实现的,并且有自动根据 CPU 负载扩容的机制。奇怪的是在请求数量彪增的时候,CPU 利用率并没有提高很多,保持在 60% 左右,由于 CPU 负载没有达到扩容的界限,所以一直没有自动扩容。为了快速解决问题,我们手动扩容了几个网关实例,将网关单实例负载控制在了 1000 以内,暂时解决了问题。

问题分析

为了彻底解决这个问题,我们使用 JFR 分析。首先先根据已知的线索去分析:

  1. Spring Cloud Gateway 是基于 Spring-WebFlux 实现的异步响应式网关,http 业务线程是有限的(默认是 2 * 可以使用的 CPU 个数,我们这里是 4)。
  2. 网关进程不断健康检查失败,健康检查调用的是 /actuator/health 接口,这个接口一直超时。

健康检查接口超时一般有两个原因:

  1. 健康检查接口检查某个组件的时候,阻塞住了。例如数据库如果卡住,那么可能数据库健康检查会一直没有返回。
  2. http 线程池没来得及处理健康检查请求,请求就超时了。

我们可以先去看 JFR 中的定时堆栈,看是否有 http 线程卡在健康检查上面。查看出问题后的线程堆栈,重点关注那 4 个 http 线程,结果发现这 4 个线程的堆栈基本一样,都是在执行 Redis 命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
less复制代码"reactor-http-nio-1" #68 daemon prio=5 os_prio=0 cpu=70832.99ms elapsed=199.98s tid=0x0000ffffb2f8a740 nid=0x69 waiting on condition  [0x0000fffe8adfc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
- parking to wait for <0x00000007d50eddf8> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.8/CompletableFuture.java:1798)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.8/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.timedGet(java.base@11.0.8/CompletableFuture.java:1868)
at java.util.concurrent.CompletableFuture.get(java.base@11.0.8/CompletableFuture.java:2021)
at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:244)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy245.get(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)
at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:267)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.get(DefaultStringRedisConnection.java:406)
at org.springframework.data.redis.core.DefaultValueOperations$1.inRedis(DefaultValueOperations.java:57)
at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:60)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
at com.jojotech.apigateway.filter.AccessCheckFilter.traced(AccessCheckFilter.java:196)
at com.jojotech.apigateway.filter.AbstractTracedFilter.filter(AbstractTracedFilter.java:39)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter.filter(FilteringWebHandler.java:137)
at org.springframework.cloud.gateway.filter.OrderedGatewayFilter.filter(OrderedGatewayFilter.java:44)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain.lambda$filter$0(FilteringWebHandler.java:117)
at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain$$Lambda$1478/0x0000000800b84c40.get(Unknown Source)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onNext(MonoFilterWhen.java:149)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onSubscribe(MonoFilterWhen.java:112)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:98)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.request(FluxDematerialize.java:127)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onSubscribe(FluxDematerialize.java:77)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace.subscribe(TraceWebFilter.java:184)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:915)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:526)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:209)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at reactor.netty.http.server.logging.AccessLogHandlerH1.channelRead(AccessLogHandlerH1.java:59)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

发现 http 线程没有卡在健康检查,同时其他线程也没有任何和健康检查相关的堆栈(异步环境下,健康检查也是异步的,其中某些过程可能交给其他线程)。所以,健康检查请求应该是还没被执行就超时取消了。

那么为啥会这样呢?于此同时,我还发现这里用的是 RedisTemplate,是 spring-data-redis 的同步 Redis API。我猛然想起来之前写这里的代码的时候,因为只是验证一个 key 是否存在和修改 key 的过期时间,偷懒没有用异步 API。这里是不是因为使用同步 API 阻塞了 http 线程导致的雪崩呢?

我们来验证下这个猜想:我们的项目中 redis 操作是通过 spring-data-redis + Lettuce 连接池,启用并且增加了关于 Lettuce 命令的 JFR 监控,可以参考我的这篇文章:这个 Redis 连接池的新监控方式针不戳~我再加一点佐料,截至目前我的 pull request 已经合并,这个特性会在 6.2.x 版本发布。我们看下出问题时间附近的 Redis 命令采集,如下图所示:

image

我们来简单计算下执行 Redis 命令导致的阻塞时间(我们的采集是 10s 一次,count 是命令次数,时间单位是微秒):使用这里的命令个数乘以 50% 的中位数,除以 10(因为是 10s),得出每秒因为执行 Redis 命令导致的阻塞时间:

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码32*152=4864
1*860=860
5*163=815
32*176=5632
1*178=178
16959*168=2849112
774*176=136224
3144*166=521904
17343*179=3104397
702*166=116532
总和 6740518
6740518 / 10 = 674051.8 us = 0.67s

这个仅仅是使用中位数计算的阻塞时间,从图上的分布其实可以看出真正的值应该比这个大,这样很有可能每秒需要在 Redis 同步接口上阻塞的时间就超过了 1s,不断地请求,请求没有减少,从而导致了请求越积越多,最后雪崩。

并且由于是阻塞接口,线程很多时间消耗在等待 io 了,所以 CPU 上不去,导致没有自动扩容。业务高峰时,由于有设定好的预先扩容,导致网关单实例没有达到出问题的压力,所以没问题。

解决问题

我们来改写原有代码,使用同步 spring-data-redis Api 原有代码是(其实就是 spring-cloud-gateway 的 Filter 接口的核心方法 public Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain) 的方法体):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
scss复制代码if (StringUtils.isBlank(token)) {
//如果 token 不存在,则根据路径决定继续请求还是返回需要登录的状态码
return continueOrUnauthorized(path, exchange, chain, headers);
} else {
try {
String accessTokenValue = redisTemplate.opsForValue().get(token);
if (StringUtils.isNotBlank(accessTokenValue)) {
//如果 accessTokenValue 不为空,则续期 4 小时,保证登录用户只要有操作就不会让 token 过期
Long expire = redisTemplate.getExpire(token);
log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
if (expire != null && expire < 4 * 60 * 60) {
redisTemplate.expire(token, 4, TimeUnit.HOURS);
}

//解析,获取 userId
JSONObject accessToken = JSON.parseObject(accessTokenValue);
String userId = accessToken.getString("userId");
//如果 userId 不为空才合法
if (StringUtils.isNotBlank(userId)) {
//解析 Token
HttpHeaders newHeaders = parse(accessToken);
//继续请求
return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
}
}
} catch (Exception e) {
log.error("read accessToken error: {}", e.getMessage(), e);
}
//如果 token 不合法,则根据路径决定继续请求还是返回需要登录的状态码
return continueOrUnauthorized(path, exchange, chain, headers);
}

改成使用异步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scss复制代码if (StringUtils.isBlank(token)) {
return continueOrUnauthorized(path, exchange, chain, headers);
} else {
HttpHeaders finalHeaders = headers;
//必须使用 tracedPublisherFactory 包裹,否则链路信息会丢失,这里参考我的另一篇文章:Spring Cloud Gateway 没有链路信息,我 TM 人傻了
return tracedPublisherFactory.getTracedMono(
redisTemplate.opsForValue().get(token)
//必须切换线程,否则后续线程使用的还是 Redisson 的线程,如果耗时长则会影响其他使用 Redis 的业务,并且这个耗时也算在 Redis 连接命令超时中
.publishOn(Schedulers.parallel()),
exchange
).doOnSuccess(accessTokenValue -> {
if (accessTokenValue != null) {
//accessToken续期,4小时
tracedPublisherFactory.getTracedMono(redisTemplate.getExpire(token).publishOn(Schedulers.parallel()), exchange).doOnSuccess(expire -> {
log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
if (expire != null && expire.toHours() < 4) {
redisTemplate.expire(token, Duration.ofHours(4)).subscribe();
}
}).subscribe();
}
})
//必须转换成非 null,否则 flatmap 不会执行;也不能在末尾用 switchIfEmpty,因为整体返回的是 Mono<Void> 本来里面承载的就是空的,会导致每个请求发送两遍。
.defaultIfEmpty("")
.flatMap(accessTokenValue -> {
try {
if (StringUtils.isNotBlank(accessTokenValue)) {
JSONObject accessToken = JSON.parseObject(accessTokenValue);
String userId = accessToken.getString("userId");
if (StringUtils.isNotBlank(userId)) {
//解析 Token
HttpHeaders newHeaders = parse(accessToken);
//继续请求
return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
}
}
return continueOrUnauthorized(path, exchange, chain, finalHeaders);
} catch (Exception e) {
log.error("read accessToken error: {}", e.getMessage(), e);
return continueOrUnauthorized(path, exchange, chain, finalHeaders);
}
});
}

这里有几个注意点:

  1. Spring-Cloud-Sleuth 对于 Spring-WebFlux 中做的链路追踪优先,如果我们在 Filter 中创建新的 Flux 或者 Mono,这里面是没有链路信息的,需要我们手动加入。这个可以参考我的另一篇文章:Spring Cloud Gateway 没有链路信息,我 TM 人傻了
  2. spring-data-redis + Lettuce 连接池的组合,对于异步接口,我们最好在获取响应之后切换成别的线程池执行,否则后续线程使用的还是 Redisson 的线程,如果耗时长则会影响其他使用 Redis 的业务,并且这个耗时也算在 Redis 连接命令超时中
  3. Project Reactor 如果中间结果有 null 值,则后面的 flatmap、map 等流操作就不执行了。如果在这里终止,前端收到的响应是有问题的。所以中间结果我们要在每一步考虑 null 问题。
  4. spring-cloud-gateway 的核心 GatewayFilter 接口,核心方法返回的是 Mono<Void>。Mono 本来里面承载的就是空的,导致我们不能使用末尾的 switchIfEmpty 来简化中间步骤的 null,如果用了会导致每个请求发送两遍。

这样修改后,压测了下网关,单实例 2w qps 请求也没有出现这个问题了。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%