盘点 Cloud Nacos 基础手册

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

水文一篇 , 哈哈哈哈哈哈~

文章目的 :

  • 梳理 Nacos 创建功能的 Debug 方向
  • 梳理 Nacos 组件的模块体系

文章大纲 : 该文档主要涉及以下几个主要的部分

  • Nacos 服务发现
  • Nacos 配置加载
  • Nacos 健康检查
  • Nacos 路由策略

PS : 文档参考来源为 官方文档 , 建议阅读文档了解快速使用

二 . 源码的编译

2.1 Nacos源码编译

1
2
3
4
5
6
7
8
java复制代码// Step 1 : 下载源码
https://github.com/alibaba/nacos.git

// Step 2 : 编译 Nacos
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U

// Step 3 : 运行 Server 文件
nacos_code\distribution\target

2.2 Nacos 源码运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// Step 1 : 下载 Nacos 源码
https://github.com/alibaba/nacos.git

// Step 2 : IDEA 导入 Nacos
此处添加 SpringBoot 启动 , 启动的类为 com.alibaba.nacos.Nacos


// Step 3 : IDEA 修改启动参数
-Dnacos.standalone=true -Dnacos.home=C:\\nacos

-nacos.standalone=true : 单机启动
-Dnacos.home=C:\\nacos : 日志路径



// PS : Nacos Application
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {

public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}

三 . 模块源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
xml复制代码<modules>
<!-- 配置管理-->
<module>config</module>
<!-- Nacos 内核 -->
<module>core</module>
<!-- 服务发现 -->
<module>naming</module>
<!-- 地址服务器-->
<module>address</module>
<!-- 单元测试 -->
<module>test</module>
<!-- 接口抽象 -->
<module>api</module>
<!-- 客户端 -->
<module>client</module>
<!-- 案例 -->
<module>example</module>
<!-- 公共工具 -->
<module>common</module>
<!-- Server 构建发布 -->
<module>distribution</module>
<!-- 控制台,图形界面模块 -->
<module>console</module>
<!-- 元数据管理-->
<module>cmdb</module>
<!-- TODO : 猜测是集成 istio 完成流量控制-->
<module>istio</module>
<!-- 一致性管理 -->
<module>consistency</module>
<!-- 权限控制 -->
<module>auth</module>
<!-- 系统信息管理 Env 读取 , conf 读取 -->
<module>sys</module>
</modules>

3.1 Nacos 服务的发现和管理 (c0-c20)

Nacos 对服务的管理主要集中在 Naming 模块中 , 这里结合 Client 端看一下服务的发现和管理是什么逻辑 , 以及该如何去操作他们

3.1.1 控制台获取服务列表

外部接口 :

  • C- CatalogController # listDetail : 获取服务详情列表
  • C- CatalogController # instanceList: 列出特殊服务的实例
  • C- CatalogController # serviceDetail : 服务详情

核心主要是通过 ServiceManager 进行处理 , 此处看一下内部相关的逻辑 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码通过三个接口不难发现 ,其最终的调用核心都是 ServiceManager 类

// 内部类
C01- ServiceManager
// 内部类
PC- UpdatedServiceProcessor
PC- ServiceUpdater
PSC- ServiceChecksum
PC- EmptyServiceAutoClean
PC- ServiceReporter
PSC- ServiceKey :
// 核心参数
F- Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
F- LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
F- ConsistencyService consistencyService;
// 常用方法
M- init : 初始化方法
M- chooseServiceMap : 通过空间名获取 Server 集合
M- addUpdatedServiceToQueue : 更新 Server
M- onChange : server 改变
M- onDelete : server 删除


// 另外 , ServiceManager 还存在一个依赖 : ConsistencyService
C02- ConsistencyService : 一致性服务接口 -> PS:C02_01
M- put :向集群提交一个数据
M- remove :从集群删除一个数据
M- get :从集群获取数据
M- listen :监听集群中某个key的变化
M- unlisten :删除对某个key的监听
M- isAvailable :返回当前一致性状态是否可用


