开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

精读 RocketMQ 源码系列(1)--- NameSer

发表于 2021-07-10

一、前言

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

相信看了 juejin.cn/post/698220… 中推荐的官方中文文档的同学一定已经对 NameServer 有了初步的了解。这里我们总结一下:

  1. NameServer 是路由注册中心
  2. NameServer 的主要功能是:注册发现和路由删除

在阅读源码时,我希望自己是带着问题去看源码。我也认为,学习的第一步就是要学会提问,有时候一个好的问题比一个精彩的答案更重要。

在了解 NameServer 的技术架构、主要功能之后,可能会提出这么几个问题:

  1. NameServer 是怎么实现注册发现和路由删除功能的?
  2. NameServer 是集群部署的,但彼此之间不通信,不同 NameServer 之间产生的数据不一致问题,怎么解决?
  3. 为什么不选择使用 zookeeper 作为注册中心,而选择自研 NameServer?

针对这三个问题,尝试去源码中寻找答案吧……

二、启动流程

面对一个东西,我们的第一个问题往往是:它从何而来,生命周期的源头在哪?
对于一些组件,这个问题就是:它是如何启动的?

接下来就先来看看 NameServer 是如何启动的。

启动类:org/apache/rocketmq/namesrv/NamesrvStartup.java

nameserver.png

可以参考上面这张流程图,自己将这部分源码过一遍。

2.1 NameSrvStartup#main0

首先看到 main0方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码    public static NamesrvController main0(String[] args) {

try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

这个方法就做了两件事:

  1. 创建一个 NamesrvController 实例
  2. 启动该实例

从名字上可以看出 NamesrvController 是 NameServer 的核心控制器,因此 NamesrvController 的启动,主要也是在启动它。

2.2 NameSrvStartup#createNamesrvController

填充配置对象的属性

该方法首先是对两个配置对象进行属性填充,填充方式有两种:

  • -c:后跟配置文件路径
  • -p:表示通过--属性名 属性值的形式配置

相关属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

// rocketmq 主目录,通过设置 ROCKETMQ_HOME 配置主目录,在源码环境搭建的过程中有这一步
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// 用于存储 KV配置的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// 默认配置文件路径,不生效。若需要在启动时配置 NameServer 启动属性,使用 -c 配置文件路径 的方法
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 是否支持顺序消息,默认不支持,没大用,就是在处理客户端获取路由数据时标识下,如果支持顺序消息,需要返回对应 topic 的顺序消息配置属性
private boolean orderMessageEnable = false;

// getter and setter
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class NettyServerConfig implements Cloneable {
private int listenPort = 8888; // NameServer 监听端口,默认会被初始化为 9876
private int serverWorkerThreads = 8; // Netty 业务线程池线程个数
private int serverCallbackExecutorThreads = 0; // Netty public 任务线程池线程个数,Netty 网络设计,根据业务类型会创建不同的线程池
// 比如处理消息发送,消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行
private int serverSelectorThreads = 3; // IO 线程池个数,主要是 NameServer、Broker 端解析请求、返回相的线程个数,这类线程主要是处理
// 网络请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方
private int serverOnewaySemaphoreValue = 256; // send oneway 消息请求并发度(broker 端参数)
private int serverAsyncSemaphoreValue = 64; // 异步消息发送最大并发度
private int serverChannelMaxIdleTimeSeconds = 120; // 网络连接最大空闲时间

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 网络 socket 发送缓冲区大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // 网络接收端缓存区大小
private boolean serverPooledByteBufAllocatorEnable = true; // bytebuffer是否开启缓存

/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
private boolean useEpollNativeSelector = false; // 是否启用 Epoll IO 模型

创建 NameServerController 对象

根据以上填充好的配置创建对象,并将配置备份在 NameServerController 中

1
2
3
4
java复制代码final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

2.3 NameSrvStartup#start

首先执行 initialize 方法,这个方法还是做了很多事:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public boolean initialize() {

// 1. 加载 kv 配置
this.kvConfigManager.load();

// 2. 创建 netty 服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// 3. 创建接收客户端请求的线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

this.registerProcessor();

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
// 4. 创建一个定时任务,每隔 10 秒扫描不活跃的 Broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
...
}

然后做了一件很重要的事:注册了一个钩子方法,监听 JVM 退出事件,在退出时进行 controller 的资源释放。

然后启动 controller

1
2
3
4
5
6
7
8
9
10
java复制代码// 注册了一个钩子方法,监听 JVM 退出事件,在退出时进行 controller 的资源释放
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

controller.start();

“如果代码中使用了线程池,一种优雅停机的方式就是注册一个 JVM 钩子函数,在 JVM 关闭之前,先将线程池关闭,及时释放资源”

——《RocketMQ 技术内幕》

到这还没结束,我们继续往下看,controller.start()

1
2
3
4
5
6
7
8
9
java复制代码    public void start() throws Exception {
// 启动 netty 服务端,用于接收客户端请求
this.remotingServer.start();

if (this.fileWatchService != null) {
// 启动监听TLS配置文件的线程
this.fileWatchService.start();
}
}

所以最后可以看到,启动 NameServer 就是为了启动这个 netty 服务端,然后就可以接收来自 broker 的注册请求和客户端的路由发现请求。

对于fileWatchService,它监听的是 TLS(Transport Layer Security,传输层安全协议) 配置文件,并不涉及业务逻辑。因此就不详细深入了,对网络安全协议感兴趣的同学可以上网学习。

至此,启动流程分析完毕。最后来小结下:

  1. 首先通过命令行参数、配置文件、默认配置填充NamesrvConfig、NettyServerConfig
  2. 根据以上两个配置对象创建NamesrvController,并备份配置信息到NamesrvController中
  3. 启动NamesrvController实例,实际上启动的是 netty 服务端

怎么样,其实并不是很难对不对,接下来我们继续问问题。

启动之后,NameServer 又做了哪些事呢?本文一开始就已经给出了答案:注册发现和路由删除,紧接着文章开头其实就提出了一个问题:

NameServer 是怎么实现注册发现和路由删除功能的?

继续从源码中寻找答案……

三、注册发现

注册和发现其实是两个动作,但针对的都是 broker。注册是指 broker 注册到 NameServer,发现是指 producer 和 consumer 通过 NameServer 发现 broker。

3.1 注册

既然是“broker 注册到 NameServer”,那当然要先去 broker 的源码中寻找答案。问题是 broker 源码那么多,我该从哪下手呢?这时候可以停下来想想,如果是我自己去设计一个系统,这个系统需要将自己的存活状态上报至一个注册中心,我会选择在什么时候去注册呢?

应该容易想到,最好是启动成功的时候就马上去注册,然后与注册中心建立一个心跳机制,一直不停的告诉注册中心:我还活着!

不管这个想法对不对,但至少这个时候有方向了,我知道应该去 broker 的一大堆源码中,先找它的启动流程源码。

这就是一直强调的带着问题去读源码,通过问问题,让自己阅读源码时更具有目的性;通过对问题的思考,可以提出自己的猜想,如果猜想是对的,恭喜你,你将会收获成就感;如果猜想是错的,更要恭喜你,你可以对比自己的猜想和源码的实现,看看差在哪个地方,这个地方就是你提升的空间。

3.1.1 broker 启动流程

代码位置:BrokerStartup#main

流程图如下:

broker_startup.png

以上流程图的大致步骤是:

  1. 创建 broker 的核心控制器 BrokerController
  2. 启动 BrokerController:这一步会启动很多服务,如消息存储服务、netty 服务端、fileWatchService、以及我们重点关心的给 NameServer 发送心跳的服务

这里我们重点分析图中的第 18~20:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码        // 1. 在 broker 启动时,先做一次注册
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}

// 2. 接下来先延迟 10s,然后每隔30s(brokerConfig.getRegisterNameServerPeriod()我配置的是30s)进行一次注册
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

为什么要先延迟 10s?

因为 broker 刚刚已经发送了注册请求,没有必要立马再进行注册,所以定时任务线程池先延迟了10s。这种设计很细节,但在业务上是有效的,避免不必要的资源浪费。

要正确理解 registerBrokerAll方法的意思:这个方法并不是把“所有的broker”都注册,而是把该 broker 注册到所有的 NameServer 上,这一点在后面的源码中可以得到验证。

3.1.2 registerBrokerAll

这个方法有三个参数:

1
2
3
java复制代码boolean checkOrderConfig, // 是否校验 顺序消息配置
boolean oneway, // 是否是 单向发送,单向发送不接收返回值
boolean forceRegister // 是否强制注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// topicConfigWrapper 中封装了 该broker 上的 topic 信息和 dataVersion
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

// 这块代码的作用是将 topicConfigWrapper 中的值取出来重新封装一遍,又再塞回 topicConfigWrapper,我理解是为了将 this.brokerConfig.getBrokerPermission()
// 的属性值 set 进去。不过这并不是很重要的细节,我们只要知道 topicConfigWrapper 至少包含了该broker 上的 topic 信息和 dataVersion 即可
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}

重点看一下最下面的这块if语句的内容,由于forceRigister == true,所以后面的 needRegister方法的逻辑并没有机会执行。但这里还是简单讲一下这个方法:

  1. broker 请求 NameServer,查询 broker 相关的配置在 NameServer 端的数据版本,请求类型:QUERY_DATA_VERSION = 322
  2. NameServer 接收请求,DefaultRequestProcessor#processRequest,会将从 broker 发送过来的 dataVersion 和 NameServer 存储的进行比较,如果不相等,则需要在 NameServer 端更新 broker 的心跳更新时间lastUpdateTimestamp;如果相等返回changed == false的结果
  3. Broker 端处理所有 NameServer 返回的结果,只要有一个 changed == true,那么needRegister == true

也就是需要执行doRegisterBrokerAll

当然在当前 broker 启动过程中,是一定会执行 doRegisterBrokerAll的。

3.1.3 doRegisterBrokerAll

这个方法主要做了两件事:

  1. 调用 brokerOuterAPI.registerBrokerAll进行注册
  2. 处理注册结果 registerBrokerResultList:进行 master 地址的更新、顺序消息Topic的配置更新

重点在第一步。

BrokerOuterAPI 这个类是 broker 对外交互的类,其中封装了 RemotingClient remotingClient ,在这里是作为客户端向 NameServer 发送真正的注册请求。

看一下registerBrokerAll 中核心的一块代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码 // CountDownLatch 使得只有所有 nameServer 的响应结果都返回时才会继续执行后续的逻辑
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 遍历所有的 NameServer,并将注册任务 registerBroker 丢进 brokerOuterExecutor 线程池中执行
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}

log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
// 每返回一个结果,减1
countDownLatch.countDown();
}
}
});
}

try {
// 主线程阻塞在此,直到所有的 countDownLatch 减为0
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

至于registerBroker,里面的逻辑就是去调用 netty 客户端的 invokeSync或invokeOneway,去向 NameServer 发送请求。具体的通信过程和原理涉及到 netty,不是本文的重点。笔者也计划后续会进行 netty 源码的解析,敬请期待。

值得一提的是,上面这段代码采用多线程的方式,使用到了:

  • CountDownLatch
  • BrokerFixedThreadPoolExecutor :父类是 ThreadPoolExecutor
  • CopyOnWriteArrayList:存储线程执行结果,因为存在多线程的写操作,所以需要使用并发安全的容器

对并发编程感兴趣的同学可以学习下这里的用法,同时有兴趣了解其原理的,可以查看 JDK 源码。

到这一步,对于 注册 这件事来说,broker 端算是完成了它的工作,后续就是 NameServer 接收到请求去处理的事了。

3.1.4 NameServer 处理注册请求

我们再回到 NameServer ,看看是怎么处理注册请求的。我们稍微思考下,这个处理请求的代码入口在哪呢?(实际过程中是通过代码调试得知入口的,但代码调试得到的结果实际上有点像翻答案,我们可以尝试自己先思考下)

当然首先是 netty 服务端先接收到请求,因此我们先去看一下 NettyRemotingServer,看了一圈发现这个类里并没有类似处理请求的方法。但是这个类集成了 NettyRemotingAbstract,我们继续在这里找一下,发现了这个类里有个方法叫 processRequestCommand。

这个类最后会调用到 NameServer 的 DefaultRequestProcessor#processRequest,这个方法中帮助 NameServer 处理来自客户端和 Broker 的各种请求。

流程图:

namesrv_process_request.png

首先方法一进来就是一个 switch case,找到 REGISTER_BROKER

1
2
3
4
5
6
7
8
9
10
11
java复制代码switch (request.getCode()) {
...
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 根据版本区分,实际两个方法没有很大的区别
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
}

中间的请求crc校验、请求参数的处理过程我们不详细看了。值得一提的是,NameServer 中管理路由信息的类是 RouteInfoManager,其维护了五张表:

1
2
3
4
5
java复制代码    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;// Topic 消息队列的路由信息,消息发送时根据此表进行负载均衡
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;// broker 地址信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;// 集群信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;// 存活的broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

看一下更新 brokerLiveTable 的逻辑,注意:其它几张表也会被更新

1
2
3
4
5
6
java复制代码  BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));

把当前系统时间作为 lastUpdateTimestamp broker 上报心跳的时间。

后续会将处理结果封装并返回到请求响应中,通过 netty 再返回给 broker,与上面分析的 broker 端的流程形成闭环。

至此,关于注册的源码分析全部完成,最后来小结一下

3.1.5 小结

  1. broker 服务器在启动时会向**所有的 NameServer **注册,并建立长连接,之后每隔30s发送一次心跳,内容包括 brokerId,broker 地址,名称和集群信息
  2. NameServer 接收到心跳包后,会将整个消息集群的数据存入到 RouteInfoManager 的几个HashMap中,并更新 lastUpdateTimestamp

其中涉及到的一些可深入的技术点:

  1. 并发编程,包括线程池的使用、并发组件(CountDownLatch、CopyOnWriteArrayList)、锁的使用(NameServer 更新几张路由表的时候)
  2. netty 相关的网络编程知识

3.2 发现

客户端在从 NameServer 中获取 broker 相关信息,这个过程就是路由发现。我们以生产者为例分析路由发现。

路由信息在生产者中存放在 DefaultMQProducerImpl.topicPublishInfoTable中

1
2
java复制代码   private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();

是一个并发安全的容器,为什么要使用ConcurrentMap呢,因为它的写入口实际上有两个,也就是生产者路由发现的时机有两个:

  1. 发送消息时会去检查 topicPublishInfoTable 是否为空或可用,不符合条件则去 NameServer 中查询
    • DefaultMQProducer#send,实际调用的是 DefaultMQProducerImpl#tryToFindTopicPublishInfo
  2. 生产者启动时也会启动一个定时任务,定时从 NameServer 上拉取 topic 信息
    • 这个代码路径是:DefaultMQProducerImpl#start -> mQClientFactory.start() -> MQClientInstance#this.startScheduledTask()-> MQClientInstance#this.updateTopicRouteInfoFromNameServer();

