盘点 Cloud Feign 负载均衡策略

这是我参与更文挑战的第2天,活动详情查看: 更文挑战

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

这是Feign 系列文章中的一篇 , 主要说明 Feign 关于负载均衡的相关知识点 , 文章包含如下内容 :

  • Feign 使用 Ribbon 时的负载均衡处理

二 . Feign 的轮询策略

Feign 其本身有一套轮询的逻辑处理 Server 的调用 , 其主要流程如下 :

  • Step 1 : 调用 submit 获取 server 信息
  • Step 2 : selectServer 选择 Server
  • Step 3 : 回调执行 ServerOperation 中的请求发起

前期调用逻辑

为了方便理解 , 我们来看一下其之前的调用逻辑 , Feign 主要是通过 Invoke 的方式发起方法的代理 :

  • Step 1 : 触发 InvoKe (FeignInvocationHandler)
  • Step 2 : SynchronousMethodHandler 的调用 , 处理对于方法逻辑
  • Step 3 : LoadBalancerFeignClient 调用主逻辑
  • Step 4 : AbstractLoadBalancerAwareClient 中间过渡 , 处理 URL , 生成真正的URL
  • Step 5 : 最终调用的是 Client 的内部类 (Default)

2.1 获取 Server 入口

可以看到 , 在如上第四步中 , 开始负载均衡的相关处理 :

executeWithLoadBalancer 是 AbstractLoadBalancerAwareClient 中的负载均衡方法 , 该类位于 com.netflix.loadbalancer 包中

PS : OpenFeign 本身就携带了 Ribbon 依赖 !

首先 , 我们来看一下 executeWithLoadBalancer 主逻辑 , 这里 Submit 是一个Function 语法糖调用 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码C- AbstractLoadBalancerAwareClient
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

try {
// Step 1 : submit 中获取 Server
// Step 2 : 通过 ServerOperation 语法糖 , 执行具体的操作
return command.submit(// 暂时省略 Function:003 )
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}

接下来看一下 Submit 方法中做了什么事情 :

  • Step 1 : 容器准备
  • Step 2 : 负载均衡监听器开启
  • Step 3 : 重试次数设置
    • maxRetrysSame : 在一台服务器上执行的最大重试次数
    • maxRetrysNext : 要重试的最大不同服务器的数量
  • Step 4 : 使用负载均衡器获得 Server ]
  • Step 5 : 重试策略的处理
  • Step 6 : 流程异常处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码// C- LoadBalancerCommand : 做过魔改 , 仅展示核心的代码
public Observable<T> submit(final ServerOperation<T> operation) {

// Step 1 : 容器准备
final ExecutionInfoContext context = new ExecutionInfoContext();

// Step 2 : ExecutionContextListenerInvoker 负载均衡器在执行的不同阶段调用的侦听器
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// Step 3 : 重试次数设置
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// Step 4 : 使用负载均衡器获得 Server
// 选择 Server , 此处已经通过 Rule 选择完成
Observable<Server> servers = server == null ? selectServer() : Observable.just(server);
Observable<T> o = servers.concatMap(// PS : Function:001 详见);

// Step 5 : 重试策略的处理
if (maxRetrysNext > 0 && server == null){
o = o.retry(retryPolicy(maxRetrysNext, false));
}

// Step 6 : 流程异常处理
return o.onErrorResumeNext( // PS : Function:002 详见);
}

重点 : 在Step 4 中 , selectServer 就已经完成负载均衡的选择处理了 , 其后在 concatMap (Function 1)中进行了进一步的操作 :

[Pro] : Function:001 中干了什么 ?

总结: 简单来说 , 就是数据审计 , 监听器触发和方法扩展 , 用于后续的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
java复制代码C- LoadBalancerCommand

// 观察者对象编号 : Observable:001
new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
// 容器设置 Server
context.setServer(server);

// 捕获LoadBalancer中的每个服务器(节点)的各种统计数据
final ServerStats stats = loadBalancerContext.getServerStats(server);

// 尝试调用和重试操作 (Observable:002)
Observable<T> o = Observable.just(server).concatMap( new Func1<Server, Observable<T>>(){
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);

if (listenerInvoker != null) {
try {
// 当选择服务器并且请求将在服务器上执行时调用
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// 计时器开始
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

// 提供接收基于推送的通知的机制 -> UNDO:001 / Observable:003
return operation.call(server).doOnEach(new Observer<T>() {

private T entity;

// 省略其中代码 , 其中实现了 四个方法 :

// - onCompleted : 完成后调用
recordStats(tracer, stats, entity, null)

// - onError : 异常时调用 , 区别是传入了 exception
recordStats(tracer, stats, null, e)
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo())

// - onNext : 为观察者提供一个要观察的新项目 , 设置 entity + 触发监听器
this.entity = entity
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo())


// - recordStats : 停止计时 + 更新统计数据
tracer.stop()
oadBalancerContext.noteRequestCompletion(
stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler)
});
}
}

[Pro] : Function:002 中干了什么 ?

Function 2 是异常处理流程 , 其中主要干了三件事 :

  • 通过不同的重试配置 , 构建不同的 ClientException
  • listenerInvoker 存在 , 触发事件
  • 返回一个观察者对象 , 当观察者订阅了这个Observable时,它会调用ObserverObserver#onError方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码C- LoadBalancerCommand
new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
// 建不同的 ClientException
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(......);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(......);
}
}

if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
}

2.2 selectServer 选择 Server