//总结 : 此处的逻辑很简单 , 就是对集合的 CURD 操作 , 核心特点有以下几个 :
1- Delete 时 , 会调用依赖对象 ConsistencyService (DelegateConsistencyServiceImpl) , 用于处理一致性需求

PS:C02_01 ConsistencyService 体系结构

nacos-ConsistencyService.png

当存在多个服务的时候 , 是如何存储的?

Nacos-server-Map.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 如上述图所示 , 相关的对象存放在 clusterMap 中

// 注意 , Service 有2个
com.alibaba.nacos.api.naming.pojo.Service
com.alibaba.nacos.naming.core.Service


// Service 的属性
C- Service
I- com.alibaba.nacos.api.naming.pojo.Service
F- Selector selector
F- Map<String, Cluster> clusterMap = new HashMap<>();
F- Boolean enabled
F- Boolean resetWeight
F- String token
F- List<String> owners

3.1.2 Nacos 服务和 Config 的控制类

1
2
3
4
5
6
7
8
9
java复制代码Nacos 中主要通过 NamingService 和 ConfigService 对服务和配置进行控制 , 其底层原理仍然为 :   , 这里来简单看一下 

ConfigService -> NacosConfigService
NamingService -> NacosNamingService


这2个类归属于 com.alibaba.nacos.client.naming 包

// PS : 注意 ,要使用 nacos-config-spring-boot-starter 和 nacos-discovery-spring-boot-starter 包

3.1.3 Nacos 的退出销毁

当我们注册的服务在关闭的时候 , Nacos 会在生命周期结束的时候从 Server 端注销该应用

Step 1 : Closeable 停止相关类 , 停止项目 , 当我们点击停止后 , 可以看到如下一串log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.beat.BeatReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown begin
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Start destroying NacosRestTemplate
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Destruction of the end
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown stop


这里可以看到 , 这里进行了关闭操作 , 其中主要的 Close 操作都是基于 com.alibaba.nacos.common.lifecycle.Closeable 进行实现

// PS : 此处的调用逻辑为 TODO

Step 2 : NacosServiceRegistry 注销

除了这里 Closeable 会关闭外 , 还会注销 Service , 此处主要是NacosServiceRegistry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public void deregister(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
return;
}

NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();

try {
namingService.deregisterInstance(serviceId, group, registration.getHost(),
registration.getPort(), nacosDiscoveryProperties.getClusterName());
} catch (Exception e) {
// 省略 log
}

}

// PS : 此处的原理为 继承了 ServiceRegistry , 实现销毁逻辑
C- AbstractAutoServiceRegistration

Nacos_Closeable 体系

Nacos_Closeable.png

3.1.4 健康检查流程

Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、用户自定义)的健康检查

健康检查相关接口

  • 发送实例心跳 (InstanceController) : /nacos/v1/ns/instance/beat
  • 更新实例的健康状态 (HealthController) : /nacos/v1/ns/health/instance

Client 端发起心跳

服务端会定时发起心跳操作调用2个接口 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码C- BeatReactor
PC- BeatTask : 内部类

// 其中会有2个步骤 :

// Step 1 : BeatReactor # addBeatInfo 中添加定时任务
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 添加心跳信息
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

// Step 2 : BeatTask 中调用 Server 接口进行心跳操作
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);


// PS : 心跳的间隔默认是 5秒 ( com.alibaba.nacos.api.common.Constants)
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

服务端检测心跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
java复制代码// 服务端同样会对实例进行检测 , 核心类为 ClientBeatCheckTask

// Step 1 : ClientBeatCheckTask 的创建
C- Service
?- 在 Service 初始化时 ,即开始了 Task 任务


public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
//............
}

// 这里也可以看到默认时间的设置
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}



// Step 2 : ClientBeatCheckTask 的运行
C- ClientBeatCheckTask
?- 检查并更新临时实例的状态,如果它们已经过期则删除它们。