为什么要有两个入口呢?

  • 这是因为路由信息变更时,nameserver不会主动推送,需要客户端主动拉取路由信息才能将客户端上路由信息进行更新。请求类型 GET_ROUTEINFO_BY_TOPIC,调用RouteInfoManager的pickupTopicRouteData方法,这样设计的目的是降低 NameServer 的复杂度。因此第2种方式是必不可少的。
  • 而第一种方式,就更好理解了,按需更新,这里的需是指发消息的需求。

两个方法都会调用下面这个方法MQClientInstance:

1
2
java复制代码public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer)

最终调用 MQClientAPIImpl#getTopicRouteInfoFromNameServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);

// 请求code: GET_ROUTEINFO_BY_TOPIC,最终又会前面提到的 processRequest 方法中,根据code找到处理逻辑
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}

break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}

throw new MQClientException(response.getCode(), response.getRemark());
}

NameServer 端处理请求的逻辑比较简单,就是查询一下路由信息然后返回,这里就不再赘述。

至此,NameServer 的注册发现分析完毕

四、路由删除

路由删除有两个触发点:

  1. broker 非正常关闭,NameServer 发现 broker 无响应,将其删除。详细过程:
    • Broker 每隔 30s 向 nameserver 发送心跳包,并更新 brokerLiveTable 中的信息,尤其是 lastUpdateTimestamp。
    • namaserver 每隔10s会扫描 brokerLiveTable,如果发现 lastUpdateTimestamp 距离当前时间已经超过了120s,则认为 Broker 宕机,会进行路由删除操作。
  2. Broker 正常关闭时,与 NameServer 断开长连接,会执行 unregisterBroker 指令

路由删除比较简单,大家可以对照源码验证下这里讲的两个触发点。

五、前言中提到的几个问题

现在来回顾一下文章开头我们提到的三个问题。第一个问题已经解答完了,重点看一下第二和第三个问题

5.1 NameServer 如何解决数据不一致的问题

首先要理解为什么 NameServer 会有数据不一致的问题。因为 NameServer 虽然是集群部署,但各个节点之间是相互独立不进行通信的。那么在进行路由注册、删除时,不同节点之间存在不一样数据的情况是必然存在的

如何解决呢?事实上,RocketMQ 并不认为这是一个需要去解决的问题。因为 Topic 路由信息本身就不需要追求集群中各个节点的强一致性,只需要做到最终一致性。

说白了,NameServer 的各个节点根本不关心自己的数据和别的节点是不是一致。关心这件事的人是生产者和消费者。而客户端关心这件事的本质其实是:我希望我拿到的路由信息尽量是正确的,可用的,也就是我根据获取到路由信息选择了一个 broker 去发送消息,这个 broker 是能正常接收到的。

那这就有问题了,因为上面讲路由删除的时候,提到了: NameServer 发现 lastUpdateTimestamp 距离当前时间已经超过了120s,才认为 Broker 宕机,会进行路由删除操作。也就是说,会有 2 分钟的空档,这 2 分钟,很可能生产者会向一个已经宕机的 broker 发送消息。那这种情况怎么办呢?

这个问题先按下不表,因为答案并不在 NameServer,而是在 producer 中。重要的是,现在我又有了一个好问题!

5.2 为什么rocketmq选择自己开发一个NameServer,而不是使用zk

事实上,在RocketMQ的早期版本,即MetaQ 1.x和MetaQ 2.x阶段,也是依赖Zookeeper的。但MetaQ 3.x(即RocketMQ)却去掉了ZooKeeper依赖,转而采用自己的NameServer。

因为 RocketMQ 的设计理念是简单高效,并且 RocketMQ 的架构设计决定了它只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要Zookeeper这样的强一致性解决方案。这么做的好处很明显:不需要再依赖另一个中间件,从而减少整体维护成本。

这里可以稍微扩展下,其实选择 NameServer 还是选择 Zookeeper 代表了在分布式系统中的设计侧重点。

根据CAP理论, RocketMQ 在注册中心这个模块的设计上选择了 AP 模式的 NameServer,而不是 CP 模式的 Zookeeper

不使用 zookeeper 的原因是当 RocketMQ 没满足A(可用性)带来的影响比较大,影响稳定性

Zookeeper CP 的适用场景:

  • 分布式选主,主备高可用切换等场景下有不可替代的作用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据集,并且大部分时间分任务多进程 / 线程并行处理这些数据集,但是总是有一些点上需要将这些任务和进程统一协调,这时候就是 ZooKeeper 发挥巨大作用的用武之地。
  • 但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然的短板,应该竭力避免在这些场景下引入 ZooKeeper,在生产实践中,应用对 ZooKeeper 申请使用的时候要进行严格的场景、容量、SLA 需求的评估。

NameServer 的适用场景:

  • NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,容忍在某个时刻各个节点数据可能不一致的情况下 所以可以使用 CP,也可以使用AP,但是大数据使用CP,在线服务则AP,分布式协调、选主使用CP,服务发现使用 AP

参考资料:

  • RocketMQ 4.8.0 源码
  • github.com/apache/rock…
  • github.com/DillonDong/…
  • 《RocketMQ 技术内幕》

最后

  • 如果觉得有收获,三连支持下;
  • 文章若有错误,欢迎评论留言指出,也欢迎转载,转载请注明出处;
  • 个人vx:Listener27, 交流技术、面试、学习资料、帮助一线互联网大厂内推等

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

C++ Primer Plus 第02章 开始学习C++ 学

发表于 2021-07-10

第二章 开始学习C++

  1. 进入C++

1
2
3
4
5
6
7
8
9
10
11
12
cpp复制代码/*第一个C++程序*/

#include <iostream>
using namespace std; /*定义一个可视化*/

int main(void)
{
cout<<"Come up an C++"<<endl;
cout<<"You won't regret it"<<endl;

return 0;
}

对于一个C++ 程序主要包含以下元素:

  • 注释:由前缀// 或者是 /* */ 标识
  • 预处理器编译指令#include
  • 函数头:int main()
  • 编译指令:using namespace
  • 函数体:用{ } 括起来
  • 使用C++ 的cout工具显示消息的语句
  • 结束main()函数的return语句

1.1、main()函数头

main() 被启动代码调用,而启动代码是编译器添加到程序中。

函数头描述 main() 和OS(UNIX/Linux、Windows、mac os等)直接的接口。

空括号的main() 不接受任何参数。

1
2
3
4
5
cpp复制代码int main(void)
{
statement
return 0;
}

main()函数描述了函数的行为。同时也构成了两部分的 函数定义(function definition) :第一行int main()函数叫做 函数头(function heading),花括号({和})中包括的部分叫 函数体。

函数体:指出函数应做什么的计算机指令。

在C++中,每条完整的指令叫做语句。所有的语句都是以 分号结束。

main()中最后一条语句叫做 返回语句(return statement),结束main()函数。

⚠️注意:C++程序通常是以main() 函数开始执行,如果没有,程序不完整,则编译器会指出未定义main()函数。

大小写都必须准确

不需要main()函数的特殊情况:

  1. Windows中的动态链接(DLL)模块。
  2. 单片机或机器人芯片

1.2、C++注释

