实战 Cloud Nacos Client 端主流程

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

Nacos 请求时部分操作异常会直接宕掉 ,这个时候了解整个 Nacos Client 端的流程就至关重要.

Nacos 主流程主要分为以下几个部分 :

  • Nacos Client 端的启动和初始化
  • Nacos Client 端服务注册
  • Nacos Client 端服务发现

二 . Nacos Client 端的启动和初始化

Nacos 的自动装配类主要分为2个部分 :

  • NacosConfigAutoConfiguration
  • NacosDiscoveryAutoConfiguration

上一篇 Nacos 配置加载流程和优先级 已经看了配置的处理流程 , 这一篇主要关注 NacosDiscoveryAutoConfiguration

2.1 Nacos 的 Discovery 自动装配类

在 Nacos 的启动过程中 , 在配置类中初始化了如下几个对象 :

1
2
3
java复制代码- new NacosServiceManager() 
- new NacosServiceDiscovery(discoveryProperties, nacosServiceManager) : 从缓存中获取列表
- new NacosDiscoveryClient(nacosServiceDiscovery)

2.1.1 NacosWatch 的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码// C- NacosWatch
public void start() {

if (this.running.compareAndSet(false, true)) {

// Step 1 : 构建 EventListener 对象
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {

List<Instance> instances = ((NamingEvent) event).getInstances();

Optional<Instance> instanceOptional = selectCurrentInstance(instances);

instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance);
});
}
}
});


// Step 2 : 通过 NacosProperties 准备 NamingService 对象 -> 2.1.2 Step 2
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try {

namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}

this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}

2.1.2 初始化流程

Step1 : NacosWatch 发起监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// NamingService init 流程
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
// 初始化web容器
InitUtils.initWebRootContext();
// 初始化缓存目录
initCacheDir();
// 初始化 log 路径
initLogName(properties);

this.eventDispatcher = new EventDispatcher();
// 代理对象用于调用远程 Server
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
// 用于向Nacos服务端发送已注册服务的心跳
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
// HostReactor用于获取、保存、更新各Service实例信息
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

Step 2 : NamingFactory 构建 NamingService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码// 实际上是通过反射生产最终的实例对象
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService) constructor.newInstance(properties);

// 工作空间
private String namespace;
//
private String endpoint;
// 服务列表对应的 Server地址 : localhost:8848
private String serverList;
// 本地缓存地址
private String cacheDir;
// log 名称 : 通常是 naming.log
private String logName;

private HostReactor hostReactor;
private BeatReactor beatReactor;

private EventDispatcher eventDispatcher;
private NamingProxy serverProxy;

Step 3 : EventDispatcher 添加监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {

// 通过监听器来实现更新Service
eventDispatcher.addListener(hostReactor
.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")),
StringUtils.join(clusters, ","), listener);
}


public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

// 构建一个 EventListener 集合
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);

// ConcurrentMap<String, List<EventListener>> observerMap
// 其中有个 Notifier 的线程 , 通过 while 循环持续的处理实例
// /nacos/v1/ns/instance
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}

// change 时刷新 Service
serviceChanged(serviceInfo);
}

hostReactor.getServiceInfo 结果 :
image.png

三 . Nacos Client 端服务注册

服务注册涉及到如下流程 :

  • NacosRegistration:保存服务的基本数据信息
  • NacosServiceRegistry:实现服务注册
  • NacosServiceRegistryAutoConfiguration:Nacos自动配置类

Nacos 服务注册的起点是 @EnableDiscoveryClient , 其最终会调用 NacosAutoServiceRegistration :

  • AbstractAutoServiceRegistration
1
2
3
4
5
java复制代码@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
// 默认为 true , 开启后就会调用对应的 AutoServiceRegistration
boolean autoRegister() default true;
}

3.1 start

不同的注册中心 , 会有不同的实现类 ,此处是对应 Nacos 的 NacosAutoServiceRegistration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码// C- NacosAutoServiceRegistration
public void start() {
// 如果未开启 , 则直接return


// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));

// 调用 Registry 发起注册 : this.serviceRegistry.register(getRegistration());
register();

if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}

}

3.2 NacosServiceRegistry 发起注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public void register(Registration registration) {

if (StringUtils.isEmpty(registration.getServiceId())) {
return;
}

// 构建当前的 ServiceId 和 Group
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();

// 包括 IP , 端口 , 分配名 , 元数据
Instance instance = getNacosInstanceFromRegistration(registration);

try {
// 注册当前实例 , 同时会添加心跳
// serverProxy.registerService(groupedServiceName, groupName, instance);
namingService.registerInstance(serviceId, group, instance);
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}

四. 服务的发现

之前在Nacos 基础 中 , 我们大概看过一点 , 这里无非是再看一下

Nacos 的不同版本获取的方式是有很大区别的 , 这里主要针对 spring-cloud-starter-alibaba-nacos-discovery : 2.2.5 版本来看一下 .

4.1 Nacos Server 的发现

Step 1 : 发起的起点

1
2
3
4
5
6
java复制代码private List<NacosServer> getServers() {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
}

Step 2 : 发现的主流程

到这里就和之前的逻辑串起来了 , 后面就是反向调用 Nacos Server 中提供的接口即可

1
2
3
4
5
6
7
8
java复制代码// C- NacosNamingService
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
return selectInstances(serviceInfo, healthy);
}

这里获取中会从 Map<String, ServiceInfo> serviceInfoMap 中获取数据 , 下面来看一下 ServiceInfo 是如何获取的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 同样通过代理类发起调用
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}


// 发起远程调用
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {

final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));

// 这里可以看到具体的 API :
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

总结

这篇文章有点简单 , 了解的不深 , 但是应该是很有用 , 出现问题在核心的地方打个断点 ,能节省很多时间

TODO : 流程图今天就不画了 , 后面有时间补一个

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%