public void run() {
//...... 省略

// Step 1 : 获取所有实例
List<Instance> instances = service.allIPs(true);

for (Instance instance : instances) {
// 如果时间大于心跳超时时间 , 则修改健康状态
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// 如果心跳大于删除时间 , 则删除实例
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
deleteIp(instance);
}
}
}

// PS : 第一次修改的是健康状态 , 后面才修改的实例数

ACK 健康检查主要逻辑 ,同时推送信息

无意中发现了一个 ACK 机制 , 是通过事件出发的 . 这个模式看代码主要是为了在 Server 发生变化的时候 ,通过 udpClient , 以 UDP 端口推送更新

PS : 客户端在查询服务实例的时候,如果提供 udp 端口,则 server 会创建 udpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
instance.setHealthy(valid);
// 发布事件 ServiceChangeEvent
pushService.serviceChanged(service);
break;
}
}

// ServiceChangeEvent 事件的处理
C- PushService
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();

Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
// 通过 ServerName 和 命名空间 获取PushClient 集合
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}

Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();

// 循环所有的 PushClient
for (PushClient client : clients.values()) {
if (client.zombie()) {
clients.remove(client.toString());
continue;
}

Receiver.AckEntry ackEntry;
// 获取缓存 key ,并且从缓存中获取实体数据
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();

}

// 构建 ACK 实体类
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}

// UDP ACK 校验 , 同时推送 ACK 实体
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}

}, 1000, TimeUnit.MILLISECONDS);

futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
return null;
}

if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}

try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

// Socket Send
udpSocket.send(ackEntry.origin);

ackEntry.increaseRetryTime();

GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

return ackEntry;
} catch (Exception e) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;

return null;
}
}

健康检查阈值的使用

在配置服务时 , 可以配置一个 0-1 的浮点数 , 定义健康检查的阈值 ,该阈值对应的类为 com.alibaba.nacos.api.naming.pojo.Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码C- Service
F- name : 服务名
F- protectThreshold : 健康阈值
F- appName : 应用名
F- groupName : 组名
F- metadata : 元数据

// 阈值的使用
C- InstanceController
M- doSrvIpxt :

// 核心逻辑

double threshold = service.getProtectThreshold();
// IPMap 中可用的健康实例数/服务总数的比例 如果小于阈值 , 则达到保护阈值
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}


PS : 这里联想后面 , Client 做 Balancer 时 , 获取的 Server 实际上就是全部健康的实例了

但是阈值的目的是什么呢 ?

假设实例出现了大量的异常 , 那么就会导致最后压力会到那几个健康的实例上 , 这个时候 , 可能会出现连锁反应

为了避免这些情况 ,当达到健康阈值的时候 , 就将所有的实例返回.

这就是那句**ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));**的目的.

虽然Client 端可能碰到异常实例 ,但是可以避免整个系统崩溃

/nacos/v1/ns/health/instance 参数

名称 类型 是否必选 描述
namespaceId 字符串 命名空间ID
serviceName 字符串 服务名
groupName 字符串 分组名
clusterName 字符串 集群名
ip 字符串 服务实例IP
port int 服务实例port
healthy boolean 是否健康

权重的处理

权重主要是 Instance 对象中进行配置

1
2
3
4
5
6
7
8
9
10
11
java复制代码C- Instance
M- instanceId
M- ip
M- port
M- weight : 权重
M- healthy : 健康情况
M- enabled
M- ephemeral
M- clusterName
M- serviceName
M- metadata

PS : 权重可以用于 Client 端时进行 权重分配处理

参考原文 @ blog.csdn.net/krisdad/art…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public class NacosWeightLoadBalanceRule extends AbstractLoadBalancerRule {

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {}

@Resource private NacosDiscoveryProperties nacosDiscoveryProperties;

@Override
public Server choose(Object key) {
// 1.获取服务的名称
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) this.getLoadBalancer();
String serverName = loadBalancer.getName();
// 2.此时Nacos Client会自动实现基于权重的负载均衡算法
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
try {
Instance instance = namingService.selectOneHealthyInstance(serverName);
return new NacosServer(instance);
} catch (NacosException e) {
e.printStackTrace();
}
return null;
}

