Cloud Eureka Client 端服务发现

这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

文章目的 :

  • 深入 Client 端服务发现的流程

概念补充
region:可以简单理解为地理上的分区,比如上海地区,或者广州地区,再或者北京等等,没有具体大小的限制。根据项目具体的情况,可以自行合理划分region。

zone:可以简单理解为region内的具体机房,比如说region划分为北京,然后北京有两个机房,就可以在此region之下划分出zone1,zone2两个zone。

二 . 调用的起点

2.1 初始化

Eureka Client 在初始化 EurekaClient 类的同时发起了服务发现的逻辑 , 主要调用流程为 :

  • Step 1 : C- EurekaClientAutoConfiguration

    • PVC- RefreshableEurekaClientConfiguration 中进行 EurekaClient 的 Bean 加载
  • Step 2 : 调用 DiscoveryClient 构造器

  • Step 3 : Client 发起 FullRegistry 操作

Step 1 : Configuration 的 Bean 加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
// 构建 CloudEurekaClient 对象
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,config, this.optionalArgs, this.context);
// 注册健康检查对象
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}

// PS : 调用 CloudEurekaClient 构造器中会调用其父类 DiscoveryClient 的构造函数

Step 2 : DiscoveryClient full Registry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();

if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// 从eureka服务器获取完整的注册表信息并将其存储在本地
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {

}

// 在更新实例远程状态之前通知缓存刷新
onCacheRefreshed();

// 基于缓存中保存的刷新数据更新远程状态
updateInstanceRemoteStatus();

// 成功获取注册表,因此返回true
return true;
}

Step 3 : 查询 Registry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();

Applications apps = null;

// 通过 Client 获取 Applications
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());

if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}


if (apps == null) {
//............
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 重点 : 设置到 AtomicReference<Applications> 对象中
localRegionApps.set(this.filterAndShuffle(apps));
} else {
}
}

三 . 服务的发现

从上面看到 , 通过 Client 搜索了 Registy 对象 , 后面看一下具体的查询流程

3.1 service 的获取

Step 1 : EurekaDiscoveryClient 发起请求

1
2
3
4
java复制代码// EurekaDiscoveryClient 中通过如下代码获取 :
Applications applications = this.eurekaClient.getApplications();

// PS : 此处的 eurekaClient 为 CloudEurekaClient

Step 2 : 获取 Applications

1
2
3
4
5
6
java复制代码// 这里是通过一个原之类缓存了所有的 Applications
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

public Applications getApplications() {
return localRegionApps.get();
}

PS : Applications 数据如下
image.png

Step 3 : Applications 的缓存

上文知道了获取的时机 , 这里再来看一下数据是在什么时候缓存进去的 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
java复制代码// 节点一 : 获取 Client Registry
private void getAndStoreFullRegistry() throws Throwable {
// 上文看了 发起的流程 , 调用了一个 Client 发起
}


// 节点二 : 远程获取 Application
public EurekaHttpResponse<Applications> getApplications(final String... regions) {
return execute(new RequestExecutor<Applications>() {
@Override
public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
return delegate.getApplications(regions);
}

@Override
public RequestType getRequestType() {
return RequestType.GetApplications;
}
});
}


// 节点三 : 循环调用不同的 HttpClient , 如下图总共提供了四种 HttpClient , 此处主要为 SessionedEurekaHttpClient
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
long now = System.currentTimeMillis();
long delay = now - lastReconnectTimeStamp;
if (delay >= currentSessionDurationMs) {
logger.debug("Ending a session and starting anew");
lastReconnectTimeStamp = now;
currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
}

EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
if (eurekaHttpClient == null) {
eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
}

// 回调节点二传入的匿名对象
return requestExecutor.execute(eurekaHttpClient);
}

// 节点四 : Internet 调用 , 可以看到调用的 app 接口 C- AbstractJerseyEurekaHttpClient
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}

// 没想到啊没想到 , 这里居然看到了 jerseyClient , 这玩意这么底层的吗 C-
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {

if (response != null) {
response.close();
}
}
}

补充 : Eureka HttpClient 体系

Eureka-Sytsem-EurekaHttpClientDecorator.png

补充 : application 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码
[
[{
"host": "localhost",
"port": 8088,
"secure": false,
"uri": "http://localhost:8088",
"metadata": {
"management.port": "8088"
},
"instanceId": "DESKTOP-TSNPTAK:GANG-EUREKA:8088",
"serviceId": "GANG-EUREKA",
"instanceInfo": {
"instanceId": "DESKTOP-TSNPTAK:GANG-EUREKA:8088",
"app": "GANG-EUREKA",
"appGroupName": null,
"ipAddr": "192.168.0.6",
"sid": "na",
"homePageUrl": "http://localhost:8088/",
"statusPageUrl": "http://localhost:8088/actuator/info",
"healthCheckUrl": "http://localhost:8088/actuator/health",
"secureHealthCheckUrl": null,
"vipAddress": "GANG-EUREKA",
"secureVipAddress": "GANG-EUREKA",
"countryId": 1,
"dataCenterInfo": {
"@class": "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
"name": "MyOwn"
},
"hostName": "localhost",
"status": "UP",
"overriddenStatus": "UNKNOWN",
"leaseInfo": {
"renewalIntervalInSecs": 30,
"durationInSecs": 90,
"registrationTimestamp": 1628746104090,
"lastRenewalTimestamp": 1628748325836,
"evictionTimestamp": 0,
"serviceUpTimestamp": 1628746074745
},
"isCoordinatingDiscoveryServer": true,
"metadata": {
"management.port": "8088"
},
"lastUpdatedTimestamp": 1628746104090,
"lastDirtyTimestamp": 1628746074064,
"actionType": "ADDED",
"asgName": null
},
"scheme": null
}]
]

四 . 请求的使用

请求的使用可以参考 Feign 负载均衡 , 其本身也是从容器中获取 Server 列表

1
2
3
4
5
6
7
8
9
10
11
java复制代码public Object getServicesList() {
List<List<ServiceInstance>> servicesList = new ArrayList<>();
//获取服务名称
List<String> serviceNames = discoveryClient.getServices();
for (String serviceName : serviceNames) {
//获取服务中的实例列表
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
servicesList.add(serviceInstances);
}
return servicesList;
}

Step : Applicaiton 的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public List<String> getServices() {
LinkedHashSet<String> services = new LinkedHashSet<>();
if (this.discoveryClients != null) {
// 可以看到 , 这里从多个 discoveryClients 中获取 Services
for (DiscoveryClient discoveryClient : this.discoveryClients) {
List<String> serviceForClient = discoveryClient.getServices();
if (serviceForClient != null) {
services.addAll(serviceForClient);
}
}
}
return new ArrayList<>(services);
}

C- DiscoveryClient : 该对象在 2.1 Step 2 中设置了
AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

五 . 总结

核心对象就是一个 AtomicReference ,通过 AbstractJerseyEurekaHttpClient 调用 /app 查询所有的 Server

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%