Cloud Eureka Client 端服务注册

这是我参与8月更文挑战的第6天,活动详情查看:8月更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

文章目的

  • 梳理 Eureka 服务注册的流程
  • 深入 服务注册的源码

对应配置类

Eureka Client 的配置是通过 EurekaClientAutoConfiguration 和 EurekaDiscoveryClientConfiguration 进行处理的

1
2
3
4
5
6
7
java复制代码C- EurekaDiscoveryClientConfiguration
public EurekaDiscoveryClient discoveryClient(EurekaClient client,
EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}

C- EurekaClientAutoConfiguration

二 . 服务的注册

来看一下 Eureka 是如何注册服务的 , Eureka 通过 @EnableDiscoveryClient 开启服务的注册

Eureka 的服务注册的主要线路为 Lifecycle 发起 EurekaServiceRegistry 的生命周期控制

服务注册核心的类为 : EurekaServiceRegistry , 这个类提供了以下几个方法

1
2
3
4
5
jAVA复制代码C- EurekaServiceRegistry
M- register : 注册应用
M- deregister
M- setStatus : 更新状态
M- getStatus

2.1 Eureka 注册的起点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码// C- EurekaServiceRegistry
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}

if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}

// 只有在nonSecurePort大于0且由于下面的containerPortInitializer未运行时才初始化
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 2.1.1 -> 发起注册逻辑
this.serviceRegistry.register(this.registration);
// 发出 InstanceRegisteredEvent 事件
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}

补充 : start 的入口

1
2
3
4
5
6
java复制代码// Start 方法基于 Lifecycle 接口 , 这个接口用于控制Bean 的生命周期 ,当 Application 发生 开启和停止时 , 都会调用对应的钩子事件
public interface Lifecycle {
void start();
void stop();
boolean isRunning();
}

2.1.1 注册服务

Step 1 : register 注册的起点

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// C- EurekaServiceRegistry
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);

// 通过修改状态发起注册逻辑
// PS : deregister 就是设置为 InstanceInfo.InstanceStatus.DOWN
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

// 健康检查 ,此处暂时不深入
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}

Step 2 : 修改状态 , 调用监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码C- EurekaServiceRegistry
public synchronized void setInstanceStatus(InstanceStatus status) {
// InstanceStatus.UP
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}

InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
// 可以看到 , 通过调用监听器发送状态改变的事件 -> Step 3
listener.notify(new StatusChangeEvent(prev, next));
}
}
}

// 补充 : StatusChangeListener 对象
public static interface StatusChangeListener {
String getId();
void notify(StatusChangeEvent statusChangeEvent);
}

// 补充 : StatusChangeEvent 对象
public class StatusChangeEvent extends DiscoveryEvent {
// 这个对象中记录了2个状态 , 之前和现在的, 但是这个好像只是为了打印日志 , 也没啥区别 TODO
private final InstanceInfo.InstanceStatus current;
private final InstanceInfo.InstanceStatus previous;
}

Step 3 : 发起发更新逻辑 , 对远程服务器进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码C- InstanceInfoReplicator
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
// 省略不必要的关注点 , 可以看到这里通过 ScheduledExecutorService 发起了定时任务
scheduler.submit(new Runnable() {
@Override
public void run() {
// private final AtomicReference<Future> scheduledPeriodicRef;
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
latestPeriodic.cancel(false);
}
// 更新本地实例信息并复制到远程服务器 -> Step 4
InstanceInfoReplicator.this.run();
}
});

}
}

补充 : InstanceInfoReplicator 的作用

  • 配置了单个更新线程,以保证对远程服务器的连续更新
  • 更新任务可以通过onDemandUpdate() 按需调度
  • 任务处理速率受突发大小限制
  • 一个新的更新任务总是自动调度在一个较早的更新任务之后。
    • 但如果启动了按需更新任务,则定时的自动更新任务将被丢弃(新的按需更新任务将在新的按需更新任务之后调度新的任务)

Step 4 : 调用 InstanceInfoReplicator # run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public void run() {
try {
// 刷新当前本地InstanceInfo , InstanceInfo上的isdirty标志被设置为true
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 最核心的一句 : 发起服务注册
discoveryClient.register();
// 如果unsetDirtyTimestamp 匹配 lastDirtyTimestamp,则取消dirty标志
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

2.2 注册服务

此处就开始通过 DiscoveryClient 进行注册逻辑 :

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码C- DiscoveryClient
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
// 发起注册逻辑
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
// 这个返回在业务上没有太大的作用
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

2.3 register 循环调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
JAVA复制代码// 这里和下方的模式非常相似 ,都是传个匿名函数进去 , 再由里面发起回调
C- EurekaHttpClientDecorator
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
return execute(new RequestExecutor<Void>() {
@Override
public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
// 通过该回调函数进行链式调用
return delegate.register(info);
}

@Override
public RequestType getRequestType() {
return RequestType.Register;
}
});
}