@Bean
public IRule getLoadBalancerRule(){
return new NacosWeightLoadBalancerRule();
}


// PS : 个人以为 , 权重是给 Client 端自行处理的

其他要点

1
2
3
4
5
6
7
8
9
10
java复制代码// Nacos 的默认值

@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;

@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;

@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;

3.2 Nacos 配置流程 C30-C60

3.2.1 Nacos 配置的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 获取配置 , 这里主要有几个步骤 :
C30- NacosConfigService
M30_01- getConfigInner(String tenant, String dataId, String group, long timeoutMs)
- 构建 ConfigResponse , 为其设置 dataId , tenant , group
1- 调用 LocalConfigInfoProcessor.getFailover 优先使用本地配置
2- 调用 ClientWorker , 获取远程配置 -> PS:M30_01_01
3- 仍然没有 , LocalConfigInfoProcesso.getSnapshot 获取快照
End- configFilterChainManager 进行 Filter 链处理

// PS:M30_01_01 ClientWorker 的处理
ClientWorker 中进行了远程服务的请求 , 核心代码 :
agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

// 可以看到 , 这里并没有太负载的逻辑 , 仍然是 Rest 请求 : PS 看官网的消息 , 2.0 会采用长连接 , 这里应该会有变动
- Constants.CONFIG_CONTROLLER_PATH : /v1/cs/configs

LocalConfigInfoProcessor 主要逻辑

1
2
3
4
5
6
7
8
9
10
JAVA复制代码// 这里本地配置是指本地 File 文件 , 这里通过源码推断一下使用的方式 : 
C31- LocalConfigInfoProcessor
M31_01- getFailover
- 获取 localPath -> PS:M31_01_01
M32_02- saveSnapshot : 获取成功后 , 会保存快照
?- 保存路径 : 省略\nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\one1\test1


// PS:M31_01_01 localPath 参数
C:\Users\10169\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1

Pro 1 : 从这个源码里面 , 可以看到哪些知识点 ?

既然存在本地文件 , 是否意味着我可以通过修改这个路径优先使用本地配置 , 此处修改以下路径后测试成功
省略\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1

1
2
3
4
5
6
java复制代码PS : 除了这个路径 , SpringBoot 运行在配置文件中直接配置路径 -> 
spring:
cloud:
config:
# 相同配置,本地优先
override-none: true

Pro 2 : Filter 的使用

上面可以看到 , 配置的处理中 , 都有个默认的 Filter 处理

configFilterChainManager.doFilter(null, cr);

1
2
3
4
5
java复制代码    // 依照这个逻辑 , 是可以进行更多配置的
C32- ConfigFilterChainManager
?- 其中允许自定义添加 Filter
M32_01- addFilter
M32_02- doFilter

TODO : 此处如何植入 Filter 待完善 , 没找到接口添加Filter , 奇怪….

3.2.2 Nacos 配置的容灾处理

Nacos LocalConfigInfoProcessor 提供了容灾的功能 , 方式包含2种 : 本地配置和快照处理

本地配置

上面说了 , 修改指定路径即可实现

配置快照
Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。

3.2.3 Nacos 动态配置处理

动态配置主要是指配置变更时的监听 :

Nacos 通过长轮询检测配置是否变化 , 对应的核心类为 LongPollingRunnable # checkUpdateDataIds , 对这里 Debug 看下 >>>>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
java复制代码

class LongPollingRunnable implements Runnable {

private final int taskId;

public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}

@Override
public void run() {

// .... 核心语句
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
}

}

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// It updates when cacheData occours in cacheMap by first time.
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}


