什么是Ribbon现在就带你研究!

什么是Ribbon

Ribbon是Netflix发布的负载均衡器。属于SpringCloud组件之一,用于实现客户端负载均衡功能。

服务器端负载均衡

所谓服务器端负载均衡,⽐如Nginx、F5这些,请求到达服务器之后由这些负载均衡
器根据⼀定的算法将请求路由到⽬标服务器处理。

客户端负载均衡

所谓客户端负载均衡,⽐如我们要说的Ribbon,服务消费者客户端会有⼀个服务器
地址列表,调⽤⽅在请求前通过⼀定的负载均衡算法选择⼀个服务器进⾏访问,负
载均衡算法的执⾏是在请求客户端进⾏。

Ribbon的执行源码

ribbon的负载均衡是通过@LoadBalanced启动的,首先看一下LoadBalanced注解,关注一下提示被注解标记的RestTemplate bean会被配置使用LoadBalancerClient,所以我们就得到了的,主要的执行过程是在LoadBalancerClient对象中。

1
2
3
4
5
6
7
php复制代码/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
...
public @interface LoadBalanced {
}

LoadBalancerClient

这边我把注释删除了,可以看到execute方法肯定是执行方法。

1
2
3
4
5
6
7
8
9
java复制代码public interface LoadBalancerClient extends ServiceInstanceChooser {


<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

URI reconstructURI(ServiceInstance instance, URI original);
}

现在思考executor方法什么时候执行的?

查看引用,是在一个Interceptor方法中调用了。

image.png

RibbonLoadBalancerClient.execute

接下来看具体的execute方法

1
2
3
4
5
6
7
8
9
10
11
scss复制代码public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
//获取一个loadbalancer对象
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//获取服务
Server server = getServer(loadBalancer);
//封装成一个RibbonServer对象
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
//执行
return execute(serviceId, ribbonServer, request);
}
getLoadBalancer方法

从一个SpringClientFactory对象获取

1
2
3
typescript复制代码protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
getServer方法

执行loadBalancer.chooseServer方法

1
2
3
4
5
6
kotlin复制代码protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
RibbonLoadBalancer.chooseServer方法

主要看else的逻辑,执行父类的chooseServer方法

1
2
3
4
5
6
7
8
9
kotlin复制代码public Server chooseServer(Object key) {
//主要判断是否是多个分区 这是正对于aws的
if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
...
} else {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
}
BaseLoadBalancer.chooseServer方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码public Server chooseServer(Object key) {
//计数器,用于记录请求次数
if (this.counter == null) {
this.counter = this.createCounter();
}
this.counter.increment();
//负载均衡策略
if (this.rule == null) {
return null;
} else {
try {
return this.rule.choose(key);
} catch (Exception var3) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
return null;
}
}
}
PredicateBasedRule.choose方法

主要获取server的逻辑,从loadbalancer对象中获取所有的server集合

1
2
3
4
5
6
7
8
9
10
11
scss复制代码public Server choose(Object key) {
//获取loadbalancer对象
ILoadBalancer lb = getLoadBalancer();
//获取server对象 选择一个实例在过滤之后执行负载均衡
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}

以上就是ribbon的主要执行流程,接下来是具体实现细节的解答。

初始化疑问

首先在解决疑问前,要先了解ribbon的配置类,一般提前设置属性都是在初始化配置完成的。1.LoadBalancerAutoConfiguration 2.RibbonAutoConfiguration 3.RibbonClientConfiguration

疑问1.interceptor什么时候设置的?用来拦截什么对象?

这个问题是负载均衡相关的,首先定位到LoadBalancerAutoConfiguration类,具体看如下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码public class LoadBalancerAutoConfiguration {
//负载均衡
@LoadBalanced
//注入所有RestTemplate对象
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
//遍历所有restTemplate
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
//为每个restTemplate执行RestTemplateCustomizer.customize方法
//此处的customize方法就是下面的lambda表达式,就是设置拦截器
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}

...

@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
LoadBalancerRequestFactory requestFactory,
LoadBalancedRetryFactory loadBalancedRetryFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
requestFactory, loadBalancedRetryFactory);
}

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
//此处用了函数式编程,本方法是为restTemplate设置拦截器
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}

所以我们知道了是在LoadBalancerAutoConfiguration类设置了拦截器,拦截restTemplate对象。

疑问2.SpringClientFactory什么时候设置的?

如下所示,SpringClientFactory是在RibbonAutoConfiguration自动配置类初始化的

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码...
public class RibbonAutoConfiguration {

....

@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
...
}

疑问3.LoadBalancer什么时候初始化的?Rule什么时候初始化的?

这个问题实例化就得干活儿了,涉及到需要具体实现,所以我们猜测是跟生成RibbonClient的时候有关。所以找到了RibbonClientConfiguration类,我只保留了重要代码,其他的…代替,

具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
kotlin复制代码public class RibbonClientConfiguration {

...

//设置负载均衡策略rule
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
//如果配置文件配置了,就生成配置的
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
//默认分区隔离负载均衡策略
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}

//同rule
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}

//生成serverList 此时是个空对象
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}

//生成loadBalancer对象
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
//如果配置文件配置,则走配置文件
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
//生成默认分区LoadBalancer
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
}

可以看出RibbonClientConfiguration生成了ribbon所需的所有组件,组成了ribbon服务。

疑问4.serverList什么时候加载的?

可以看到RibbonClientConfiguration生成了空的serverList对象,然后再创建loadbalancer时将这个空对象传入了。所以我们关注一下LoadBalancer初始化过程

首先调用了父类构造

1
2
3
4
5
scss复制代码public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

进入父类构造,属性赋值的部分就不关注了,主要关注restOfInit方法

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
DynamicServerListLoadBalancer.restOfInit方法

进入restOfInit方法,主要看enableAndInitLearnNewServersFeature以及updateListOfServers方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
//定时任务刷新服务器缓存列表
enableAndInitLearnNewServersFeature();
//因为上述方法是延迟至性的,所以立即获取一次服务器列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

DynamicServerListLoadBalancer.enableAndInitLearnNewServersFeature方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
//start方法
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
//新建runnable对象
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//更新
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//定时任务
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}

这边在看一下updateAction对象以及doUpdate方法,

1
2
3
4
5
6
java复制代码protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
DynamicServerListLoadBalancer.updateListOfServers方法

这边就是更新服务列表的方法

1
2
3
4
5
6
7
8
9
scss复制代码 public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//获取server集合
servers = serverListImpl.getUpdatedListOfServers();
...
}
updateAllServerList(servers);
}

这边serverListImpl就是具体的注册中心客户端实现,用来向注册中心发送请求 获取服务列表

image.png

以上就是ribbon的启动以及执行流程

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%