C++中的注释以 双斜杠(//) 打头。以行尾作为结束。

注释的作用:为程序提供解释说明,使得程序通俗易懂。

通常标识程序的一部分或者是标识代码的某个方面。

注意点:编译器不会运行,会直接忽略注释。

C++也可以识别C语言的注释

C语言风格的注释

  • 多行注释:符号/* 和 */ 之间,以 */ 作为注释的结束。
  • 单行注释:以 双斜杠(//) 开始,行尾作为结束。

1.3、预处理器和头文件

如果程序要使用C++输入或输出工具时,必须使用两行代码:

1
2
cpp复制代码#include <iostream>
using namespace std;

使用了 #include作为了预编译指令,也就是所谓的预处理器指令。

预处理器的操作:在源代码被编译之前,替换或添加文本。

例如:

1
cpp复制代码#include <iostream>

像iostream这样的文件叫做 包含文件(include file) ———— 也被包含在其他的文件中,所以也叫做 **头文件(header file) **。

头文件命名约定

头文件类型 约定 示例 说明
C++旧式风格 以 .h 结尾 iostream.h C++程序可以使用
C旧式风格 以 .h 结尾 math.h C、C++ 程序可以使用
C++ 新式风格 没有扩展名 iostream C++程序可以使用,使用namespace std;
转换后的C 加上前缀c,没有扩展名 cmath C++ 程序可以使用,可以使用不是C的特性,如namespace std;

1.4、名称空间(namespace)

如果程序中使用的是 iostream ,那么就需要使用名称空间编译指令来使得对程序可用。

1
cpp复制代码using namespace std;

也叫做 using编译指令。

1.5、使用cout进行C++的输出

1
2
cpp复制代码    cout<<"Come up an C++"<<endl;
cout<<"You won't regret it"<<endl;

双引号 括起来的部分就是要打印的消息。

在C++中,使用双引号括起来的一系列字符叫做 字符串,由于若干个字符组合而成。

<< 指出信息流动的路径,cout是一个预定义的对象。

初识运算符重载

<< 既可以是插入运算符,也可以是左移运算符。

典型的运算符重载的情况,通过重载,同一个运算符代表不同的含义。具体含义编译器会通过上下文来确定。

常见的运算符重载例子

  • & —-> 既表示地址运算符,又表示按位AND运算符。
  • * —-> 既表示乘法,也表示对指针解除引用。

控制符endl

1
cpp复制代码cout<<endl;

endl 是C++中的一个特殊符号,作用:重起一行。

在输出流中插入endl使得屏幕光标移到下一行开头。

endl在头文件iostream中定义,且位于名称空间std中。

换行符

C++也也提供了早期C语言表示换行的方法;C语言符号\n。

\n 被视为一个字符,名为换行符,也就是C++中的endl的重起一行。

显示字符串时,在字符串中包含换行符,而不是在末尾添加endl,可减少输入量。

1
2
3
cpp复制代码/*两个语法都是一样的,都是:重起一行*/
cout<<"\n";
cout<<endl;

1.6、C++源代码风格

C++中源代码的风格遵循以下规则:

  • 每条语句各占一行。
  • 每个函数都有一个开始花括号和一个结束花括号,两个花括号各占一行。
  • 函数中的语句都相对于花括号进行缩进。
  • 与函数名称相关的圆括号周围没有空白。
  1. C++语句

程序代码例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
cpp复制代码#include<iostream>
using namespace std;

int main()
{
int carrots; // 声明一个整型变量

carrots = 25; // 给变量赋值
cout<<"我有 "<<carrots<<" 个carrots."<<endl;

carrots = carrots - 1; // 对变量进行修改
cout<<"快看!快看!现在,我有 "<<carrots<<" 个carrots!"<<endl;

return 0;
}

2.1 声明语句和变量

在C++中,首次使用变量时,必须声明 。可以避免因拼写错误而难以发现错误。

声明通常指出要存储的数据类型和程序对存储在内存里的数据使用的名称。

程序中的声明语句叫作定义声明语句,简称定义。定义会使得编译器为变量分配内存。

⚠️注意:声明不一定是定义。

2.2 赋值语句

赋值语句将值赋给存储单元。

符号 = 叫作赋值运算符。👉 小Tips:C++中允许连续使用赋值运算符。

赋值是从右向左进行。

  1. 其他C++语句

3.1 cin 和cout

cin 使用 >> 运算符从输入流中抽取字符。

可以将通过键盘输入的一列字符(即输入)转换为接收信息的变量能够接受的形式。

cout 的对象属性包含一个插入运算符 << ,将右侧的信息插入到输出流中。

<< 运算符可以进行多条输出语句的拼接。

3.2 类的简介

类是C++ 面向对象编程(OOP)的核心概念之一。

什么是类?

类是用户定义的一种数据类型。

要定义类,需要描述它能够表示什么信息和可对数据执行什么操作。

类定义描述的是:数据格式及其用法,而对象则是根据数据格式规范创建的实体。

两个类cin类 和 cout类

  • cin类:istream类对象
  • cout类:ostream类对象,ostream类定义描述了ostream对象表示的数据以及对它执行的操作。

两个类没有被内置到编译器中。
注意点:类描述了一种数据类型的全部属性(包括使用它执行的操作),对象则是根据描述创建的实体。

C++中信息发送的方式

  • 使用类方法(函数调用等)
  • 重新定义运算符
  1. 函数

两种C++函数

  • 有返回值
  • 无返回值

4.1 有返回值的函数

有返回值的函数将生成一个值,而值将赋值给变量或其他的表达式中使用。

  • 被调用函数:被调用的函数
  • 调用函数:包含调用的函数
  • 返回值:发送回的值

参数 是发送给函数的信息, 返回值 是从函数中发送回去的值。

👉 小Tips:对于C++编译器而言,函数的参数类型和返回值类型必须一致 。

⚠️注意:C++程序应当为程序中使用的每个函数提供原型。

函数原型结尾必须以 分号(;) 结束。如果省略分号,编译器则认为是函数头,要求提供定义该函数的函数体。

不要混淆函数原型和函数定义

函数原型只描述函数接口。

函数定义包含函数的代码。

👉 小Tips:首次使用函数之前提供原型,一般把原型放在 main()函数定义前面。

4.2 函数变体

  • 在原型中使用关键字void来指定返回类型,指出函数没有返回值。
1
cpp复制代码void bucks(double);
  • 关键字void不接受任何参数。如果省略void,让括号为空,则C++解释为一个不接受任何参数的隐式声明。
1
cpp复制代码int rand(void);

4.3 用户定义的函数

对于库函数,在使用之前必须提供其原型,通常把原型放在main()定义之前。

  • 函数格式
    函数格式为:一个函数头 + 花括号的函数体。
1
2
3
4
cpp复制代码type functionname(arguementlist)
{
statements
}

C++ 不允许将函数定义嵌套在另一个函数中,每个函数定义独立。

  • 函数头
    例如main()函数头。

⚠️注意:关键字是专用词,不能用作他用。

return不能用作变量名,double不能用作函数名。

4.4 用户定义的有返回值的函数

有返回值的函数,使用关键字return来提供返回值,并结束函数。

函数的特性

  • 有函数头和函数体
  • 接受一个参数
  • 返回一个值
  • 需要一个原型

4.5 在多函数程序中使用using编译指令

让程序访问名称空间std的4种方法

  • 将 using namespace std; 放在函数定义之前,让文件种所有的函数都能够使用名称空间std中所有的元素。
  • 将 using namespace std; 放在特定的函数定义之前,让该函数能够使用名称空间std中的所有元素。
  • 在特定的函数中使用类似 using std::cout;编译命令,而不是 using namespace std; 让该函数能使用指定的元素,如cout。
  • 完全不使用编译指令using,而在需要使用名称空间std中的元素是,使用前缀std::。

Github地址:github.com/SolerHo/cpp…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

短链系统设计

发表于 2021-07-10

什么是短链接?

短链接就是就是把普通的链接转换为较短的网址。比如常见的我们收到的营销短信中链接hm.tb.cn/x.V8VgyA
短链接优点:短,字符少,美观,便于发布传播。

image.png

百度短网址: dwz.cn/

谷歌短网址: goo.gl/

实现原理

image.png
基本原理大概可以分为下面5步

  1. 用户输入短链接网址hm.tb.cn/x.V8VgyA
  2. DNS解析hm.tb.cn IP地址
  3. 解析出来IP地址,发送HTTP GET请求,查询短码x.V8VgyA
  4. 服务器通过短码查询对应长URL
  5. 通过301重定向跳转到对应的长URL
    注:这里我们要区分301和302重定向的区别,看具体自己应用场景。
    HTTP响应状态代码,301 是永久重定向, 302 是临时重定向
    301 永久重定向,GET请求301跳转会默认被浏览器缓存起来。302 GET请求默认不会被缓存

短链接转换核心算法

如何将长链接转换为短链接,我们一般的应用场景比如短信营销,用户通过短链接跳转到对应的营销活动落地页面,短链接长链接转换的核心其实就是长链短链的映射关系,以及如何将长链接转换为短链接。

一般常用的有两种算法1. 哈希 2. 全局唯一ID算法

  • 1. MurmurHash 哈希算法
    哈希算法有很多种,应用广泛且性能较高的MurmurHash算法,比如我们常见的Redis一致性Hash算法就是基于MurmurHash实现的。

MurmurHash 提供了两种长度的哈希值,128bits和32bits,

  • 2. 全局唯一ID
    全局唯一ID,其实我们能想到的就有很多,比如手机号码,会员ID等,或者代码生成比如雪花算法,MySQL自增ID。

获取唯一的短码之后,我们需要短码尽可能的短,还需要将long型id转换为BASE62字符,Base62比BASE64少了两个字符’/‘,’+’

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class Base62Utils {

private static final char[] toBASE62 = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z',
'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'
};

public static String toBase62(long num) {
StringBuilder sb = new StringBuilder();
do {
int i = (int) (num % 62);
sb.append(toBASE62[i]);
num /= 62;
} while(num > 0);
return sb.reverse().toString();
}
}

数据存储设计

数据存储设计,主要还是看具体的业务场景,比如数据量在百亿级别,MySQL估计是搞不定了。那么如何选型NoSQL数据库?常见的NoSQL数据库MongoDB, Hbase,ES,DynamoDB, Cassandra 或者 Riak等等。具体的业务选型还需要考虑你们公司所在的技术栈和运维成本等等。

如何支持高并发?如何防止缓存穿透等等?

参考:

www.cnblogs.com/rjzheng/p/1…

time.geekbang.org/column/arti…

zhuanlan.zhihu.com/p/91947139

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《高可用系列》-熔断降级我学会了!

发表于 2021-07-10

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

概述

高可用三剑客 限流,熔断和削峰 终于来到第二篇, 熔断降级专题了,想回顾限流相关内容的童鞋,可以查看一下,下面文章,欢迎点赞,收藏,关注三连,感谢!

限流系列文章:

  • 《面试补习》- 你来说说什么是限流?
  • 限流神器Sentinel,不了解一下吗?
  • 阿里P7大佬带你解密Sentinel

仅以两张图来初步形容一下 熔断 适用的场景:

  • 雪崩

  • 股灾


什么是熔断


来自 wiki 的 熔断机制 描述:

1
2
3
4
5
6
7
复制代码熔断机制(英语:Circuit breaker / Trading curb)指的是在股票市场的交易时间中,
当价格波动的幅度达到某一个限定的目标(熔断点)时,对其暂停交易一段时间的机制。
此机制如同保险丝在电流过大时候熔断,故而得名。

熔断机制推出的目的是为了防范系统性风险,给市场更多的冷静时间,避免恐慌情绪蔓延导致市场波动,
从而防止大规模股价下跌现象的发生。
然而熔断机制也因切断了资金的流通性,同样会造成市场情绪加大,并令市场风险在熔断期结束后继续扩大。

转换成互联网语言可以这么理解:

  • 当异常幅度达到设定的阀值后触发的系统保护机制
  • 保护机制会将某部分能力关闭,以保证大部分能力的正常
  • 这种机制是有损的,但是利大于端

熔断机制的特点,在关闭一段时间后,会自动触发恢复检测,如果发现服务正常,则将服务逐渐开放。

1、雪崩效应

在分布式服务部署的架构下,整体链路可以参考为:

image.png

如果在大促期间, DB_2 由于 机器负载过高,sql执行缓慢,链接数打满 或网络抖动等情况,导致 DB_2 不可用,那么整体链路的影响就会变成:

image.png

服务雪崩的每个阶段都可能由不同的原因造成, 比如造成 服务不可用 的原因有:

  • 硬件故障
  • 程序Bug
  • 缓存击穿
  • 用户大量请求

2、雪崩处理策略

  • 流量控制: 限流和削峰都属于流量控制的一种策略
  • 缓存优化: 在上述案例中,DB 由于压力过大导致的雪崩,可以引入缓存,减轻DB压力
  • 服务降级: 通过异常分支链路的快速失败,确保主链路正常提供服务
  • 应用扩容: 针对机器压力过大,负载过高,可以通过机器扩容来解决,缓解流量压力

断路器模式

熔断器模式(Circuit Breaker Pattern),是一个现代软件开发的设计模式。用以侦测错误,并避免不断地触发相同的错误(如维护时服务不可用、暂时性的系统问题或是未知的系统错误)。

状态描述:

  • 关闭:熔断器默认处于关闭状态,熔断器本身带有计数能力(如滑动窗口实现),当失败数量达到预设阀值后,触发状态变更,熔断器被打开
  • 开启:在一定时间内,所有请求都会被拒绝,或采用备用链路处理。
  • 半开启: 在刷新时间窗口后,会进入半开启状态,熔断器尝试接受请求,如果这阶段出现请求失败,直接恢复到开启状态。

image.png

隔离策略

1、线程隔离

Hystrix 采用了 Bulkhead Partition 舱壁隔离技术,来将外部依赖进行资源隔离,进而避免任何外部依赖的故障导致本服务崩溃。

舱壁隔离,是说将船体内部空间区隔划分成若干个隔舱,一旦某几个隔舱发生破损进水,水流不会在其间相互流动,如此一来船舶在受损时,依然能具有足够的浮力和稳定性,进而减低立即沉船的危险。

image.png

图片来源: 《防雪崩利器:熔断器 Hystrix 的原理与使用》

Hystrix 在线程池隔离实现主要解决一下场景:

在商品详情系统中,如果没有对服务做降级措施,那么当评论服务出现异常时,整个商品详情系统都会受到影响,最终导致用户无法查看商品详情。

在这个例子中,商品详情服务,从请求入口分配线程处理,对每个服务使用同一个线程进行处理(同步),在评论服务出现异常时(响应缓慢,处理超时,服务异常等),导致整个线程阻塞,服务端响应超时,触发用户重试刷新请求,最终导致服务雪崩,系统崩溃。

image.png

Hystrix 线程池隔离方案;

hystrix把每个依赖都进行隔离,对依赖的调用全部包装成HystrixCommand或者HystrixObservableCommand 在服务调用时,分配独立的线程池进行资源隔离调用,如下图中的评论服务出现不可用时,商品详情系统还是能够将商品信息,大促信息封装好返回给用户。评论服务的异常,并不会影响其他依赖的调用。

image.png

线程隔离特点

优点:

  • 一个依赖可以给予一个线程池,这个依赖的异常不会影响其他的依赖。
  • 使用线程可以完全隔离第三方代码,请求线程可以快速放回。
  • 当一个失败的依赖再次变成可用时,线程池将清理,并立即恢复可用,而不是一个长时间的恢复。
  • 可以完全模拟异步调用,方便异步编程。
  • 使用线程池,可以有效的进行实时监控、统计和封装。

缺点:

  • 使用线程池的缺点主要是增加了计算的开销。每一个依赖调用都会涉及到队列,调度,上下文切换,而这些操作都有可能在不同的线程中执行。

线程切换的性能损耗问题

Netflix在使用过程中详细评估了使用异步线程和同步线程带来的性能差异,结果表明在99%的情况下,异步线程带来的几毫秒延迟的完全可以接受的

2、信号量隔离

Hystrix 的信号量隔离限制对某个资源调用的异常比例。

Sentinel 在信号量隔离的限制上提供了更多的策略选择,基于慢调用比例、异常比例和异常数。

信号量隔离实现原理

Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,在 信号量隔离的底层实现中, 通过根据不同的策略,如 异常数 策略,统计在 滑动窗口区间内, 异常请求量的比例,来决定对服务进行熔断降级处理。

滑动窗口示意图:

image.png

1、慢调用比例 (SLOW_REQUEST_RATIO)
设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当调用请求数量大于阀值,触发熔断。阀值设置,100ms响应,10个请求 如下图所示:

image.png

2、异常比例 (ERROR_RATIO

当单位统计时长内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。阀值设置 20% 如下图所示:

image.png

3、异常数 (ERROR_COUNT)

当单位统计时长内的异常数目超过阈值之后会自动进行熔断。阀值设置 5 如图所示:

image.png

熔断降级组件对比

image.png

Sentinel

Sentinel是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。

Sentinel 的侧重点在于:

  • 多样化的流量控制
  • 熔断降级
  • 系统负载保护
  • 实时监控和控制台

Hystrix

Hystrix是Netflix开源的一款容错系统,能帮助使用者码出具备强大的容错能力和鲁棒性的程序。提供降级,熔断等功能。在2018年底,Hystrix在其Github主页宣布,不再开放新功能,推荐开发者使用其他仍然活跃的开源项目。

1
2
3
4
5
6
7
8
sql复制代码官方 wiki 描述:
Hystrix is designed to do the following:

Give protection from and control over latency and failure from dependencies accessed (typically over the network) via third-party client libraries.
Stop cascading failures in a complex distributed system.
Fail fast and rapidly recover.
Fallback and gracefully degrade when possible.
Enable near real-time monitoring, alerting, and operational control.
  1. 对通过第三方客户端库访问的依赖项(通常是通过网络)的延迟和故障进行保护和控制。
  2. 在复杂的分布式系统中阻止级联故障。
  3. 快速失败,快速恢复。
  4. 回退,尽可能优雅地降级。
  5. 启用近实时监控、警报和操作控制。

resilience4j

resilience4j是一个轻量、易用、可组装的高可用框架,支持熔断、高频控制、隔离、限流、限时、重试等多种高可用机制。Netflix 官方在停止维护 Hystrix 后,推荐使用 resilience4j 作为替代方案。

与Hystrix相比,它有以下一些主要的区别:

  • Hystrix调用必须被封装到HystrixCommand里,而resilience4j以装饰器的方式提供对函数式接口、lambda表达式等的嵌套装饰,因此你可以用简洁的方式组合多种高可用机制
  • Hystrix的频次统计采用滑动窗口的方式,而resilience4j采用环状缓冲区的方式
  • 关于熔断器在半开状态时的状态转换,Hystrix仅使用一次执行判定是否进行状态转换,而resilience4j则采用可配置的执行次数与阈值,来决定是否进行状态转换,这种方式提高了熔断机制的稳定性
  • 关于隔离机制,Hystrix提供基于线程池和信号量的隔离,而resilience4j只提供基于信号量的隔离

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,我后面会每周都更新几篇高质量的大厂面试和常用技术栈相关的文章。感谢大伙能看到这里,如果这个文章写得还不错, 求三连!!! 创作不易,感谢各位的支持和认可,我们下篇文章见!

我是 九灵 ,有需要交流的童鞋可以 加我wx,Jayce-K,关注公众号:Java 补习课,掌握第一手资料!
如果本篇博客有任何错误,请批评指教,不胜感激 !

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java并发系列终结篇:彻底搞懂Java线程池的工作原理

发表于 2021-07-10

多线程并发是Java语言中非常重要的一块内容,同时,也是Java基础的一个难点。说它重要是因为多线程是日常开发中频繁用到的知识,说它难是因为多线程并发涉及到的知识点非常之多,想要完全掌握Java的并发相关知识并非易事。也正因此,Java并发成了Java面试中最高频的知识点之一。本系列文章将从Java内存模型、volatile关键字、synchronized关键字、ReetrantLock、Atomic并发类以及线程池等方面来系统的认识Java的并发知识。通过本系列文章的学习你将深入理解volatile关键字的作用,了解到synchronized实现原理、AQS和CLH队列锁,清晰的认识自旋锁、偏向锁、乐观锁、悲观锁…等等一系列让人眼花缭乱的并发知识。

多线程并发系列文章:

这一次,彻底搞懂Java内存模型与volatile关键字

这一次,彻底搞懂Java中的synchronized关键字

这一次,彻底搞懂Java中的ReentrantLock实现原理

这一次,彻底搞懂Java并发包中的Atomic原子类

深入理解Java线程的等待与唤醒机制(一)

深入理解Java线程的等待与唤醒机制(二)

Java并发系列终结篇:彻底搞懂Java线程池的工作原理

Java并发系列番外篇:ThreadLocal原理其实很简单

本篇文章是多线程并发系列的最后一篇,将深入分析Java中线程池的工作原理。个人认为线程池是Java并发中比较难已理解的一块知识,因为线程池内部实现使用到了大量的像ReentrantLock、AQS、AtomicInteger、CAS以及“生产者-消费者”模型等并发相关的知识,基本上涵盖了并发系列前几篇文章的大部分知识点。这也是为什么把线程池放到最后来写的原因。本篇文章权当是一个并发系列的综合练习,刚好巩固实践一下前面知识点的运用。

开始之前先给大家推荐一下AndroidNote这个GitHub仓库,这里是我的学习笔记,同时也是我文章初稿的出处。这个仓库中汇总了大量的java进阶和Android进阶知识。是一个比较系统且全面的Android知识库。对于准备面试的同学也是一份不可多得的面试宝典,欢迎大家到GitHub的仓库主页关注。

一、线程池基础知识

在Java语言中,虽然创建并启动一个线程非常方便,但是由于创建线程需要占用一定的操作系统资源,在高并发的情况下,频繁的创建和销毁线程会大量消耗CPU和内存资源,对程序性能造成很大的影响。为了避免这一问题,Java给我们提供了线程池。

线程池是一种基于池化技术思想来管理线程的工具。在线程池中维护了多个线程,由线程池统一的管理调配线程来执行任务。通过线程复用,减少了频繁创建和销毁线程的开销。

本章内容我们先来了解一下线程池的一些基础知识,学习如何使用线程池以及了解线程池的生命周期。

1.线程池的使用

线程池的使用和创建可以说非常的简单,这得益于JDK提供给我们良好封装的API。线程池的实现被封装到了ThreadPoolExecutor中,我们可以通过ThreadPoolExecutor的构造方法来实例化出一个线程池,代码如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 实例化一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
// 使用线程池执行一个任务
executor.execute(() -> {
// Do something
});
// 关闭线程池,会阻止新任务提交,但不影响已提交的任务
executor.shutdown();
// 关闭线程池,阻止新任务提交,并且中断当前正在运行的线程
executor.showdownNow();

创建好线程池后直接调用execute方法并传入一个Runnable参数即可将任务交给线程池执行,通过shutdown/shutdownNow方法可以关闭线程池。

ThreadPoolExecutor的构造方法中参数众多,对于初学者而言在没有了解各个参数的作用的情况下很难去配置合适的线程池。因此Java还为我们提供了一个线程池工具类Executors来快捷的创建线程池。Executors提供了很多简便的创建线程池的方法,举两个例子,代码如下:

1
2
3
4
5
6
java复制代码// 实例化一个单线程的线程池
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
// 创建固定线程个数的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);
// 创建一个可重用固定线程数的线程池
ExecutorService executorService2 = Executors.newCachedThreadPool();

但是,通常来说在实际开发中并不推荐直接使用Executors来创建线程池,而是需要根据项目实际情况配置适合自己项目的线程池,关于如何配置合适的线程池这是后话,需要我们理解线程池的各个参数以及线程池的工作原理之后才能有答案。

2.线程池的生命周期

线程池从诞生到死亡,中间会经历RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五个生命周期状态。

  • RUNNING 表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。
  • SHUTDOWN 线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown后会进入SHUTDOWN状态。
  • STOP 线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了shutdownNow后会进入STOP状态。
  • TIDYING 当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。
  • TERMINATED 线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。

根据上述线程池生命周期状态的描述,可以画出如下所示的线程池生命周期状态流程示意图。

threadpoollifecycle.png

二、线程池的工作机制

1.ThreadPoolExecutor中的参数

上一小节中,我们使用ThreadPoolExecutor的构造方法来创建了一个线程池。其实在ThreadPoolExecutor中有多个构造方法,但是最终都调用到了下边代码中的这一个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class ThreadPoolExecutor extends AbstractExecutorService {

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...省略校验相关代码

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

// ...

}

这个构造方法中有7个参数之多,我们逐个来看每个参数所代表的含义:

  • corePoolSize 表示线程池的核心线程数。当有任务提交到线程池时,如果线程池中的线程数小于corePoolSize,那么则直接创建新的线程来执行任务。
  • workQueue 任务队列,它是一个阻塞队列,用于存储来不及执行的任务的队列。当有任务提交到线程池的时候,如果线程池中的线程数大于等于corePoolSize,那么这个任务则会先被放到这个队列中,等待执行。
  • maximumPoolSize 表示线程池支持的最大线程数量。当一个任务提交到线程池时,线程池中的线程数大于corePoolSize,并且workQueue已满,那么则会创建新的线程执行任务,但是线程数要小于等于maximumPoolSize。
  • keepAliveTime 非核心线程空闲时保持存活的时间。非核心线程即workQueue满了之后,再提交任务时创建的线程,因为这些线程不是核心线程,所以它空闲时间超过keepAliveTime后则会被回收。
  • unit 非核心线程空闲时保持存活的时间的单位
  • threadFactory 创建线程的工厂,可以在这里统一处理创建线程的属性
  • handler 拒绝策略,当线程池中的线程达到maximumPoolSize线程数后且workQueue已满的情况下,再向线程池提交任务则执行对应的拒绝策略

2.线程池工作流程

线程池提交任务是从execute方法开始的,我们可以从execute方法来分析线程池的工作流程。

(1)当execute方法提交一个任务时,如果线程池中线程数小于corePoolSize,那么不管线程池中是否有空闲的线程,都会创建一个新的线程来执行任务。

thread_pool1.png

(2)当execute方法提交一个任务时,线程池中的线程数已经达到了corePoolSize,且此时没有空闲的线程,那么则会将任务存储到workQueue中。

thread_pool2.png
(3)如果execute提交任务时线程池中的线程数已经到达了corePoolSize,并且workQueue已满,那么则会创建新的线程来执行任务,但总线程数应该小于maximumPoolSize。
thread_pool3.png

(4)如果线程池中的线程执行完了当前的任务,则会尝试从workQueue中取出第一个任务来执行。如果workQueue为空则会阻塞线程。

thread_pool4.png

(5)如果execute提交任务时,线程池中的线程数达到了maximumPoolSize,且workQueue已满,此时会执行拒绝策略来拒绝接受任务。

thread_pool5.png

(6)如果线程池中的线程数超过了corePoolSize,那么空闲时间超过keepAliveTime的线程会被销毁,但程池中线程个数会保持为corePoolSize。

thread_pool6.png

(7)如果线程池存在空闲的线程,并且设置了allowCoreThreadTimeOut为true。那么空闲时间超过keepAliveTime的线程都会被销毁。

thread_pool7.png

3.线程池的拒绝策略

如果线程池中的线程数达到了maximumPoolSize,并且workQueue队列存储满的情况下,线程池会执行对应的拒绝策略。在JDK中提供了RejectedExecutionHandler接口来执行拒绝操作。实现RejectedExecutionHandler的类有四个,对应了四种拒绝策略。分别如下:

  • DiscardPolicy 当提交任务到线程池中被拒绝时,线程池会丢弃这个被拒绝的任务
  • DiscardOldestPolicy 当提交任务到线程池中被拒绝时,线程池会丢弃等待队列中最老的任务。
  • CallerRunsPolicy 当提交任务到线程池中被拒绝时,会在线程池当前正在运行的Thread线程中处理被拒绝额任务。即哪个线程提交的任务哪个线程去执行。
  • AbortPolicy 当提交任务到线程池中被拒绝时,直接抛出RejectedExecutionException异常。

三、线程池源码分析

从上一章对线程池的工作流程解读来看,线程池的原理似乎并没有很难。但是开篇时我说过想要读懂线程池的源码并不容,主要原因是线程池内部运用到了大量并发相关知识,另外还与线程池中用到的位运算有关。

1.线程池中的位运算(了解内容)

在向线程池提交任务时有两个比较中要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。在ThreadPoolExecutor内部使用了一个AtomicInteger类型的整数ctl来表示这两个参数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
// Integer.SIZE = 32.所以 COUNT_BITS= 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 将-1左移29位得到RUNNING状态的值
private static final int RUNNING = -1 << COUNT_BITS;
// 线程池运行状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

// ...
}

因为涉及多线程的操作,这里为了保证原子性,ctl参数使用了AtomicInteger类型,并且通过ctlOf方法来计算出了ctl的初始值。如果你不了解位运算大概很难理解上述代码的用意。

我们知道,int类型在Java中占用4byte的内存,一个byte占用8bit,所以Java中的int类型共占用32bit。对于这个32bit,我们可以进行高低位的拆分。做Android开发的同学应该都了解View测量流程中的MeasureSpec参数,这个参数将32bit的int拆分成了高2位和低30位,分别表示View的测量模式和测量值。而这里的ctl与MeasureSpec类似,ctl将32位的int拆分成了高3位和低29位,分别表示线程池的运行状态和线程池中的线程个数。

下面我们通过位运算来验证一下ctl是如何工作的,当然,如果你不理解这个位运算的过程对理解线程池的源码影响并不大,所以对以下验证内容不感兴趣的同学可以直接略过。

可以看到上述代码中RUNNING的值为-1左移29位,我们知道在计算机中**负数是以其绝对值的补码来表示的,而补码是由反码加1得到。**因此-1在计算机中存储形式为1的反码+1

1
2
3
4
5
java复制代码1的原码:00000000 00000000 00000000 00000001
+
1的反码:11111111 11111111 11111111 11111110
---------------------------------------
-1存储: 11111111 11111111 11111111 11111111

接下来对-1左移29位可以得到RUNNING的值为:

1
2
Java复制代码// 高三位表示线程状态,即高三位为111表示RUNNING
11100000 00000000 00000000 00000000

而AtomicInteger初始线程数量是0,因此ctlOf方法中的“|”运算如下:

1
2
3
4
5
Java复制代码RUNNING:  11100000 00000000 00000000 00000000
|
线程数为0: 00000000 00000000 00000000 00000000
---------------------------------------
得到ctl: 11100000 00000000 00000000 00000000

通过RUNNING|0(线程数)即可得到ctl的初始值。同时还可以通过以下方法将ctl拆解成运行状态和线程数:

1
2
3
4
5
6
java复制代码    // 00011111 11111111 11111111 11111111
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 获取线程池运行状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 获取线程池中的线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }

假设此时线程池为RUNNING状态,且线程数为0,验证一下runStateOf是如何得到线程池的运行状态的:

1
2
3
4
5
6
7
java复制代码 COUNT_MASK:  00011111 11111111 11111111 11111111

~COUNT_MASK: 11100000 00000000 00000000 00000000
&
ctl: 11100000 00000000 00000000 00000000
----------------------------------------
RUNNING: 11100000 00000000 00000000 00000000

如果不理解上边的验证流程没有关系,只要知道通过runStateOf方法可以得到线程池的运行状态,通过workerCountOf可以得到线程池中的线程数即可。

接下来我们进入线程池的源码的源码分析环节。

2.ThreadPoolExecutor的execute

向线程池提交任务的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法为入口来进行剖析,execute方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Java复制代码   public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 1.线程数小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程是运行状态并且可以使用offer将任务加入阻塞队列未满,
// offer是非阻塞操作。
int recheck = ctl.get();
// 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,
// 如果非运行状态就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是运行状态,并且线程数是0,则创建线程
else if (workerCountOf(recheck) == 0)
// 线程数是0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义
addWorker(null, false);
}
// 3.阻塞队列已满,创建非核心线程执行任务
else if (!addWorker(command, false))
// 如果失败,则执行拒绝策略
reject(command);
}

execute方法中的逻辑可以分为三部分:

  • 1.如果线程池中的线程数小于核心线程,则直接调用addWorker方法创建新线程来执行任务。
  • 2.如果线程池中的线程数大于核心线程数,则将任务添加到阻塞队列中,接着再次检验线程池的运行状态,因为上次检测过之后线程池状态有可能发生了变化,如果线程池关闭了,那么移除任务,执行拒绝策略。如果线程依然是运行状态,但是线程池中没有线程,那么就调用addWorker方法创建线程,注意此时传入任务参数是null,即不指定执行任务,因为任务已经加入了阻塞队列。创建完线程后从阻塞队列中取出任务执行。
  • 3.如果第2步将任务添加到阻塞队列失败了,说明阻塞队列任务已满,那么则会执行第三步,即创建非核心线程来执行任务,如果非核心线程创建失败那么就执行拒绝策略。

可以看到,代码的执行逻辑和我们在第二章中分析的线程池的工作流程是一样的。

接下来看下execute方法中创建线程的方法addWoker,addWoker方法承担了核心线程和非核心线程的创建,通过一个boolean参数core来区分是创建核心线程还是非核心线程。先来看addWorker方法前半部分的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码   // 返回值表示是否成功创建了线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 这里做了一个retry标记,相当于goto.
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
// 根据core来确定创建最大线程数,超过最大值则创建线程失败,
// 注意这里的最大值可能有三个corePoolSize、maximumPoolSize和线程池线程的最大容量
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过CAS来将线程数+1,如果成功则跳出循环,执行下边逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池的状态发生了改变,退回retry重新执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

// ...省略后半部分

return workerStarted;
}

这部分代码会通过是否创建核心线程来确定线程池中线程数的值,如果是创建核心线程,那么最大值不能超过corePoolSize,如果是创建非核心线程那么线程数不能超过maximumPoolSize,另外无论是创建核心线程还是非核心线程,最大线程数都不能超过线程池允许的最大线程数COUNT_MASK(有可能设置的maximumPoolSize大于COUNT_MASK)。如果线程数大于最大值就返回false,创建线程失败。

接下来通过CAS将线程数加1,如果成功那么就break retry结束无限循环,如果CAS失败了则就continue retry从新开始for循环,注意这里的retry不是Java的关键字,是一个可以任意命名的字符。

接下来,如果能继续向下执行则开始执行创建线程并执行任务的工作了,看下addWorker方法的后半部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码   private boolean addWorker(Runnable firstTask, boolean core) {

// ...省略前半部分

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 实例化一个Worker,内部封装了线程
w = new Worker(firstTask);
// 取出新建的线程
final Thread t = w.thread;
if (t != null) {
// 这里使用ReentrantLock加锁保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 拿到锁湖重新检查线程池状态,只有处于RUNNING状态或者
// 处于SHUTDOWN并且firstTask==null时候才会创建线程
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 线程不是处于NEW状态,说明线程已经启动,抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程加入线程队列,这里的worker是一个HashSet
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这部分逻辑其实比较容易理解,就是创建Worker并开启线程执行任务的过程,Worker是对线程的封装,创建的worker会被添加到ThreadPoolExecutor中的HashSet中。也就是线程池中的线程都维护在这个名为workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。

我们知道,线程调用start后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证Worker执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在Worker中。看下Worker的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码 private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 执行任务的线程
final Thread thread;
// 初始化Worker时传进来的任务,可能为null,如果不空,
// 则创建和立即执行这个task,对应核心线程创建的情况
Runnable firstTask;

Worker(Runnable firstTask) {
// 初始化时设置setate为-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过线程工程创建线程
this.thread = getThreadFactory().newThread(this);
}

// 线程的真正执行逻辑
public void run() {
runWorker(this);
}

// 判断线程是否是独占状态,如果不是意味着线程处于空闲状态
protected boolean isHeldExclusively() {
return getState() != 0;
}

// 获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// ...
}

Worker是位于ThreadPoolExecutor中的一个内部类,它继承了AQS,使用AQS来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过isHeldExclusively方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。关于AQS我们前边文章中已经详细分析过了,不了解AQS的可以翻看前边ReentrantLock的文章。

另外,Worker还实现了Runnable接口,因此它的执行逻辑就是在run方法中,run方法调用的是线程池中的runWorker(this)方法。任务的执行逻辑就在runWorker方法中,它的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取出Worker中的任务,可能为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task不为null或者阻塞队列中有任务,通过循环不断的从阻塞队列中取出任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// ...
try {
// 任务执行前的hook点
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 任务执行后的hook点
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 超时没有取到任务,则回收空闲超时的线程
processWorkerExit(w, completedAbruptly);
}
}

可以看到,runWorker的核心逻辑就是不断通过getTask方法从阻塞队列中获取任务并执行.通过这样的方式实现了线程的复用,避免了创建线程。这里要注意的是这里是一个“生产者-消费者”模式,getTask是从阻塞队列中取任务,所以如果阻塞队列中没有任务的时候就会处于阻塞状态。getTask中通过判断是否要回收线程而设置了等待超时时间,如果阻塞队列中一直没有任务,那么在等待keepAliveTime时间后会返回一个null。最终会走到上述代码的finally方法中,意味着有线程空闲时间超过了keepAliveTime时间,那么调用processWorkerExit方法移除Worker。processWorkerExit方法中没有复杂难以理解的逻辑,这里就不再贴代码了。我们重点看下getTask中是如何处理的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Java复制代码    private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
// ...