前面说了 , 负载均衡的核心就是 SelectServer :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码C- LoadBalancerCommand
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
// 通过 loadBalancerContext 进行负载均衡处理
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);

// Observable 模式的其他调用
next.onNext(server);
next.onCompleted();
}
});
}

2.3 getServerFromLoadBalancer 主逻辑

这一段逻辑比较长 , 但是核心代码不多 , 我们仅保留其中最核心的几个部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码C- LoadBalancerContext
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {

// 属性准备
String host = null;
int port = -1;

// 省略点一 : 会尝试从 URI original (http:///template/get?desc=order-server 中获取 Host 和 Post
// 当然 ,正常模式下是获取不到的 , 所以这里省略 host != null 的情况 >>>

// 获取 ILoadBalancer 对象 , 此处主要为 ZoneAwareLoadBalancer , 会调用父类 BaseLoadBalancer
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
if (lb != null){
// 核心逻辑 : 选择 Server 对象 --> 详见 2.4
Server svc = lb.chooseServer(loadBalancerKey);
host = svc.getHost();
return svc;
} else {
// 省略负载均衡器不存在的逻辑
}
} else {
// 省略 host 不为 null
}

// 最终构建一个 新 Server 对象返回
return new Server(host, port);
}



// PS : 其中省略了很多判空抛出异常的逻辑 , 通常结构如下所示
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"....");
}

Feign_ILoadBalancer.png

2.4 chooseServer 中通过 Rule 选择 Server

此处就是最终算法的使用点 , 通过 Rule 调用最终的策略进行处理 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码C- BaseLoadBalancer
public Server chooseServer(Object key) {
if (counter == null) {
// 用于跟踪某个事件发生频率的监视器类型
counter = createCounter();
}
counter.increment();

if (rule == null) {
return null;
} else {
// 调用 Rule 返回具体的 Server
return rule.choose(key);

// 忽略 try-catch
}
}

Rule 体系中提供了完善的体系 , 来看一下 Rule 体系的完整结构 :

Feign_IRule.png

PS : 这里主要使用 PredicateBasedRule

2.5 PredicateBasedRule 案例分析

其中有几个主要的逻辑 :

  • lb.getAllServers() : 获取所有的Server
  • getEligibleServers(servers, loadBalancerKey) :
  • incrementAndGetModulo(eligible.size()) :

Step 1 : choose 选择 Server 可以看到 , 这里是通过 Predicate 进行具体的选择

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码C- PredicateBasedRule 
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
// 注意此处的 lb.getAllServers() , 已经获取了所有的 Server 列表
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}

Step 2 : Server 选择主方法

在该方法中首先获取一个正确的 Server List (Step 3), 在使用算法选择具体的 Server (Step 4)

1
2
3
4
5
6
7
8
9
10
java复制代码C- PredicateBasedRule 
// 选择合适的 Service
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
// 核心语句 : 此处从 Server 中获取 Server
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

Step 3 : 获取正确的 Server List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码C- PredicateBasedRule 
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
//
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}

Step 4 : 使用算法选择具体的 Server

1
2
3
4
5
6
7
8
9
10
java复制代码C- PredicateBasedRule 
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo){
return current;
}
}
}

2.6 执行 ServerOperation (Function:003)

当Submit 执行Server 处理时(详见 -> UNDO:001) , 就会回调之前准备的观察者对象 , 此处通过 Server 生成自己需要的 URL , 完成负载均衡的处理

这里也是一种观察者的使用 , 在所有的处理完成后 , 通过订阅的方式执行后续逻辑代码 >>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码C- AbstractLoadBalancerAwareClient
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
// 获取实际的 URL , 指向具体的 Server
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception e) {
return Observable.error(e);
}
}
}

其他

问题一 : 观察者及 Function 的调用逻辑

Feign 负载均衡中一大麻烦就是观察者的使用及Function 语法糖 , 这种回调模式很容易混淆整个逻辑 :

  • Step 1 : LoadBalancerCommand 中取得 Server 对象
  • Step 2 : Function:002 中进行 Server 的二次处理
  • Step 3 : Observable:002 中执行监听器和计时
  • Step 4 : 回到 AbstractLoadBalancerAwareClient 中的 Function:003 生成 Url 及后续逻辑
  • Step 5 : Observable:003 中 onNext 触发监听器和设置 entity 实体
  • Step 6 : Observable:003 中 onCompleted 对请求完成进行订阅处理

问题二 : ExecutionListener 的使用

  • 作用 : 负载均衡器在执行的不同阶段调用的监听器
  • 使用 : 提供了如下几种方法
    • onExecutionStart : 启动
    • onStartWithServer : 当选择服务器并且请求将在服务器上执行时调用
    • onExceptionWithServer : Server 出现异常
    • onExecutionSuccess : 执行成功
    • onExecutionFailed : 执行失败
1
2
3
4
5
6
7
java复制代码public void onExecutionStart(ExecutionContext<I> context) {
for (ExecutionListener<I, O> listener : listeners) {
if (!isListenerDisabled(listener)) {
listener.onExecutionStart(context.getChildContext(listener));
}
}
}

总结

这篇文章力求把负载均衡的逻辑理清楚 , 现在看基本上满足了要求 , 其实整个过程中还有很多可以思考的地方

例如 :

  • Feign 中多观察者对象的使用 , 虽然可读性变差了 ,但是业务能力提升了不少 , 如何更好的设计这套逻辑
  • Feign Ribbon Rule 的使用 ,是否可以自行定制一套或者通过扩展的方式来外界算法

这些东西都会在后续的文章中 , 进行分析 , 拭目以待.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%