细节点 : 轮询调用

Eureka-Client-Registry.png

// 此处进行了一个链式调用 ,分别调用了

  • C- SessionedEurekaHttpClient : 强制在一个定期的间隔(一个会话)重新连接,防止客户端永远坚持一个特定的Eureka服务器实例
  • C- RetryableEurekaHttpClient : 在集群中的后续服务器上重试失败的请求
  • C- RedirectingEurekaHttpClient : 该Client 会进行 Server 重定向 , 并且针对最终解析的端点执行请求
  • C- MetricsCollectingEurekaHttpClient : 收集和统计JerseyApplicationClient发送请求和响应的行为信息
1
2
3
4
5
6
7
java复制代码// 通常内部是通过 clientFactory 构建下一个 HttpClient  , 这个对象在 EurekaHttpClients 中通过构造器传入
TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());


// 这几个里面有几个关注点 :
// 关注点一 : 原子对象类
private final AtomicReference<EurekaHttpClient> eurekaHttpClientRef = new AtomicReference<>();

补充 : 调用的目的 , 为什么这里要循环几个 HttpClient ?

查看这几个 HttpClient ,感觉他们应该更像是 Filter ,只不过他们的职能是发起请求 ,而结构类似于 Filter 过滤链

关于这一块可以单章看一下 , 这里只关注最后的一个 , 也就是 MetricsCollectingEurekaHttpClient

2.4 核心 HttpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码C- MetricsCollectingEurekaHttpClient
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
// 度量执行某些代码所花费的时间
Stopwatch stopwatch = requestMetrics.latencyTimer.start();
try {
// HttpClient 中最常见的模式就说这种匿名类的回调
EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
return httpResponse;
} catch (Exception e) {
requestMetrics.connectionErrors.increment();
exceptionsMetric.count(e);
throw e;
} finally {
// 计时结束
stopwatch.stop();
}
}

补充 : JerseyApplicationClient 的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码// Step 1 : JerseyEurekaHttpClientFactory 的创建
public TransportClientFactory newTransportClientFactory(EurekaClientConfig clientConfig,
Collection<ClientFilter> additionalFilters, InstanceInfo myInstanceInfo, Optional<SSLContext> sslContext,
Optional<HostnameVerifier> hostnameVerifier) {
final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create(
clientConfig,
additionalFilters,
myInstanceInfo,
new EurekaClientIdentity(myInstanceInfo.getIPAddr()),
sslContext,
hostnameVerifier
);

// 通过工厂创建
final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory);

return new TransportClientFactory() {
@Override
public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
// 匿名对象回调函数
return metricsFactory.newClient(serviceUrl);
}
};
}

// Step 2 : 构造器注入
private MetricsCollectingEurekaHttpClient(EurekaHttpClient delegate,
Map<RequestType, EurekaHttpClientRequestMetrics> metricsByRequestType,
ExceptionsMetric exceptionsMetric,
boolean shutdownMetrics) {
....

2.5 发起 Server 请求流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// C- AbstractJerseyEurekaHttpClient
public EurekaHttpResponse<Void> register(InstanceInfo info) {
// http://localhost:8088/eureka/
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
// 准备 Build 用于构造 Request
// serviceUrl -> http://localhost:8088/eureka/
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);

// 发起请求 , 此处使用的是 jersey
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (response != null) {
response.close();
}
}
}

这里就会调用 Server 端 ApplicationResource , 具体流程后面说 Server 端时看看

三 . 补充

3.1 HttpClient 的结构设计

HttpClient 的这种调用模式是一种典型的 .. 设计模式 , 其中大量使用了 AtomicReference 原之类的特性 ,

Eureka-System-TransportClientFactory.png

3.2 为什么 Eureka 里面大量使用了原子类 ?

首先 : 原子类主要通过 CAS (compare and swap) + volatile 和 native 方法来保证原子操作

前文中看到使用了 ScheduledExecutorService 发起了注册 , 实际debug 中发现这个里面实际上存在很多多线程调用 .

总结

之前看比较新的框架 , 往往可以看到很多 Spring 的影子 , 很多都会选择用 Spring 作为管理 .

但是看了Eureka 的框架才意识到 , 其实很多框架是没有和 Spring 集成的 , Spring 很好用 ,但是Eureka 如果用了 , 返回会显得很臃肿

eureke 中的技术栈有一定年限了, 但是不意味着不好用了 , 这一套模式前几年我还用过 , 现在看起来 , 颇有感触

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%