List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {

Map<String, String> params = new HashMap<String, String>(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
// 长轮询方式
headers.put("Long-Pulling-Timeout", "" + timeout);

// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}

if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}

try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.

long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
// /v1/cs/configs/listener
HttpRestResult<String> result = agent
.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
readTimeoutMs);

if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData());
} else {
setHealthServer(false);
}
} catch (Exception e) {
setHealthServer(false);
throw e;
}
return Collections.emptyList();
}


// 对应的 Controller 为 ConfigController
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

// .............

Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}

// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

// 对应轮询的接口
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {

// Long polling.
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}

// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);

String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);

// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}

Loggers.AUTH.info("new content:" + newResult);

// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}


// LongPollingService

这里想看详情推荐看这一篇 https://www.jianshu.com/p/acb9b1093a54

3.2.4 Nacos 元数据

这里来看一下 , Nacos 的元数据是什么 ?

Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 方式一 : 配置服务的时候配置
spring:
application:
name: nacos-config-server
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
metadata:
version: v1

// 方式二 : 页面直接配置

// 方式三 : API 调用 , 通过 Delete , Update 等请求方式决定类型
批量更新实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch
批量删除实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch

image-20210527135529407.png

3.3 Nacos 负载均衡

Nacos 的负载均衡归属于 动态 DNS 服务

Nacos_service_list.jpg

动态 DNS 服务支持权重路由,让您更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以 DNS 协议为基础的服务发现,以帮助您消除耦合到厂商私有服务发现 API 上的风险。

基于 Feign 相关的知识 , 我们知道 , Balance 的处理是在 BaseLoadBalancer 中结合 Rule 处理的 , 这里我们来分析一下 , 2 者是通过什么结构进行连接处理的

Step 1 : Feign 对 Nacos 的调用

我们以 BaseLoadBalancer 为起点 , 进行 Debug 处理 , 来到点位 PredicateBasedRule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码C- PredicateBasedRule
M- choose(Object key)
- Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
?- 此处可以看到 , 其中有一个 lb.getAllServers 的操作 , 此处的 lb 为 DynamicServerListLoadBalancer

// PS : getAllServers , 此处可以看到 ,其中的 Servers 已经全部放在 List 中了
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}


// 跟踪一下放入的逻辑 , 放入逻辑的起点是ILoadBalancer Bean 的加载 , 其调用链为 :
C- RibbonClientConfiguration # ribbonLoadBalancer : 构建一个 ILoadBalancer
C- ZoneAwareLoadBalancer : 进入 ZoneAwareLoadBalancer 构造函数
C- DynamicServerListLoadBalancer : 进入 构造函数
C- DynamicServerListLoadBalancer # restOfInit : init 操作
C- DynamicServerListLoadBalancer # updateListOfServers : 更新 Server 列表主流程 , 此处第一次获取相关的 Server List , 后续Debug 第一节点
C- DynamicServerListLoadBalancer # updateAllServerList : 设置 ServerList


public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}


可以看到 , 其中有2个获取 Server 的逻辑方法 , 在这里看一下家族体系 , 就清楚了
serverListImpl.getUpdatedListOfServers();
filter.getFilteredListOfServers(servers);


// 下述图片中就很清楚了 , 存在一个实现类 NacosServerList 实现类 , 从Nacos 中获取服务列表
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
} catch (Exception e) {
throw new IllegalStateException(....);
}
}


// 负载均衡策略
负载均衡策略是基于 Balance

Nacos_ServerList.png

3.4 集群的处理

Nacos 集群使用

Nacos 的集群使用比较简单 , 只需要在 /conf/cluster.conf 中配置对应的服务信息即可>>>>

1
2
3
4
5
6
java复制代码
#it is ip
#example
127.0.0.1:8848
127.0.0.1:8849
127.0.0.1:8850

Nacos 集群源码跟踪

来看一下源码层面 , 这个逻辑是怎么处理的 ?

核心处理类在 com.alibaba.nacos.core.cluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码
// Step 1 : 获取配置的方式
C- EnvUtil
public static String getClusterConfFilePath() {
return Paths.get(getNacosHome(), "conf", "cluster.conf").toString();
}