// Flag1. 如果配置了allowCoreThreadTimeOut==true或者线程池中的
// 线程数大于核心线程数,则timed为true,表示开启指定线程超时后被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// ...

try {
// Flag2. 取出阻塞队列中的任务,注意如果timed为true,则会调用阻塞队列的poll方法,
// 并设置超时时间为keepAliveTime,如果超时没有取到任务则会返回null。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

重点看getTask是如何处理空闲超时的逻辑的。我们知道,回收线程的条件是线程大于核心线程数或者配置了allowCoreThreadTimeOut为true,当线程空闲超时的情况下就会回收线程。上述代码在Flag1处先判断了如果线程池中的线程数大于核心线程数,或者开启了allowCoreThreadTimeOut,那么就需要开启线程空闲超时回收。所有在Flag2处,timed为true的情况下调用了阻塞队列的poll方法,并传入了超时时间为keepAliveTime,poll方法是一个阻塞方法,在没有任务时候回进行阻塞。如果在keepAliveTime时间内,没有获取到任务,那么poll方法就会返回null,结束runWorker的循环。进而执行runWorker方法中回收线程的操作。

这里需要我们理解阻塞队列poll方法的使用,poll方法接受一个时间参数,是一个阻塞操作,在给定的时间内没有获取到数据就返回null。poll方法的核心代码如下:

1
2
3
4
5
java复制代码while (count == 0) { 
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}

其实说白了,阻塞队列就是一个使用ReentrantLock实现的“生产者-消费者”模式,我们在深入理解Java线程的等待与唤醒机制(二)这篇文章中使用ReentrantLock实现“生产者-消费者”模型其实就是一个简单的阻塞队列,与JDK中的BlockingQueue实现机制类似。感兴趣的同学可以自己查看ArrayBlockingQueue等阻塞队列的实现,限于文章篇幅,这里就不再赘述了。

3.ThreadPoolExecutor的拒绝策略

上一小节中我们多次提到线程池的拒绝策略,它是在reject方法中实现的。实现代码也非常简单,代码如下:

1
2
3
java复制代码    final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

通过调用handler的rejectedExecution方法实现。这里其实就是运用了策略模式,handler是一个RejectedExecutionHandler类型的成员变量,RejectedExecutionHandler是一个接口,只有一个rejectedExecution方法。在实例化线程池时构造方法中传入对应的拒绝策略实例即可。前文已经提到了Java提供的几种默认实现分别为DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

以AbortPolicy直接抛出异常为例,来看下代码实现:

1
2
3
4
5
6
7
8
9
10
java复制代码    public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

可以看到直接在rejectedExecution方法中抛出了RejectedExecutionException来拒绝任务。其他的几个策略实现也都比较简单,有兴趣可以自己查阅代码。

4.ThreadPoolExecutor的shutdown

调用shutdown方法后,会将线程池标记为SHUTDOWN状态,上边execute的源码可以看出,只有线程池是RUNNING状态才接受任务,因此被标记位SHUTDOWN后,再提交任务会被线程池拒绝。shutdown的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码    public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
// 将线程池状态置为SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 尝试中断空闲线程
interruptIdleWorkers();
// 空方法,线程池关闭的hook点
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

修改线程池为SHUTDOWN状态后,会调用interruptIdleWorkers去中断空闲线程线程,具体实现逻辑是在interruptIdleWorkers(boolean onlyOne)方法中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码    
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 尝试tryLock获取锁,如果拿锁成功说明线程是空闲状态
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown的逻辑比较简单,里边做了两件比较重要的事情,即先将线程池状态修改为SHUTDOWN,接着遍历所有Worker,将空闲的Worker进行中断。

五、总结

本文深入的探究了线程池的工作流程和实现原理。就线程池的工作流程而言其实并不难以理解。但是在分析线程池的源码时,如果没有很好的并发基础的话,大概率是难以读懂线程池的源码的。因为线程池内部使用了大量并发知识,对任何一点用到的并发知识认识不到位都会造成理解偏差。写这篇文章参看了很多的其他线程池的相关文章,几乎没有找到一篇能够剖析清楚线程池源码的文章。归根结底还是没能系统的理解Atomic、Lock与AQS、CAS、阻塞队列等并发相关知识。

参考&推荐阅读

tech.meituan.com/2020/04/02/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

程序员的福音 - Apache Commons Lang

发表于 2021-07-09

此文是系列文章第二篇,前几篇请点击链接查看

程序猿的福音 - Apache Commons简介

Apache Commons Lang是对java.lang的扩展,基本上是commons中最常用的工具包。

目前Lang包有两个commons-lang3和commons-lang。

lang最新版本是2.6,最低要求Java1.2以上,目前官方已不在维护。lang3目前最新版本是3.12.0,最低要求Java8以上。相对于lang来说完全支持Java8的特性,废除了一些旧的API。该版本无法兼容旧有版本,于是为了避免冲突改名为lang3。

Java8以上的用户推荐使用lang3代替lang,下面我们主要以lang3 - 3.12.0版本为例做说明。

以下为整体包结构:

1
2
3
4
5
6
7
8
9
10
11
12
arduino复制代码org.apache.commons.lang3
org.apache.commons.lang3.builder
org.apache.commons.lang3.concurrent
org.apache.commons.lang3.event
org.apache.commons.lang3.exception
org.apache.commons.lang3.math
org.apache.commons.lang3.mutable
org.apache.commons.lang3.reflect
org.apache.commons.lang3.text
org.apache.commons.lang3.text.translate
org.apache.commons.lang3.time
org.apache.commons.lang3.tuple

图片

下面只列举其中常用的加以说明,其余感兴趣的可以自行翻阅源码研究。

  1. 日期相关

在Java8之前,日期只提供了java.util.Date类和java.util.Calendar类,说实话这些API并不是很好用,而且也存在线程安全的问题,所以Java8推出了新的日期API。如果你还在用旧的日期API,可以使用DateUtils和DateFormatUtils工具类。

1. 字符串转日期

1
2
3
4
5
6
7
Java复制代码final String strDate = "2021-07-04 11:11:11";
final String pattern = "yyyy-MM-dd HH:mm:ss";
// 原生写法
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
Date date1 = sdf.parse(strDate);
// commons写法
Date date2 = DateUtils.parseDate(strDate, pattern);

2. 日期转字符串

1
2
3
4
5
6
7
Java复制代码final Date date = new Date();
final String pattern = "yyyy年MM月dd日";
// 原生写法
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
String strDate = sdf.format(date);
// 使用commons写法
String strDate = DateFormatUtils.format(date, pattern);

3. 日期计算

1
2
3
4
5
6
7
8
9
10
11
Java复制代码final Date date = new Date();
// 原生写法
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.DATE, 5); // 加5天
cal.add(Calendar.HOUR_OF_DAY, -5); // 减5小时
// 使用commons写法
Date newDate1 = DateUtils.addDays(date, 5); // 加5天
Date newDate2 = DateUtils.addHours(date, -5); // 减5小时
Date newDate3 = DateUtils.truncate(date, Calendar.DATE); // 过滤时分秒
boolean isSameDay = DateUtils.isSameDay(newDate1, newDate2); // 判断是否是同一天
  1. 字符串相关

字符串是Java中最常用的类型,相关的工具类也可以说是最常用的,下面直接看例子

1. 字符串判空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Java复制代码String str = "";
// 原生写法
if (str == null || str.length() == 0) {
// Do something
}
// commons写法
if (StringUtils.isEmpty(str)) {
// Do something
}
/* StringUtils.isEmpty(null) = true
* StringUtils.isEmpty("") = true
* StringUtils.isEmpty(" ") = false
* StringUtils.isEmpty("bob") = false
* StringUtils.isEmpty(" bob ") = false
 */

相关方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Java复制代码// isEmpty取反
StringUtils.isNotEmpty(str);
/*
* null, 空串,空格为true
* StringUtils.isBlank(null) = true
* StringUtils.isBlank("") = true
* StringUtils.isBlank(" ") = true
* StringUtils.isBlank("bob") = false
* StringUtils.isBlank(" bob ") = false
*/
StringUtils.isBlank(str);
// isBlank取反
StringUtils.isNotBlank(str);
// 任意一个参数为空则结果为true
StringUtils.isAnyEmpty(str1, str2, str3);
// 所有参数为空则结果为true
StringUtils.isAllEmpty(str1, str2, str3);

2. 字符串去空格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Java复制代码// 去除两端空格,不需要判断null
String newStr = StringUtils.trim(str);
/*
* 去除两端空格,如果是null则转换为空字符串
* StringUtils.trimToEmpty(null) = ""
* StringUtils.trimToEmpty("") = ""
* StringUtils.trimToEmpty(" ") = ""
* StringUtils.trimToEmpty("abc") = "abc"
* StringUtils.trimToEmpty(" abc ") = "abc"
*/
newStr = StringUtils.trimToEmpty(str);
/*
* 去除两端空格,如果结果是空串则转换为null
* StringUtils.trimToNull(null) = null
* StringUtils.trimToNull("") = null
* StringUtils.trimToNull(" ") = null
* StringUtils.trimToNull("abc") = "abc"
* StringUtils.trimToNull(" abc ") = "abc"
*/
newStr = StringUtils.trimToNull(str);
/*
* 去两端 给定字符串中任意字符
* StringUtils.strip(null, *) = null
* StringUtils.strip("", *) = ""
* StringUtils.strip("abc", null) = "abc"
* StringUtils.strip(" abc", null) = "abc"
* StringUtils.strip("abc ", null) = "abc"
* StringUtils.strip(" abc ", null) = "abc"
* StringUtils.strip(" abcyx", "xyz") = " abc"
*/
newStr = StringUtils.strip(str, "stripChars");
// 去左端 给定字符串中任意字符
newStr = StringUtils.stripStart(str, "stripChars");
// 去右端 给定字符串中任意字符
newStr = StringUtils.stripEnd(str, "stripChars");

3. 字符串分割

1
2
3
4
5
6
7
8
9
10
11
Java复制代码/*
* 按照空格分割字符串 结果为数组
* StringUtils.split(null) = null
* StringUtils.split("") = []
* StringUtils.split("abc def") = ["abc", "def"]
 * StringUtils.split("abc def") = ["abc", "def"]
 * tringUtils.split(" abc ") = ["abc"]
 */
 StringUtils.split(str);
 // 按照某些字符分割 结果为数组,自动去除了截取后的空字符串
 StringUtils.split(str, ",");

4. 取子字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
Java复制代码// 获得"ab.cc.txt"中最后一个.之前的字符串
StringUtils.substringBeforeLast("ab.cc.txt", "."); // ab.cc
// 相似方法
// 获得"ab.cc.txt"中最后一个.之后的字符串(常用于获取文件后缀名)
StringUtils.substringAfterLast("ab.cc.txt", "."); // txt
// 获得"ab.cc.txt"中第一个.之前的字符串
StringUtils.substringBefore("ab.cc.txt", "."); // ab
// 获得"ab.cc.txt"中第一个.之后的字符串
StringUtils.substringAfter("ab.cc.txt", "."); // cc.txt
// 获取"ab.cc.txt"中.之间的字符串
StringUtils.substringBetween("ab.cc.txt", "."); // cc
// 看名字和参数应该就知道干什么的了
StringUtils.substringBetween("a(bb)c", "(", ")"); // bb

5. 其他

1
2
3
4
5
6
7
8
9
10
11
Java复制代码// 首字母大写
StringUtils.capitalize("test"); // Test
// 字符串合并
StringUtils.join(new int[]{1,2,3}, ",");// 1,2,3
// 缩写
StringUtils.abbreviate("abcdefg", 6);// "abc..."
// 判断字符串是否是数字
StringUtils.isNumeric("abc123");// false
// 删除指定字符
StringUtils.remove("abbc", "b"); // ac
// ... ... 还有很多,感兴趣可以自己研究

6. 随机字符串

1
2
3
4
5
6
7
8
Java复制代码// 随机生成长度为5的字符串
RandomStringUtils.random(5);
// 随机生成长度为5的"只含大小写字母"字符串
RandomStringUtils.randomAlphabetic(5);
// 随机生成长度为5的"只含大小写字母和数字"字符串
RandomStringUtils.randomAlphanumeric(5);
// 随机生成长度为5的"只含数字"字符串
RandomStringUtils.randomNumeric(5);
  1. 反射相关

反射是Java中非要重要的特性,原生的反射API代码冗长,Lang包中反射相关的工具类可以很方便的实现反向相关功能,下面看例子

1. 属性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Java复制代码public class ReflectDemo {
private static String sAbc = "111";
private String abc = "123";
public void fieldDemo() throws Exception {
ReflectDemo reflectDemo = new ReflectDemo();
// 反射获取对象实例属性的值
// 原生写法
Field abcField = reflectDemo.getClass().getDeclaredField("abc");
        abcField.setAccessible(true);// 设置访问级别,如果private属性不设置则访问会报错
String value = (String) abcField.get(reflectDemo);// 123
// commons写法
String value2 = (String) FieldUtils.readDeclaredField(reflectDemo, "abc", true);//123
// 方法名如果不含Declared会向父类上一直查找
}
}

注:方法名含Declared的只会在当前类实例上寻找,不包含Declared的在当前类上找不到则会递归向父类上一直查找。

相关方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Java复制代码public class ReflectDemo {
private static String sAbc = "111";
private String abc = "123";
public void fieldRelated() throws Exception {
ReflectDemo reflectDemo = new ReflectDemo();
// 反射获取对象属性的值
String value2 = (String) FieldUtils.readField(reflectDemo, "abc", true);//123
// 反射获取类静态属性的值
String value3 = (String) FieldUtils.readStaticField(ReflectDemo.class, "sAbc", true);//111
// 反射设置对象属性值
FieldUtils.writeField(reflectDemo, "abc", "newValue", true);
// 反射设置类静态属性的值
FieldUtils.writeStaticField(ReflectDemo.class, "sAbc", "newStaticValue", true);
}
}

2. 获取注解方法

1
2
3
4
5
6
7
8
9
10
11
Java复制代码// 获取被Test注解标识的方法

// 原生写法
List<Method> annotatedMethods = new ArrayList<Method>();
for (Method method : ReflectDemo.class.getMethods()) {
    if (method.getAnnotation(Test.class) != null) {
        annotatedMethods.add(method);
    }
}
// commons写法
Method[] methods = MethodUtils.getMethodsWithAnnotation(ReflectDemo.class, Test.class);

3. 方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Java复制代码private static void testStaticMethod(String param1) {}
private void testMethod(String param1) {}
  
public void invokeDemo() throws Exception {
    // 调用函数"testMethod"
    ReflectDemo reflectDemo = new ReflectDemo();
    // 原生写法
    Method testMethod = reflectDemo.getClass().getDeclaredMethod("testMethod");
    testMethod.setAccessible(true); // 设置访问级别,如果private函数不设置则调用会报错
    testMethod.invoke(reflectDemo, "testParam");
    // commons写法
    MethodUtils.invokeExactMethod(reflectDemo, "testMethod", "testParam");
    
    // ---------- 类似方法 ----------
    // 调用static方法
    MethodUtils.invokeExactStaticMethod(ReflectDemo.class, "testStaticMethod", "testParam");
    // 调用方法(含继承过来的方法)
    MethodUtils.invokeMethod(reflectDemo, "testMethod", "testParam");
    // 调用static方法(当前不存在则向父类寻找匹配的静态方法)
    MethodUtils.invokeStaticMethod(ReflectDemo.class, "testStaticMethod", "testParam");
}
其他还有ClassUtils,ConstructorUtils,TypeUtils等不是很常用,有需求的可以现翻看类的源码。
  1. 系统相关

主要是获取操作系统和JVM一些信息,下面看例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Java复制代码// 判断操作系统类型
boolean isWin = SystemUtils.IS_OS_WINDOWS;
boolean isWin10 = SystemUtils.IS_OS_WINDOWS_10;
boolean isWin2012 = SystemUtils.IS_OS_WINDOWS_2012;
boolean isMac = SystemUtils.IS_OS_MAC;
boolean isLinux = SystemUtils.IS_OS_LINUX;
boolean isUnix = SystemUtils.IS_OS_UNIX;
boolean isSolaris = SystemUtils.IS_OS_SOLARIS;
// ... ...

// 判断java版本
boolean isJava6 = SystemUtils.IS_JAVA_1_6;
boolean isJava8 = SystemUtils.IS_JAVA_1_8;
boolean isJava11 = SystemUtils.IS_JAVA_11;
boolean isJava14 = SystemUtils.IS_JAVA_14;
// ... ...

// 获取java相关目录
File javaHome = SystemUtils.getJavaHome();
File userHome = SystemUtils.getUserHome();// 操作系统用户目录
File userDir = SystemUtils.getUserDir();// 项目所在路径
File tmpDir = SystemUtils.getJavaIoTmpDir();
  1. 总结

除了以上介绍的工具类外,还有其他不是很常用的就不多做介绍了。感兴趣的可以自行翻阅源码研究。

后续章节我将继续给大家介绍commons中其他好用的工具类库,期待你的关注。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Serverless真的是云计算未来发展方向吗?

发表于 2021-07-09

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

前言

前几天一个非常偶然的机会,通过线上直播听了GMTC峰会上几个互联网头部大厂的大神对Serverless落地实践的分享,不说收获颇丰,起码摆脱了先前对Servlerless只有肤浅的概念层次的认知。后续又对腾讯云的SCF做了个大致了解,突然有了一个很特立独行的想法,Serverless真的如大神所说的是云计算未来发展的方向吗?本人目前接触Servless不是太深,可能见解有些片面甚至主观,以下纯属个人观点:对症则为良方,反之或为毒药。

到底什么是Serverless?

从字面也可以做个大概了解,Serverless=Server+Less也就是无服务器。这里的无服务器不是真的没有服务器,只是不需要有自己运维的服务器。为了便于理解,这里的简要说下服务器的一个发展历程,从最开始物理机部署,到vm虚拟机,Docker容器,到云计算,再到当下的Serverless

2.png
云原生的概念裹挟着k8s汹涌而来,当下云主机,云服务,Xaas概念也相当火爆。从云计算的三大结构层IAAS,PAAS,SAAS看,其本质实际上是硬件,基本组件和中间件的不断下沉融合的云化过程,似乎就差最后一个业务层的云化了,所以Serverless横空出世,并且带了主角光环。那什么是Serverless呢?更被业内广泛接受的解释是Faas和Baas的组合。Faas云函数,提供逻辑处理,而 Baas云后台,提供数据&文件的存储,消息推送等中间件服务,结合后俨然一个云微服务。Serverless以其粒度更细,更廉价更便捷,按需提供服务备受各大云计算厂商推崇!

Serverless到底能给我们带来什么?

1.开发者不再需要关心底层的服务器资源和运维

2.快速搭建服务上线,按需使用资源,降低服务器成本

3.提升前后端协作的效率

我们是否真的需要或什么时候需要Serverless?

降本提效的主旨,水到渠成的发展过程,似乎预示着Serverless将大行其道。的确对于个人开发者或者规模较小的初创公司,初期缺乏强大的资金实力的支撑加上新业务模式的市场预期的不明朗,贸然投入大量服务器资源和运维成本,很可能成为压倒骆驼的最后一根稻草,导致产品倒在了黎明前的黑暗里。而Serverless的支撑快速搭建应用,并且有多少流量付多少钱,不再需要公司投入高昂的服务器成本,帮助技术配合业务快速迭代试错,加快抢占市场的先机,终于迎来了黎明的曙光,产品也进入快速成长期。随之而来的产品的各种业态,操作模式层出不穷,业务复杂度的几何级上升。而Serverless的函数云化特性,起码目前来看对后端工程化特别是整体架构设计不够友好,很容易形成烟囱式的代码结构,稍有不慎就会给后续优化和迭代挖坑。

另外就是目前来看Serverless本身存在一个比较致命的问题,那就是没有形成统一的业内标准。各大厂商都发力提供更快速的加载,覆盖更多的语言,提供更便捷的SDK等等用以快速抢占用户,扩大市场份额。在商言商无可厚非,但某些方面看这增加了兼容的壁垒,更难形成统一的标准。一旦使用了A厂商的Serverless服务,因为标准不统一就不能无损使用B厂商的服务了,而且Serverless云函数内部出于安全考虑不允许相关外网络操作。而达到一定规模的公司,各种云的混部非常常见,无论从经济上还是从安全上考虑也不会把自己强捆绑到一艘船上,一损俱损是不能接受的。

写在最后

以上纯属个人的一些看法,Serverless作为新兴的理念,随之云计算的普及和发展,必定未来可期!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

你还在统一返回 ApiResult 吗?✋ duck 不必,

发表于 2021-07-09

分享 Java、.NET、Javascript、效率、软件工程、编程语言等技术知识。

本文 GitHub TechShare 已收录,没有广告,Just For Fun。

为什么写这篇文章?

相信不少 Java 开发都在项目中使用过类似 ApiResult 这样的对象来包装 Api 返回类型,这相比什么都不包装有一定的好处,但这真的就是最好的做法吗?

关于封装 ResultBean 对象,晓风轻在他的 程序员你为什么这么累 系列文章中有过不错的分享,但统一封装 ResultBean 实际上也是一种重复工作,秉承 DRY 的理念,还有必要对其继续优化。

统一返回 ApiResult 还不是最佳实践,必须不断思考优化,就像 React 所提倡的 Rethinking Best Practices 。

ApiResult 现状

我们先看一个常见的 ApiResult 对象,代码如下:

1
2
3
4
5
6
java复制代码@Data
public class ApiResult<T> implements Serializable {
private int code;
private String message;
private T data;
}

好处:客户端可以使用统一的处理方式。

存在的问题:

  1. 在统一返回 ApiResult 的情况下,即使是正常返回,也会带上 code、message 属性,多数情况 下属于 冗余 。
  2. Controller 层代码存在重复,返回对象重复定义、包装调用编写重复,代码整洁度下降 。
  3. 统一返回 200 状态码不利于 请求监控 。
  4. ApiResult 同时承担了Api结果和错误结果的职责,不符合 单一职责 原则。

像下面这样一段获取列表数据的代码,若不涉及业务预期内的请求验证,是没必要包装一层 ApiResult 的,什么是业务预期内的验证呢,举个例子,比如非会员无法获取列表,业务上需要提醒用户购买会员,这属于合法请求,此时仍然可以使用 ApiResult 携带 code 明确返回给客户端。

1
2
3
java复制代码public ApiResult<List<Data>> demo() {
return ApiResult.ok(getList());
}

ApiResult 要根据业务场景使用,不需要每个场景都使用它。

当 API 越来越多时,统一返回 ApiResult 的问题会被放大,如何解决这些问题呢?请接着看。

使用 HTTP 状态码

有许多项目采用的方式是,在 API 调用成功时使用正常的数据模型,而在出现错误时,返回相应的 HTTP 错误码 和描述信息。我们看一段 jhipster 中的代码:

1
2
3
4
5
java复制代码@GetMapping("/authors/{id}")
public ResponseEntity<AuthorDTO> getAuthor(@PathVariable Long id) {
Optional<AuthorDTO> authorDTO = authorService.findOne(id);
return ResponseUtil.wrapOrNotFound(authorDTO);
}

主要 HTTP 状态码的含义:

  • 1XX – Informational
  • 2XX – Success
  • 3XX – Redirection
  • 4XX – Client Error
  • 5XX – Server Error

采用 HTTP 状态码就不再需要统一返回 ApiResult ,但问题也随之而来,那就是 ApiResult 中定义的 error code 很难跟 HTTP 错误码一一对应,光有 HTTP 错误码和描述信息是不够的,还需要定义专门的错误模型。

API 错误模型

如何定义一个好的 API 错误模型,这需要根据 业务的复杂程度 来定,我们先来看看几个 Big Company 都是怎么做的。

先看 twitter 的,其中省略了无关的 HTTP 输出信息。

1
2
3
json复制代码HTTP/1.1 400 Bad Request

{"errors":[{"code":215,"message":"Bad Authentication data."}]}

使用了错误码,并且错误模型是一个数组,意味着可能会返回多个错误。

再来看 Facebook 的 Graph API。

1
2
3
4
5
6
7
8
9
10
json复制代码HTTP/1.1 200

{
"error": {
"message": "Syntax error \"Field picture specified more than once. This is only possible before version 2.1\" at character 23: id,name,picture,picture",
"type": "OAuthException",
"code": 2500,
"fbtrace_id": "xxxxxxxxxxx"
}
}

注意,其返回的是统一的 200 状态码,错误模型中还包含 异常类型 和 trace_id,这两个属性有助于排查错误。

最后看看巨头微软 Bing 的错误模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
json复制代码HTTP/1.1 200

{
"SearchResponse": {
"Version": "2.2",
"Query": { "SearchTerms": "api error codes" },
"Errors": [
{
"Code": 1001,
"Message": "Required parameter is missing.",
"Parameter": "SearchRequest.AppId",
"HelpUrl": "http\u003a\u002f\u002fmsdn.microsoft.com\u002fen-us\u002flibrary\u002fdd251042.aspx"
}
]
}
}

其返回的也是 200 状态码,但可以看到它使用了类似 ApiResult 的包装方式,并且还包含了 输入信息、输入参数 和 帮助链接 ,原来这就是 大佬 的做事方式吗?

果然 API 错误模型的设计,根据业务复杂程序的不同,实现起来也不太一样,这三个中,我们参考 twitter 的 API 设计 来看看在 Spring 项目中实现起来有哪些需要注意的,毕竟绝大多数项目的复杂度都达不到 FB 和 Bing 的程度。

Spring API 错误模型实战

错误模型的定义是非常简单的,代码如下。

ErrorResponse.java

1
2
3
4
java复制代码@Data
public class ErrorResponse implements Serializable {
private ErrorDetail error;
}

ErrorDetail.java

1
2
3
4
5
6
java复制代码@Data
public class ErrorDetail implements Serializable {
private int code;
private String message;
private String type;
}

错误详情中增加了一个 type 属性,可以帮助更好地定位到异常。

在 Controller 层编写时至需要返回正常的数据模型,如 List、VO、DTO 之类。

异常使用 AOP 的方式来处理。

编写一个 ControllerAdvice 类,。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码@ControllerAdvice
@ResponseBody
@Slf4j
public class CustomExceptionHandler {

@ExceptionHandler(value = Exception.class)
public ResponseEntity<ErrorResponse> exceptionHandler(Exception exception) {
return serverErrorResponse(ApiCode.SYSTEM_EXCEPTION, exception);
}

private ResponseEntity<ErrorResponse> serverErrorResponse(ApiCode apiCode, Exception exception) {
String message = apiCode.getMessage();
//服务端异常需要记录日志
log.error(message, exception);
//服务端异常使用api code中的message,避免敏感异常信息发送到客户端
return new ResponseEntity<>(errorResponse(apiCode, ErrorMessageType.API_CODE, exception), HttpStatus.INTERNAL_SERVER_ERROR);
}

private ResponseEntity<ErrorResponse> requestErrorResponse(ApiCode apiCode, Exception exception) {
String message = apiCode.getMessage();
//客户端请求错误只记录debug日志
if (log.isDebugEnabled()) {
log.debug(message, exception);
}
//客户端异常使用异常中的message
return new ResponseEntity<>(errorResponse(apiCode, ErrorMessageType.EXCEPTION, exception), HttpStatus.BAD_REQUEST);
}

private ErrorResponse errorResponse(ApiCode code, ErrorMessageType messageType, Exception exception) {
ErrorDetail errorDetail = new ErrorDetail();
errorDetail.setCode(code.getCode());
if (messageType.equals(ErrorMessageType.API_CODE) || StrUtil.isBlank(exception.getMessage())) {
errorDetail.setMessage(code.getMessage());
} else {
errorDetail.setMessage(exception.getMessage());
}
errorDetail.setType(exception.getClass().getSimpleName());

ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setError(errorDetail);
return errorResponse;
}

@ExceptionHandler(value = RequestVerifyException.class)
public ResponseEntity<ErrorResponse> requestVerifyExceptionHandler(RequestVerifyException e) {
return requestErrorResponse(ApiCode.PARAMETER_EXCEPTION, e);
}

}

上面的代码只放了两个 ExceptionHandler ,一个是针对 请求验证错误 ,一个是针对 未知服务器错误 ,分别对应的是 400 和 500 的 HTTP 状态码。需要对其他异常做专门处理,也仍然是使用以上的公共 errorResponse 方法,就看异常被定义为 请求异常 还是 服务端异常 。

至此,API 就能返回 "漂亮" 的错误模型了。

结束了吗?

先别走,还没结束呢,如果正常和错误情况下返回的数据模型不一样,那接口文档该如何定义呢?如果使用了 swagger ,那么我们需要添加针对 400 和 500 状态码的 全局输出模型。

在最新版本的 springfox 中要实现起来还是有点费劲的,来看部分代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Bean
public Docket createRestApi(TypeResolver typeResolver) {
//附加错误模型
Docket builder = new Docket(DocumentationType.SWAGGER_2)
.host(swaggerProperties.getHost())
.apiInfo(apiInfo(swaggerProperties))
.additionalModels(typeResolver.resolve(ErrorResponse.class));

//添加400错误码输出模型
List<Response> responseMessages = new ArrayList<>();
ResponseBuilder responseBuilder = new ResponseBuilder();
responseBuilder.code("400").description("");
if (!StringUtils.isEmpty(globalResponseMessageBody.getModelRef())) {
responseBuilder.representation(MediaType.APPLICATION_JSON)
.apply(rep -> rep.model(m -> m.referenceModel(
re -> re.key(key->key.qualifiedModelName(new QualifiedModelName("com.package.api","ErrorResponse")))
)));
}
responseMessages.add(responseBuilder.build());

builder.useDefaultResponseMessages(false)
.globalResponses(HttpMethod.GET, responseMessages)
.globalResponses(HttpMethod.POST, responseMessages);

return builder.select().build();
}