// 读取 Cluster 配置
public static List<String> readClusterConf() throws IOException {
try (Reader reader = new InputStreamReader(new FileInputStream(new File(getClusterConfFilePath())),
StandardCharsets.UTF_8)) {
return analyzeClusterConf(reader);
} catch (FileNotFoundException ignore) {
List<String> tmp = new ArrayList<>();
String clusters = EnvUtil.getMemberList();
if (StringUtils.isNotBlank(clusters)) {
String[] details = clusters.split(",");
for (String item : details) {
tmp.add(item.trim());
}
}
return tmp;
}
}


// Step 2 : Cluster 的使用
AbstractMemberLookup

// 主要使用集中在 ServerManager 中
C- ServerManager
F- ServerMemberManager memberManager;

C- ServerMemberManager : Nacos中的集群节点管理
M- init : 集群节点管理器初始化
M- getSelf : 获取本地节点信息
M- getmemberaddressinfo : 获取正常成员节点的地址信息
M- allMembers : 获取集群成员节点的列表
M- update : 更新目标节点信息
M- isUnHealth : 目标节点是否健康
M- initAndStartLookup : 初始化寻址模式

// TODO : 其他方法就省略了 , 后期准备进行相关的性能分析 , 集群的源码梳理预计放在那一部分分析

总结

这篇文章又是一篇更偏向于应用的文章 , 源码深入的较少 , 更重要的原因是因为 Nacos 的源码分层更清楚 , 结构清晰 , 不需要太负载的深入.

另外 , Nacos 2.0 也在发布中 , 看文档采用了 Socket 长连接的方式 , 后续如果有机会 , 对比一下2者的区别看看.

附录

# 附录一 : 手动调用 Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码package com.alibaba.nacos.discovery.service;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.healthcheck.AbstractHealthChecker;
import netscape.javascript.JSObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* @Classname NacosClientService
* @Description TODO
* @Date 2021/5/26
* @Created by zengzg
*/
@Component
public class NacosClientNodesService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private NamingService namingService;

@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;

@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
namingService = NacosFactory.createNamingService(properties);
}

/**
* 获取 Nacos Config
* 参数格式
* {
* "instanceId": "192.168.0.97#9083#DEFAULT#DEFAULT_GROUP@@nacos-user-server",
* "ip": "192.168.0.97",
* "port": 9083,
* "weight": 1, 可以通过权重决定使用的 Server
* "healthy": true,
* "enabled": true,
* "ephemeral": true,
* "clusterName": "DEFAULT",
* "serviceName": "DEFAULT_GROUP@@nacos-user-server",
* "metadata": {
* "preserved.register.source": "SPRING_CLOUD"
* },
* "ipDeleteTimeout": 30000,
* "instanceHeartBeatInterval": 5000,
* "instanceHeartBeatTimeOut": 15000
* }
*
* @param serviceName
* @return
*/
public List<Instance> get(String serviceName) {
List<Instance> content = new LinkedList<Instance>();
try {
content = namingService.getAllInstances(serviceName);
logger.info("------> 获取 Config serviceName [{}] <-------", serviceName);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}

return content;


}

/**
* 创建 Nacos Config
*
* @param serviceName
* @param ip
* @param port
*/
public void createOrUpdate(String serviceName, String ip, Integer port) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", serviceName, ip, port);
namingService.registerInstance(serviceName, ip, port, "TEST1");
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


/**
* 移除 Nacos Config
*
* @param serviceName
* @param ip
*/
public void delete(String serviceName, String ip, Integer port) {
try {
namingService.deregisterInstance(serviceName, ip, port, "DEFAULT");
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", serviceName, ip);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
}

# 附录二 : 手动调用 Config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
java复制代码public class NacosClientConfigService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private ConfigService configService;

@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;

@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
configService = NacosFactory.createConfigService(properties);
}