以上仅为部分代码,主要在于 需要附加模型 并指定输出模型,在实际项目中应该将模型信息放在配置当中,根据配置自动添加,关于 swagger 的自动配置,若读者朋友感兴趣,可以有机会专门写篇文章来讲解。

写在最后

在每个接口中返回统一的 ApiResult,笔者觉得是一件挺无聊的事情,写程序应该是一件能发挥创造力的事情。不断去思考最佳实践,学习优秀的设计,这件小小的事情,我们在工作当中几乎每天都会碰到,它是值得被改进的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Unity零基础到进阶 ☀️ Unity中的对象池技术 O

发表于 2021-07-09

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

对象池

1.对象池简单定义

对象池是一种Unity经常用到的内存管理服务,针对需要经常生成消失的对象,作用在于可以减少创建每个对象的系统开销。我们在对象需要消失的时候不Destroy而是SetActive(false),然后放入池子中(Queue),当需要再次显示一个新的对象的时候,先去池子中看有没有隐藏的对象,有就取出SetActive(true),若池子里没有可用的则再Instantiate。

2.使用对象池的原因

在Unity游戏开发的过程中经常会创建一些新的对象,如果数量较少还可以接受,如果创建的新对象数量庞大,那么对内存而言是一个极大的隐患。例如射击游戏当中,每发射一颗子弹,都要创建一个新的子弹对象,那么子弹是数量庞大,可想而知一场游戏当中会创建多少这样的新对象,那么如果这些子弹创建之后都对游戏起着关键且持续性的作用也无可厚非,问题是子弹发射完成之后,几秒之后就不再拥有任何的意义,一般会将它自动的隐藏,也就是我们所说的SetActive(false),因此大量的非活跃对象出现在游戏场景当中。

由于对象池中的对象只是SetActive(false)了,并非真正销毁了,所以一般运用于游戏中经常用到或需大量生成的物体。并不能将所有对象都用此技术,如若不然便是得不偿失了。

3.创建并使用一个简单对象池

首先我们先生成一个对象池的class,里面的内容就很简单,首先有一个Queue用来存放池子中的对象,然后实现两个方法,一个取对象,一个放对象。取对象的时候,若池子中有可用对象则取出一个,若没有则Instantiate一个;放对象即将对象SetActive(false)并且放入池子中。
下图是一个简单的效果
在这里插入图片描述

首先有一个Singleton.cs 用于将其他脚本写成单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码namespace Utilty
{
public class Singleton<T>
{
private static T instance;
public static T GetInstance()
{
if (instance == null)
{
instance = (T)Activator.CreateInstance(typeof(T), true);
}
return instance;
}
}
}

AssetsManager.cs 用于加载生成游戏对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
csharp复制代码using System.Collections.Generic;
using UnityEngine;

//通用框架
namespace Utilty
{
public class AssetsManager : Singleton<AssetsManager>
{
protected AssetsManager()
{
assetsCache = new Dictionary<string, Object>();
}
//缓存字典
private Dictionary<string, Object> assetsCache;

//获取资源
public virtual T GetAssets<T>(string path) where T : Object
{
//先查看缓存池中有没有这个资源
if (assetsCache.ContainsKey(path))
{
//直接将这个资源返回
return assetsCache[path] as T;
}
else
{
//通过Resource.Load去加载资源
T assets = Resources.Load<T>(path);
//将新资源加载到缓存池里
assetsCache.Add(path,assets);
//返回资源
return assets;
}
}
//卸载未使用的资源
public void UnloadUnusedAssets()
{
//卸载
Resources.UnloadUnusedAssets();
}
}
//加载图片
public class SpriteManager : Singleton<SpriteManager>
{
}
public class PrefabManager : Singleton<PrefabManager>
{
private PrefabManager() { }
public GameObject CreateGameObjectByPrefab(string path)
{
//获取预设体
GameObject prefab = AssetsManager.GetInstance().GetAssets<GameObject>(path);
//生成
GameObject obj = Object.Instantiate(prefab);
// 返回
return obj;
}
public GameObject CreateGameObjectByPrefab(string path, Vector3 pos, Quaternion qua)
{
//生成对象
GameObject obj = CreateGameObjectByPrefab(path);
//设置坐标和旋转
obj.transform.position = pos;
obj.transform.rotation = qua;
//返回
return obj;
}

public GameObject CreateGameObjectByPrefab(string path, Transform parent, Vector3 localPos, Quaternion localQua)
{
//生成对象
GameObject obj = CreateGameObjectByPrefab(path);
//设置父物体
obj.transform.SetParent(parent);
//设置坐标和旋转
obj.transform.localPosition = localPos;
obj.transform.localRotation = localQua;
//返回
return obj;
}
}
}

ObjectPool.cs 用于通过对象池 生成和回收 游戏对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
csharp复制代码using System.Collections.Generic;
using UnityEngine;

namespace Utilty
{
public class ObjectPool : Singleton<ObjectPool>
{
//私有构造
private ObjectPool()
{
pool = new Dictionary<string, List<GameObject>>();
}

//对象池
private Dictionary<string, List<GameObject>> pool;
//通过对象池生成游戏对象
public GameObject SpawnObject(string name)
{
GameObject needObj;
//查看是否有该名字所对应的子池,且子池中有对象
if (pool.ContainsKey(name) && pool[name].Count > 0)
{
//将0号对象返回
needObj = pool[name][0];
//将0号对象从List中移除
pool[name].RemoveAt(0);
}
else
{
//直接通过Instantiate生成
needObj = PrefabManager.GetInstance().CreateGameObjectByPrefab(name);
//修改名称(去掉Clone)
needObj.name = name;
}
//设置为激活
needObj.SetActive(true);
//返回
return needObj;
}

/// <summary>
/// 回收游戏对象到对象池
/// </summary>
/// <param name="Objname"></param>
public void RecycleObj(GameObject Objname)
{
//防止被看到,设置为非激活哦
Objname.SetActive(false);
if (pool.ContainsKey(Objname.name))
{
//将当前对象放入对象子池
pool[Objname.name].Add(Objname);
}
else
{
//创建该子池并将对象放入
pool.Add(Objname.name, new List<GameObject> { Objname });
}
}
}
}

PoolObject.cs 用于延迟回收对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码using System.Collections;
using UnityEngine;
using Utilty;

public class PoolObject : MonoBehaviour
{
private void OnEnable()
{
StartCoroutine(DelayRececle(2));
}
//延迟回收协程
IEnumerator DelayRececle(float interval)
{
//等待几秒
yield return new WaitForSeconds(interval);
//回收当前对象
ObjectPool.GetInstance().RecycleObj(gameObject);
}
}

Demo.cs 用于简单的使用对象池技术

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
csharp复制代码using UnityEngine;
using Utilty;

public class Demo : MonoBehaviour
{
private void Update()
{
if (Input.GetKeyDown(KeyCode.J))
{
GameObject go = ObjectPool.GetInstance().SpawnObject("Prefab");
go.transform.position = Vector3.zero;
go.transform.rotation = Quaternion.identity;
go.GetComponent<Rigidbody>().velocity = Vector3.zero;
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kubernetes Pod 垂直自动伸缩(VPA) 1 V

发表于 2021-07-09

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战

1 VPA 简介

VPA 全称 Vertical Pod Autoscaler,即垂直 Pod 自动扩缩容,它根据容器资源使用率自动设置 CPU 和 内存 的requests,从而允许在节点上进行适当的调度,以便为每个 Pod 提供适当的资源。
它既可以缩小过度请求资源的容器,也可以根据其使用情况随时提升资源不足的容量。
PS: VPA不会改变Pod的资源limits值。

废话不多说,直接上图,看VPA工作流程

2 部署metrics-server

2.1 下载部署清单文件

1
bash复制代码[root@VM-10-48-centos ~]#  wget https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.3.7/components.yaml

2.2 修改components.yaml文件

  • 修改了镜像地址为:scofield/metrics-server:v0.3.7
  • 修改了metrics-server启动参数args
1
2
3
4
5
6
7
8
9
yaml复制代码- name: metrics-server
image: scofield/metrics-server:v0.3.7
imagePullPolicy: IfNotPresent
args:
- --cert-dir=/tmp
- --secure-port=4443
- /metrics-server
- --kubelet-insecure-tls
- --kubelet-preferred-address-types=InternalIP

2.3 执行部署

1
bash复制代码[root@VM-10-48-centos ~]# kubectl  apply -f components.yaml

2.4 验证

1
2
3
4
5
6
7
8
9
10
bash复制代码[root@VM-10-48-centos ~]# kubectl get po -n kube-system | grep metrics-server
metrics-server-5b58f4df77-f7nks 1/1 Running 0 35d

# 能获取要top信息视为成功
[root@VM-10-48-centos ~]# kubectl top nodes
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
10.1.2.15 138m 3% 4207Mi 29%
10.1.2.16 159m 4% 3138Mi 45%
10.1.2.17 147m 3% 4118Mi 59%
10.1.50.2 82m 4% 1839Mi 55%

3 部署vertical-pod-autoscaler

3.1 克隆autoscaler项目

1
bash复制代码[root@VM-10-48-centos ~]# git clone https://github.com/kubernetes/autoscaler.git

3.2 修改部署文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bash复制代码[root@VM-10-48-centos ~]# cd autoscaler/vertical-pod-autoscaler/deploy
admission-controller-deployment.yaml
us.gcr.io/k8s-artifacts-prod/autoscaling/vpa-admission-controller:0.8.0
改为
scofield/vpa-admission-controller:0.8.0

recommender-deployment.yaml
us.gcr.io/k8s-artifacts-prod/autoscaling/vpa-recommender:0.8.0
改为
image: scofield/vpa-recommender:0.8.0

updater-deployment.yaml
us.gcr.io/k8s-artifacts-prod/autoscaling/vpa-updater:0.8.0
改为
scofield/vpa-updater:0.8.0

3.3 部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
bash复制代码[root@VM-10-48-centos ~]# cd autoscaler/vertical-pod-autoscaler
[root@VM-10-48-centos ~]# ./hack/vpa-up.sh
customresourcedefinition.apiextensions.k8s.io/verticalpodautoscalers.autoscaling.k8s.io created
customresourcedefinition.apiextensions.k8s.io/verticalpodautoscalercheckpoints.autoscaling.k8s.io created
clusterrole.rbac.authorization.k8s.io/system:metrics-reader created
clusterrole.rbac.authorization.k8s.io/system:vpa-actor created
clusterrole.rbac.authorization.k8s.io/system:vpa-checkpoint-actor created
clusterrole.rbac.authorization.k8s.io/system:evictioner created
clusterrolebinding.rbac.authorization.k8s.io/system:metrics-reader created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-actor created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-checkpoint-actor created
clusterrole.rbac.authorization.k8s.io/system:vpa-target-reader created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-target-reader-binding created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-evictionter-binding created
serviceaccount/vpa-admission-controller created
clusterrole.rbac.authorization.k8s.io/system:vpa-admission-controller created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-admission-controller created
clusterrole.rbac.authorization.k8s.io/system:vpa-status-reader created
clusterrolebinding.rbac.authorization.k8s.io/system:vpa-status-reader-binding created
serviceaccount/vpa-updater created
deployment.apps/vpa-updater created
serviceaccount/vpa-recommender created
deployment.apps/vpa-recommender created
Generating certs for the VPA Admission Controller in /tmp/vpa-certs.
Generating RSA private key, 2048 bit long modulus (2 primes)
............................................................................+++++
.+++++
e is 65537 (0x010001)
Generating RSA private key, 2048 bit long modulus (2 primes)
............+++++
...........................................................................+++++
e is 65537 (0x010001)
Signature ok
subject=CN = vpa-webhook.kube-system.svc
Getting CA Private Key
Uploading certs to the cluster.
secret/vpa-tls-certs created
Deleting /tmp/vpa-certs.
deployment.apps/vpa-admission-controller created
service/vpa-webhook created

这里如果出现错误:ERROR: Failed to create CA certificate for self-signing. If the error is “unknown option -addext”, update your openssl version or deploy VPA from the vpa-release-0.8 branch

需要升级openssl的版本解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码[root@VM-10-48-centos ~]# yum install gcc gcc-c++ -y
[root@VM-10-48-centos ~]# openssl version -a
[root@VM-10-48-centos ~]# wget https://www.openssl.org/source/openssl-1.1.1k.tar.gz && tar zxf openssl-1.1.1k.tar.gz && cd openssl-1.1.1k
[root@VM-10-48-centos ~]# ./config
[root@VM-10-48-centos ~]# make && make install
[root@VM-10-48-centos ~]# mv /usr/local/bin/openssl /usr/local/bin/openssl.bak
[root@VM-10-48-centos ~]# mv apps/openssl /usr/local/bin
[root@VM-10-48-centos ~]# openssl version -a
OpenSSL 1.1.1k 25 Mar 2021 (Library: OpenSSL 1.1.1g FIPS 21 Apr 2020)
built on: Mon Mar 29 23:48:12 2021 UTC
platform: linux-x86_64
options: bn(64,64) rc4(16x,int) des(int) idea(int) blowfish(ptr)
compiler: gcc -fPIC -pthread -m64 -Wa,--noexecstack -Wall -O3 -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -Wa,--noexecstack -Wa,--generate-missing-build-notes=yes -DOPENSSL_USE_NODELETE -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAESNI_ASM -DVPAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPOLY1305_ASM -DZLIB -DNDEBUG -DPURIFY -DDEVRANDOM="\"/dev/urandom\""
OPENSSLDIR: "/etc/pki/tls"
ENGINESDIR: "/usr/lib64/engines-1.1"
Seeding source: os-specific

再次执行vertical-pod-autoscaler/pkg/admission-controller/gencerts.sh

3.4 查看结果

可以看到metrics-server和vpa都已经正常运行了

1
2
3
4
5
bash复制代码[root@VM-10-48-centos ~]# kubectl get po -n kube-system | grep -E "metrics-server|vpa"
metrics-server-5b58f4df77-f7nks 1/1 Running 0 35d
vpa-admission-controller-7ff888c959-tvtmk 1/1 Running 0 104m
vpa-recommender-74f69c56cb-zmzwg 1/1 Running 0 104m
vpa-updater-79b88f9c55-m4xx5 1/1 Running 0 103m

4 示例

4.1 updateMode: Off

1 首先我们部署一个nginx服务,部署到namespace: vpa中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
yaml复制代码apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: nginx
name: nginx
namespace: vpa
spec:
replicas: 2
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- image: nginx
name: nginx
resources:
requests:
cpu: 100m
memory: 250Mi

看下结果,正常运行了2个pod

1
2
3
4
bash复制代码[root@VM-10-48-centos ~]# kubectl get po -n vpa
NAME READY STATUS RESTARTS AGE
nginx-59fdffd754-cb5dn 1/1 Running 0 8s
nginx-59fdffd754-cw8d7 1/1 Running 0 9s

2 创建一个NodePort类型的service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bash复制代码[root@VM-10-48-centos ~]# cat svc.yaml 
apiVersion: v1
kind: Service
metadata:
name: nginx
namespace: vpa
spec:
type: NodePort
ports:
- port: 80
targetPort: 80
selector:
app: nginx

[root@VM-10-48-centos ~]# kubectl get svc -n vpa | grep nginx
nginx NodePort 10.255.253.166 <none> 80:30895/TCP 54s

[root@VM-2-16-centos ~]# curl -I 10.1.2.16:30895
HTTP/1.1 200 OK
Server: nginx/1.21.1
Date: Fri, 09 Jul 2021 09:54:58 GMT
Content-Type: text/html
Content-Length: 612
Last-Modified: Tue, 06 Jul 2021 14:59:17 GMT
Connection: keep-alive
ETag: "60e46fc5-264"
Accept-Ranges: bytes

3 创建VPA
这里先使用updateMode: "Off"模式,这种模式仅获取资源推荐,但不更新Pod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码[root@VM-10-48-centos ~]# cat nginx-vpa-demo.yaml
apiVersion: autoscaling.k8s.io/v1beta2
kind: VerticalPodAutoscaler
metadata:
name: nginx-vpa
namespace: vpa
spec:
targetRef:
apiVersion: "apps/v1"
kind: Deployment
name: nginx
updatePolicy:
updateMode: "Off"
resourcePolicy:
containerPolicies:
- containerName: "nginx"
minAllowed:
cpu: "250m"
memory: "100Mi"
maxAllowed:
cpu: "2000m"
memory: "2048Mi"

4 查看部署结果

1
2
3
bash复制代码[root@VM-10-48-centos ~]# kubectl get vpa -n vpa
NAME MODE CPU MEM PROVIDED AGE
nginx-vpa Off 7s

5 使用describe查看vpa详情,主要关注Container Recommendations

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
bash复制代码[root@VM-10-48-centos ~]# kubectl describe vpa nginx-vpa -n vpa
Name: nginx-vpa
Namespace: vpa
Spec:
Resource Policy:
Container Policies:
Container Name: nginx
Max Allowed:
Cpu: 2000m
Memory: 2048Mi
Min Allowed:
Cpu: 250m
Memory: 100Mi
Target Ref:
API Version: apps/v1
Kind: Deployment
Name: nginx
Update Policy:
Update Mode: Off
Status:
Conditions:
Last Transition Time: 2021-07-09T09:59:50Z
Status: True
Type: RecommendationProvided
Recommendation:
Container Recommendations:
Container Name: nginx
Lower Bound:
Cpu: 250m
Memory: 262144k
Target:
Cpu: 250m
Memory: 262144k
Uncapped Target:
Cpu: 25m
Memory: 262144k
Upper Bound:
Cpu: 670m
Memory: 700542995

其中

1
2
3
4
5
json复制代码Lower Bound:                 下限值
Target: 推荐值
Upper Bound: 上限值
Uncapped Target: 如果没有为VPA提供最小或最大边界,则表示目标利用率
上述结果表明,推荐的 Pod 的 CPU 请求为 25m,推荐的内存请求为 262144k 字节。

6 现在对nginx进行压测
执行压测命令

1
2
3
4
5
6
7
8
9
10
text复制代码[root@VM-10-48-centos ~]# ab -c 100 -n 10000000 http://10.1.2.16:30895/
This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 10.1.2.16 (be patient)

Completed 1000000 requests
Completed 2000000 requests
Completed 3000000 requests

7 几分钟后再观察VPA Recommendation变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bash复制代码[root@VM-10-48-centos ~]# kubectl describe vpa -n vpa nginx-vpa | tail -n 20
Conditions:
Last Transition Time: 2021-07-09T09:59:50Z
Status: True
Type: RecommendationProvided
Recommendation:
Container Recommendations:
Container Name: nginx
Lower Bound:
Cpu: 250m
Memory: 262144k
Target:
Cpu: 1643m
Memory: 262144k
Uncapped Target:
Cpu: 1643m
Memory: 262144k
Upper Bound:
Cpu: 2
Memory: 562581530
Events: <none>

从输出信息可以看出,VPA对Pod给出了推荐值:Cpu: 1643m,因为我们这里设置了updateMode: "Off",所以不会更新Pod

4.2 updateMode: Auto

1 把updateMode: “Auto”,看看VPA会有什么动作
这里把resources改为:memory: 50Mi,cpu: 100m

1
2
3
4
bash复制代码[root@VM-10-48-centos ~]# kubectl get po -n vpa
NAME READY STATUS RESTARTS AGE
nginx-5594c66dc6-lzs67 1/1 Running 0 26s
nginx-5594c66dc6-zk6h9 1/1 Running 0 21s

2 再次部署vpa,这里VPA部署文件nginx-vpa-demo.yaml只改了updateMode: "Auto"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
text复制代码[root@k8s-node001 examples]# cat  nginx-vpa-demo.yaml
apiVersion: autoscaling.k8s.io/v1beta2
kind: VerticalPodAutoscaler
metadata:
name: nginx-vpa-2
namespace: vpa
spec:
targetRef:
apiVersion: "apps/v1"
kind: Deployment
name: nginx
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: "nginx"
minAllowed:
cpu: "250m"
memory: "100Mi"
maxAllowed:
cpu: "2000m"
memory: "2048Mi"

3 再次压测

1
bash复制代码[root@VM-10-48-centos ~]# ab -c 100 -n 10000000 http://10.1.2.16:30895/

4 几分钟后,使用describe查看vpa详情,同样只关注Container Recommendations

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bash复制代码[root@VM-10-48-centos ~]# kubectl describe vpa nginx-vpa  -n vpa | tail -n 20
Conditions:
Last Transition Time: 2021-07-09T09:59:50Z
Status: True
Type: RecommendationProvided
Recommendation:
Container Recommendations:
Container Name: nginx
Lower Bound:
Cpu: 250m
Memory: 262144k
Target:
Cpu: 1643m
Memory: 262144k
Uncapped Target:
Cpu: 1643m
Memory: 262144k
Upper Bound:
Cpu: 2
Memory: 511550327
Events: <none>

Target变成了Cpu:1643m ,Memory:262144k

5、来看下event事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
bash复制代码[root@VM-10-48-centos ~]# kubectl get event -n vpa
LAST SEEN TYPE REASON OBJECT MESSAGE
38s Normal Scheduled pod/nginx-5594c66dc6-d8d6h Successfully assigned vpa/nginx-5594c66dc6-d8d6h to 10.1.2.16
38s Normal Pulling pod/nginx-5594c66dc6-d8d6h Pulling image "nginx"
37s Normal Pulled pod/nginx-5594c66dc6-d8d6h Successfully pulled image "nginx"
37s Normal Created pod/nginx-5594c66dc6-d8d6h Created container nginx
37s Normal Started pod/nginx-5594c66dc6-d8d6h Started container nginx
3m10s Normal Scheduled pod/nginx-5594c66dc6-lzs67 Successfully assigned vpa/nginx-5594c66dc6-lzs67 to 10.1.2.15
3m9s Normal Pulling pod/nginx-5594c66dc6-lzs67 Pulling image "nginx"
3m5s Normal Pulled pod/nginx-5594c66dc6-lzs67 Successfully pulled image "nginx"
3m5s Normal Created pod/nginx-5594c66dc6-lzs67 Created container nginx
3m5s Normal Started pod/nginx-5594c66dc6-lzs67 Started container nginx
99s Normal EvictedByVPA pod/nginx-5594c66dc6-lzs67 Pod was evicted by VPA Updater to apply resource recommendation.
99s Normal Killing pod/nginx-5594c66dc6-lzs67 Stopping container nginx
98s Normal Scheduled pod/nginx-5594c66dc6-tdmnh Successfully assigned vpa/nginx-5594c66dc6-tdmnh to 10.1.2.15
98s Normal Pulling pod/nginx-5594c66dc6-tdmnh Pulling image "nginx"
97s Normal Pulled pod/nginx-5594c66dc6-tdmnh Successfully pulled image "nginx"
97s Normal Created pod/nginx-5594c66dc6-tdmnh Created container nginx
97s Normal Started pod/nginx-5594c66dc6-tdmnh Started container nginx
3m5s Normal Scheduled pod/nginx-5594c66dc6-zk6h9 Successfully assigned vpa/nginx-5594c66dc6-zk6h9 to 10.1.2.17
3m4s Normal Pulling pod/nginx-5594c66dc6-zk6h9 Pulling image "nginx"
3m Normal Pulled pod/nginx-5594c66dc6-zk6h9 Successfully pulled image "nginx"
2m59s Normal Created pod/nginx-5594c66dc6-zk6h9 Created container nginx
2m59s Normal Started pod/nginx-5594c66dc6-zk6h9 Started container nginx
39s Normal EvictedByVPA pod/nginx-5594c66dc6-zk6h9 Pod was evicted by VPA Updater to apply resource recommendation.
39s Normal Killing pod/nginx-5594c66dc6-zk6h9 Stopping container nginx
3m10s Normal SuccessfulCreate replicaset/nginx-5594c66dc6 Created pod: nginx-5594c66dc6-lzs67
3m5s Normal SuccessfulCreate replicaset/nginx-5594c66dc6 Created pod: nginx-5594c66dc6-zk6h9
99s Normal SuccessfulCreate replicaset/nginx-5594c66dc6 Created pod: nginx-5594c66dc6-tdmnh
38s Normal SuccessfulCreate replicaset/nginx-5594c66dc6 Created pod: nginx-5594c66dc6-d8d6h
35m Normal Scheduled pod/nginx-59fdffd754-cb5dn Successfully assigned vpa/nginx-59fdffd754-cb5dn to 10.1.2.16
35m Normal Pulling pod/nginx-59fdffd754-cb5dn Pulling image "nginx"
35m Normal Pulled pod/nginx-59fdffd754-cb5dn Successfully pulled image "nginx"
35m Normal Created pod/nginx-59fdffd754-cb5dn Created container nginx
35m Normal Started pod/nginx-59fdffd754-cb5dn Started container nginx
3m5s Normal Killing pod/nginx-59fdffd754-cb5dn Stopping container nginx
35m Normal Scheduled pod/nginx-59fdffd754-cw8d7 Successfully assigned vpa/nginx-59fdffd754-cw8d7 to 10.1.2.16
35m Normal Pulling pod/nginx-59fdffd754-cw8d7 Pulling image "nginx"
35m Normal Pulled pod/nginx-59fdffd754-cw8d7 Successfully pulled image "nginx"
35m Normal Created pod/nginx-59fdffd754-cw8d7 Created container nginx
35m Normal Started pod/nginx-59fdffd754-cw8d7 Started container nginx
2m58s Normal Killing pod/nginx-59fdffd754-cw8d7 Stopping container nginx
35m Normal SuccessfulCreate replicaset/nginx-59fdffd754 Created pod: nginx-59fdffd754-cw8d7
35m Normal SuccessfulCreate replicaset/nginx-59fdffd754 Created pod: nginx-59fdffd754-cb5dn
3m5s Normal SuccessfulDelete replicaset/nginx-59fdffd754 Deleted pod: nginx-59fdffd754-cb5dn
2m58s Normal SuccessfulDelete replicaset/nginx-59fdffd754 Deleted pod: nginx-59fdffd754-cw8d7
35m Normal ScalingReplicaSet deployment/nginx Scaled up replica set nginx-59fdffd754 to 2
34m Normal EnsuringService service/nginx Deleted Loadbalancer
34m Normal EnsureServiceSuccess service/nginx Service Sync Success. RetrunCode: S2000
3m10s Normal ScalingReplicaSet deployment/nginx Scaled up replica set nginx-5594c66dc6 to 1
3m5s Normal ScalingReplicaSet deployment/nginx Scaled down replica set nginx-59fdffd754 to 1
3m5s Normal ScalingReplicaSet deployment/nginx Scaled up replica set nginx-5594c66dc6 to 2
2m58s Normal ScalingReplicaSet deployment/nginx Scaled down replica set nginx-59fdffd754 to 0

从输出信息可以了解到,vpa执行了EvictedByVPA,自动停掉了nginx,然后使用 VPA推荐的资源启动了新的nginx
,我们查看下nginx的pod可以得到确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
bash复制代码[root@VM-10-48-centos ~]# kubectl describe po -n vpa nginx-5594c66dc6-d8d6h
Name: nginx-5594c66dc6-d8d6h
Namespace: vpa
Priority: 0
Node: 10.1.2.16/10.1.2.16
Start Time: Fri, 09 Jul 2021 18:09:26 +0800
Labels: app=nginx
pod-template-hash=5594c66dc6
Annotations: tke.cloud.tencent.com/networks-status:
[{
"name": "tke-bridge",
"interface": "eth0",
"ips": [
"10.252.1.50"
],
"mac": "e6:38:26:0b:c5:97",
"default": true,
"dns": {}
}]
vpaObservedContainers: nginx
vpaUpdates: Pod resources updated by nginx-vpa: container 0: cpu request, memory request
Status: Running
IP: 10.252.1.50
IPs:
IP: 10.252.1.50
Controlled By: ReplicaSet/nginx-5594c66dc6
Containers:
nginx:
Container ID: docker://42e45f5f122ba658e293395d78a073cfe51534c773f9419a179830fd6d1698ea
Image: nginx
Image ID: docker-pullable://nginx@sha256:8df46d7414eda82c2a8c9c50926545293811ae59f977825845dda7d558b4125b
Port: <none>
Host Port: <none>
State: Running
Started: Fri, 09 Jul 2021 18:09:27 +0800
Ready: True
Restart Count: 0
Requests:
cpu: 1643m
memory: 262144k
Environment: <none>
Mounts:
/var/run/secrets/kubernetes.io/serviceaccount from default-token-m2j2z (ro)

看重点Requests:cpu: 1643m,memory: 262144k
再回头看看部署文件

1
2
3
yaml复制代码requests:
cpu: 100m
memory: 50Mi

现在可以知道VPA做了哪些事了吧。当然,随着服务的负载的变化,VPA的推荐之也会不断变化。当目前运行的pod的资源达不到VPA的推荐值,就会执行pod驱逐,重新部署新的足够资源的服务。

4.3 VPA使用限制

  • 不能与HPA(Horizontal Pod Autoscaler )一起使用
  • Pod比如使用副本控制器,例如属于Deployment或者StatefulSet

4.4 VPA有啥好处

  • Pod 资源用其所需,所以集群节点使用效率高。
  • Pod 会被安排到具有适当可用资源的节点上。
  • 不必运行基准测试任务来确定 CPU 和内存请求的合适值。
  • VPA 可以随时调整 CPU 和内存请求,无需人为操作,因此可以减少维护时间。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…614615616…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%