/**
* 获取 Nacos Config
*
* @param dataId
* @param groupId
* @return
*/
public String get(String dataId, String groupId) {
String content = "";
try {
content = configService.getConfig(dataId, groupId, 5000);
logger.info("------> 获取 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);

configService.addListener(dataId, groupId, new ConfigListener());

} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}

return content;


}

/**
* 创建 Nacos Config
*
* @param dataId
* @param groupId
* @param content
*/
public void createOrUpdate(String dataId, String groupId, String content) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);
configService.publishConfig(dataId, groupId, content);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


/**
* 移除 Nacos Config
*
* @param dataId
* @param groupId
*/
public void delete(String dataId, String groupId) {
try {
configService.removeConfig(dataId, groupId);
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", dataId, groupId);

configService.removeListener(dataId, groupId, null);

} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


}

# 附录三 : 使用NacosInjected

可以通过 的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码<!-- 使用 Nacos Inject -->
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-discovery-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>


@NacosInjected
private ConfigService configService;

@NacosInjected
private NamingService namingService;

# 附录四 : Naocs 官方架构图 (搬运)

这里是纯搬运 , 可以看官方文档 @ nacos.io/zh-cn/docs/…

功能图 :
image.png

  • 服务管理:实现服务CRUD,域名CRUD,服务健康状态检查,服务权重管理等功能
  • 配置管理:实现配置管CRUD,版本管理,灰度管理,监听管理,推送轨迹,聚合数据等功能
  • 元数据管理:提供元数据CURD 和打标能力
  • 插件机制:实现三个模块可分可合能力,实现扩展点SPI机制
  • 事件机制:实现异步化事件通知,sdk数据变化异步通知等逻辑
  • 日志模块:管理日志分类,日志级别,日志可移植性(尤其避免冲突),日志格式,异常码+帮助文档
  • 回调机制:sdk通知数据,通过统一的模式回调用户处理。接口和数据结构需要具备可扩展性
  • 寻址模式:解决ip,域名,nameserver、广播等多种寻址模式,需要可扩展
  • 推送通道:解决server与存储、server间、server与sdk间推送性能问题
  • 容量管理:管理每个租户,分组下的容量,防止存储被写爆,影响服务可用性
  • 流量管理:按照租户,分组等多个维度对请求频率,长链接个数,报文大小,请求流控进行控制
  • 缓存机制:容灾目录,本地缓存,server缓存机制。容灾目录使用需要工具
  • 启动模式:按照单机模式,配置模式,服务模式,dns模式,或者all模式,启动不同的程序+UI
  • 一致性协议:解决不同数据,不同一致性要求情况下,不同一致性机制
  • 存储模块:解决数据持久化、非持久化存储,解决数据分片问题
  • Nameserver:解决namespace到clusterid的路由问题,解决用户环境与nacos物理环境映射问题
  • CMDB:解决元数据存储,与三方cmdb系统对接问题,解决应用,人,资源关系
  • Metrics:暴露标准metrics数据,方便与三方监控系统打通
  • Trace:暴露标准trace,方便与SLA系统打通,日志白平化,推送轨迹等能力,并且可以和计量计费系统打通
  • 接入管理:相当于阿里云开通服务,分配身份、容量、权限过程
  • 用户管理:解决用户管理,登录,sso等问题
  • 权限管理:解决身份识别,访问控制,角色管理等问题
  • 审计系统:扩展接口方便与不同公司审计系统打通
  • 通知系统:核心数据变更,或者操作,方便通过SMS系统打通,通知到对应人数据变更
  • OpenAPI:暴露标准Rest风格HTTP接口,简单易用,方便多语言集成
  • Console:易用控制台,做服务管理、配置管理等操作
  • SDK:多语言sdk
  • Agent:dns-f类似模式,或者与mesh等方案集成
  • CLI:命令行对产品进行轻量化管理,像git一样好用

领域模型 :

image.png

SDK 类图 :

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%