开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

【每日算法】力扣350 两个数组的交集 II

发表于 2021-11-25

这是我参与11月更文挑战的第25天,活动详情查看:2021最后一次更文挑战。

描述

给定两个数组,编写一个函数来计算它们的交集。

1
2
3
4
5
6
7
8
ini复制代码示例 1:

输入:nums1 = [1,2,2,1], nums2 = [2,2]
输出:[2,2]
示例 2:

输入:nums1 = [4,9,5], nums2 = [9,4,9,8,4]
输出:[4,9]

说明:

  • 输出结果中每个元素出现的次数,应与元素在两个数组中出现次数的最小值一致。
  • 我们可以不考虑输出结果的顺序。

做题

这个说明的第一条我没太看懂。

从题目“两个数组的交集”上推测,应该是不需要考虑两个数组的顺序。

我一开始还以为,要考虑两个数组的顺序,像 试例 2 的 [4,9] 就有顺序,那样就比较难做了,唯一的做法可能就是两层遍历循环了。

因为不需要考虑结果的顺序,所以我们可以考虑使用 map 来存储每个数出现的个数,然后取相同的次数最少的数作为结果返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码public int[] intersect(int[] nums1, int[] nums2) {
Map<Integer,Integer> map1=new HashMap<>();
Map<Integer,Integer> map2=new HashMap<>();

for (int i = 0; i < nums1.length; i++) {
int num = nums1[i];
Integer integer = map1.get(num);
if (integer == null){
integer = 1;
map1.put(num,integer);
continue;
}
integer +=1;
map1.put(num,integer);
}

for (int i = 0; i < nums2.length; i++) {
int num = nums2[i];
Integer integer = map2.get(num);
if (integer == null){
integer = 1;
map2.put(num,integer);
continue;
}
integer +=1;
map2.put(num,integer);
}

List<Integer> list = new ArrayList<>();
Set<Integer> integers = map1.keySet();
Iterator<Integer> iterator = integers.iterator();
while (iterator.hasNext()){
Integer next = iterator.next();
Integer integer2 = map2.get(next);
if (integer2==null){
continue;
}
Integer integer1 = map1.get(next);
integer1=integer2>integer1?integer1:integer2;
while (integer1-->0){
list.add(next);
}
}
int[] result = new int[list.size()];
for (int i = 0; i < list.size(); i++) {
result[i]=list.get(i);
}

return result;
}

image.png

跑是跑通了。

但是效率好低,代码也很长。

这部分代码优化一下,把它和第二循环结合到一起,降低时间复杂度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ini复制代码public int[] intersect1(int[] nums1, int[] nums2) {
Map<Integer,Integer> map1=new HashMap<>();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < nums1.length; i++) {
int num = nums1[i];
Integer integer = map1.get(num);
if (integer == null){
integer = 1;
map1.put(num,integer);
continue;
}
map1.put(num,++integer);
}

for (int i = 0; i < nums2.length; i++) {
int num = nums2[i];
Integer integer1 = map1.get(num);
if (integer1 == null){
//当 integer1 等于 null 时,就不需要再做后面的操作了
continue;
}
if ( integer1!=0){
list.add(num);
map1.put(num,--integer1);
}

}
int[] result = new int[list.size()];
for (int i = 0; i < list.size(); i++) {
result[i]=list.get(i);
}

return result;
}

不优化不知道,一优化就减少了二十多行代码,还把 map2 优化掉了。

image.png

不过这个运行结果怎么内存占用还变高了?

今天就到这里了。

这里是程序员徐小白,【每日算法】是我新开的一个专栏,在这里主要记录我学习算法的日常,也希望我能够坚持每日学习算法,不知道这样的文章风格您是否喜欢,不要吝啬您免费的赞,您的点赞、收藏以及评论都是我下班后坚持更文的动力。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

当springmvc搭上springboot的便车后,tom

发表于 2021-11-25

前面几篇基本上是把web开发从servlet带到了springmvc,开发越来越简便,配置文件越来越少,索性可以直接抛弃了,但是随着springboot的出现,我们还能做到更简便,只需要引入一个依赖,然后一个main方法直接一键启动web应用,而且连tomcat都不需要自己配了。

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

但是在享受这种便捷的同时还是需要了解为什么,只有了解了整个流程才能运筹帷幄,虽然我一直觉得知道的越多不会的就越多(难受.jpeg)。好了废话不多说,来看一看在springboot中tomcat去哪了。

内嵌容器的启动

首先我们还记得前面提到过的Servlet3.0提供的两个扩展接口吧,先去瞅一瞅在springboot中的实现是怎么样的,首先试一试还是在META-INF下按规则创件一个文件

image.png
然后启动应用,发现控制台并没有按照预期和前面讲springmvc那样打印出这几个类的名字,这是咋回事呢?这里就带着这个问题进入主题。
点开这个ServletContainerInitializer发现他的实现类比起前面springmvc那个实现SpringServletContainerInitializer还额外多了TomcatStarter,点进去发现这是springboot的实现,但是这跟前面那个不生效有什么关系呢。

我们知道当我门在springboot中引入web依赖的时候只需要一键启动,并不需要像运行springmvc工程那样提前配置tomcat,这是因为在web依赖中已经引入了tomcat的依赖,这里使用的是内嵌的tomcat,当然也可以像springmvc那样使用外部tomcat(后面再说),那既然前面使用内嵌容器效果不符合预期,那就来分析一下启动流程,直接进入run方法,就能看到springboot启动的核心流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
configureIgnoreBeanInfo(environment);
Banner printedBanner = printBanner(environment);
context = createApplicationContext();
exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class,
new Class[] { ConfigurableApplicationContext.class }, context);
prepareContext(context, environment, listeners, applicationArguments, printedBanner);
refreshContext(context);
afterRefresh(context, applicationArguments);
stopWatch.stop();
if (this.logStartupInfo) {
new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch);
}
listeners.started(context);
callRunners(context, applicationArguments);

其他的我们暂时不需要关心,前面准备好环境之后,来到refreshContext这一步,这里就会进到AbstractApplicationContext的refresh方法,知道spring的人应该都知道这个方法,然后我们注意力放在onfresh()这个方法上,打上断点,直接step into 发现该方法进入到了ServletWebServerApplicationContext中

1
2
3
4
5
6
7
8
9
10
typescript复制代码@Override
protected void onRefresh() {
super.onRefresh();
try {
createWebServer();
}
catch (Throwable ex) {
throw new ApplicationContextException("Unable to start web server", ex);
}
}

然后这里就是关键了,看名字就是创建server,在这里默认就是创建内嵌的tomcat,所以我们可以看出来springboot和springmvc的启动室友明显差别的,前者是依赖spring的refresh带动tomcat启动,后者是依赖tomcat启动加监听器来带动spring的refresh,继续跟进

1
ini复制代码this.webServer = factory.getWebServer(getSelfInitializer());

来到selfInitialize方法,这里先设置父容器,所以在springboot中也是存在父子容器的

1
2
3
4
5
6
7
8
scss复制代码private void selfInitialize(ServletContext servletContext) throws ServletException {
prepareWebApplicationContext(servletContext);
registerApplicationScope(servletContext);
WebApplicationContextUtils.registerEnvironmentBeans(getBeanFactory(), servletContext);
for (ServletContextInitializer beans : getServletContextInitializerBeans()) {
beans.onStartup(servletContext);
}
}

然后就出现了一个ServletContextInitializer,是不是感觉他和servlet提供的那个ServletContainerInitializer很相似,但是这是springboot提供的,我们看一下她的实现类,有好几个,但是我们先记住DispatcherServletRegistrationBean,看名字就知道他是用来注册DispatcherServlet的,然后是在这一步执行这些实现类的onstartup方法的,连方法名都一样,值得思考。

继续往下来到创建tomcat的方法里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码@Override
public WebServer getWebServer(ServletContextInitializer... initializers) {
if (this.disableMBeanRegistry) {
Registry.disableRegistry();
}
Tomcat tomcat = new Tomcat();
File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
tomcat.setBaseDir(baseDir.getAbsolutePath());
Connector connector = new Connector(this.protocol);
connector.setThrowOnFailure(true);
tomcat.getService().addConnector(connector);
customizeConnector(connector);
tomcat.setConnector(connector);
tomcat.getHost().setAutoDeploy(false);
configureEngine(tomcat.getEngine());
for (Connector additionalConnector : this.additionalTomcatConnectors) {
tomcat.getService().addConnector(additionalConnector);
}
prepareContext(tomcat.getHost(), initializers);
return getTomcatWebServer(tomcat);
}

在tomcat创建好后进入prepareContext方法,同时传入了前面拿到的ServletContextInitializer们,然后进入最关键的一步configureContext(),

1
2
3
4
5
6
7
ini复制代码TomcatStarter starter = new TomcatStarter(initializers);
if (context instanceof TomcatEmbeddedContext) {
TomcatEmbeddedContext embeddedContext = (TomcatEmbeddedContext) context;
embeddedContext.setStarter(starter);
embeddedContext.setFailCtxIfServletStartFails(true);
}
context.addServletContainerInitializer(starter, NO_CLASSES);

这里创建了TomcatStarter,我们前面说了她又springboot提供,并且是实现了servlet提供的ServletContainerInitializer接口的,所以到这里就应该很明了了,springboot通过控制内嵌tomcat的启动,手动管理了ServletContainerInitializer的注册,这里只注册了TomcatStarter这一个实现,所以我们现在应该知道了为什么我们最开始写的那个例子不生效了,因为人压根儿就没察觉到你的存在,他只会关心TomcatStarter这一个实现,然后sprngboot又在这个实现里传入了自己提供的扩展接口ServletContextInitializer,看起来就好像效果一样,我们只需要实现这个接口就能达到一样的效果,而且还不用额外去添加一个META-INF文件了。

回过头来总结一下吧,首先是sprinboot并没有说抛弃ServletContainerInitializer,只是她为了能更好的控制整个应用的生命周期,所以强行提供了一个类似的接口ServletContextInitializer,他俩的效果几乎是一摸一样的,但是对于我们开发者来说,如果是使用了内嵌容器的这种方式,我们的开发更便捷了,更加不用考虑容器底层的实现机制了,只用关心我们的业务,但是能够了解这个过程也还是很有好处的。

DispatchServlet

但凡是springmvc的应用,都离不开DispatchServlet,在分析springmvc的时候我们知道了他的加载实际(分别是xml形式和config形式),那我们再来看一看在springboot中他又是咋个加载的。
上面我们让注意一个东西,就是DispatcherServletRegistrationBean,他是ServletContextInitializer的实现,所以在内嵌容器启动的时候他就会执行onstartup方法
首先进到这个class,借助idea的提示,在class前面有一个绿色的圈圈,表示这个class是被springboot自动配置过的,也就是说springboot在自动创建bean的时候是包含了DispatchServlet相关的bean的(自动配置这些不是我想要说的内容,默认大家都知道了),不信点击那个绿色的圈圈

image.png
就能进入这个自动配置类DispatcherServletAutoConfiguration,然后再回头来看onstartup的实现,进入RegistrationBean,看到需要调用子类的register方法

1
2
3
4
5
6
7
8
9
java复制代码@Override
protected final void register(String description, ServletContext servletContext) {
D registration = addRegistration(description, servletContext);
if (registration == null) {
logger.info(StringUtils.capitalize(description) + " was not registered (possibly already registered?)");
return;
}
configure(registration);
}

继续进入addRegistration方法来到ServletRegistrationBean,发现这是一个泛型class

1
scala复制代码public class ServletRegistrationBean<T extends Servlet> extends DynamicRegistrationBean<ServletRegistration.Dynamic>

其实现类就是前面提到的就是DispatcherServletRegistrationBean,提供的泛型恰好就是DispatcherServlet,so?bean也有了,类型也确定了,

1
2
3
4
5
typescript复制代码@Override
protected ServletRegistration.Dynamic addRegistration(String description, ServletContext servletContext) {
String name = getServletName();
return servletContext.addServlet(name, this.servlet);
}

那么这个放法前面我已经演示过了,可以动态注册servlet,是不是觉得自己突然懂了什么。。。

最后的最后,请一定注意,这里面说的所有东西都是基于内嵌容器来说的,如果使用了外部容器,我们前面那个例子就会生效了,因为他走的就是以前springmvc的流程

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能监控之 blackbox_exporter+Promet

发表于 2021-11-25

「这是我参与11月更文挑战的第25天,活动详情查看:2021最后一次更文挑战」

一、什么是黑盒监控?

前面文章中我们主要介绍了 Prometheus 下如何进行白盒监控,我们监控主机的资源用量等运行数据。 这些都是支持业务和服务的基础设施,通过白盒能够了解其内部的实际运行状态,通过对监控指标的观察能够预判可能出现的问题,从而对潜在的不确定因素进行优化。而从完整的全局监控逻辑的角度,除了大量的应用白盒监控以外,还应该添加适当的黑盒监控。黑盒监控即以用户的身份测试服务的外部可见性,常见的黑盒监控包括 HTT P探针、TCP 探针等用于检测站点或者服务的可访问性,以及访问效率等。

二、blackbox_exporter 简介

blackbox_exporter 是 Prometheus 官方提供的官方黑盒监控解决方案,其中 exporter 之一,可以提供 http(s)、dns、tcp、icmp 的方式对网络进行探测。

Github地址:github.com/prometheus/…

目前支持的应用场景:

  • ICMP 测试
    • 主机探活机制
  • TCP 测试
    • 业务组件端口状态监听
    • 应用层协议定义与监听
  • HTTP 测试
    • 定义 Request Header 信息
    • 判断 Http status / Http Respones Header / Http Body 内容
  • POST 测试
    • 接口联通性
  • SSL 证书过期时间
  • 自定义测试(扩展)

三、安装

1、二进制包

各个版本的 blackbox_exporter 下载地址为: github.com/prometheus/…

以 Linux 系统为例,下载编译好的二进制包,解压使用:

1
2
3
4
5
6
7
8
bash复制代码# cd /data/blackbox_exporter/
# ./blackbox_exporter --version
blackbox_exporter, version 0.16.0 (branch: HEAD, revision: 991f89846ae10db22a3933356a7d196642fcb9a9)
build user: root@64f600555645
build date: 20191111-16:27:24
go version: go1.13.4

nohup ./blackbox_exporter &

2、docker

1
bash复制代码docker run -id --name blackbox-exporter -p 9115:9115  prom/blackbox-exporter

四、使用原理

官方解释:github.com/prometheus/…

运行 blackbox_exporter 时,需要用户提供探针的配置信息,这些配置信息可能是一些自定义的 http 头信息,也可能是探测时需要的一些 tls 配置,也可能是探针本身的验证行为。在 blackbox_exporter 每一个探针配置称为一个 module,并且以 yaml 配置文件的形式提供给 blackbox_exporter 。每一个 module 主要包含以下配置内容,包括探针类型(prober)、验证访问超时时间(timeout)、以及当前探针的具体配置项:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码  # 探针类型: http https tcp dns icmp
prober: <prober_string>

# 超时时间
[ timeout: <duration> ] #默认单位秒

# 探针的详细配置,最多只能配置其中一个
[ http: <http_probe> ]
[ tcp: <tcp_probe> ]
[ dns: <dns_probe> ]
[ icmp: <icmp_probe> ]

<http_probe>可配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
bash复制代码  # 此探针接受的状态代码。 默认为2xx。
[ valid_status_codes: <int>, ... | default = 2xx ]

# 此探针接受的 HTTP 版本。
[ valid_http_versions: <string>, ... ]

# 探针将使用的 HTTP 方法。
[ method: <string> | default = "GET" ]

# 为探针设置的 HTTP 标头。
headers:
[ <string>: <string> ... ]

# 探针是否将遵循任何重定向。
[ no_follow_redirects: <boolean> | default = false ]

# 如果存在SSL,则探测失败。
[ fail_if_ssl: <boolean> | default = false ]

# 如果不存在SSL,则探测失败。
[ fail_if_not_ssl: <boolean> | default = false ]

# 如果响应主体与正则表达式匹配,则探测失败。
fail_if_body_matches_regexp:
[ - <regex>, ... ]

# 如果响应主体与正则表达式不匹配,则探测失败。
fail_if_body_not_matches_regexp:
[ - <regex>, ... ]

# 如果响应头与正则表达式匹配,则探测失败。 对于具有多个值的标头,如果*至少一个*匹配,则失败。
fail_if_header_matches:
[ - <http_header_match_spec>, ... ]

# 如果响应头与正则表达式不匹配,则探测失败。 对于具有多个值的标头,如果* none *不匹配,则失败。
fail_if_header_not_matches:
[ - <http_header_match_spec>, ... ]

# HTTP 探针的 TLS 协议的配置。
tls_config:
[ <tls_config> ]

# 目标的 HTTP 基本身份验证凭据。
basic_auth:
[ username: <string> ]
[ password: <secret> ]
[ password_file: <filename> ]

# 目标的承载令牌。
[ bearer_token: <secret> ]

# 目标的承载令牌文件。
[ bearer_token_file: <filename> ]

# 用于连接到目标的 HTTP 代理服务器。
[ proxy_url: <string> ]

# HTTP 探针的 IP 协议(ip4,ip6)
[ preferred_ip_protocol: <string> | default = "ip6" ]
[ ip_protocol_fallback: <boolean> | default = true ]

# 探针中使用的 HTTP 请求的主体。
body: [ <string> ]

五、几种应用场景

1、ICMP 测试(主机探活)

可以通过 ping(icmp) 检测服务器的存活,在 blackbox.yml 配置文件中配置使用 icmp module:

1
2
3
yaml复制代码modules:
icmp:
prober: icmp

在 prometheus 配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
yml复制代码  - job_name: 'blackbox-ping'
metrics_path: /probe
params:
modelus: [icmp]
static_configs:
- targets:
- 172.16.106.208 #被监控端ip
- 172.16.106.80
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: 172.16.106.84:9115 #blackbox-exporter 所在的机器和端口

这里分别配置了名为 blackbox-ping 的采集任务,并且通过 params 指定使用的探针(module)以及探测目标(target)。那问题就来了,假如我们有 N 个目标站点且都需要 M 种探测方式,那么 Prometheus 中将包含 N * M 个采集任务,从配置管理的角度来说显然是不可接受的。 在前面的文章我们介绍了 Prometheus 的 Relabeling 能力,这里我们也可以采用 Relabling 的方式对这些配置进行简化。
这里针对每一个探针服务(如 icmp)定义一个采集任务,并且直接将任务的采集目标定义为我们需要探测的站点。在采集样本数据之前通过 relabel_configs 对采集任务进行动态设置。

  • 第1步,根据 target 实例的地址,写入 __param_target 标签中。__param_<name> 形式的标签表示,在采集任务时会在请求目标地址中添加 <name> 参数,等同于 params 的设置;
  • 第2步,获取 __param_target 的值,并覆写到 instance 标签中;
  • 第3步,覆写 target 实例的 __address__ 标签值为 blackbox_exporter 实例的访问地址。

通过以上3个 relabel 步骤,即可大大简化 Prometheus 任务配置的复杂度。
Blackbox Target实例

2、TCP 测试(监控主机端口存活状态)

在 blackbox.yml 配置文件中配置使用 tcp module:

1
2
3
yaml复制代码modules:
tcp_connect:
prober: tcp

在 prometheus 配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
yml复制代码  - job_name: 'blackbox-tcp'
metrics_path: /probe
params:
modelus: [tcp_connect]
static_configs:
- targets:
- 172.16.106.208:6443
- 172.16.106.80:6443
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: 172.16.106.84:9115

3、HTTP检测(监控网站状态)

http 探针是进行黑盒监控时最常用的探针之一,通过 http 探针能够网站或者 http 服务建立有效的监控,包括其本身的可用性,以及用户体验相关的如响应时间等等。除了能够在服务出现异常的时候及时报警,还能帮助运维同学分析和优化网站体验。

在 blackbox.yml 配置文件中配置使用 http module:

1
2
3
4
5
6
7
8
9
yml复制代码modules:
http_2xx:
prober: http
http:
method: GET
http_post_2xx:
prober: http
http:
method: POST

在 prometheus 配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yml复制代码  - job_name: 'blackbox-http'
metrics_path: /probe
params:
modelue: [http_2xx]
static_configs:
- targets:
- http://monitor.mall.demo.com/login
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: 172.16.106.84:9115 #blackbox-exporter 所在的机器和端口

通过 prober 配置项指定探针类型。配置项 http 用于自定义探针的探测方式,这里有没对 http 配置项添加任何配置,表示完全使用 http 探针的默认配置,该探针将使用 http get 的方式对目标服务进行探测,并且验证返回状态码是否为 2xx,是则表示验证成功,否则失败。

采集的数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
bash复制代码# DNS解析时间,单位 s
probe_dns_lookup_time_seconds 0.000199105
# 探测从开始到结束的时间,单位 s,请求这个页面响应时间
probe_duration_seconds 0.010889113
# HELP probe_failed_due_to_regex Indicates if probe failed due to regex
# TYPE probe_failed_due_to_regex gauge
probe_failed_due_to_regex 0
# HTTP 内容响应的长度
probe_http_content_length -1
# 按照阶段统计每阶段的时间
probe_http_duration_seconds{phase="connect"} 0.001083728 #连接时间
probe_http_duration_seconds{phase="processing"} 0.008365885 #处理请求的时间
probe_http_duration_seconds{phase="resolve"} 0.000199105 #响应时间
probe_http_duration_seconds{phase="tls"} 0 #校验证书的时间
probe_http_duration_seconds{phase="transfer"} 0.000446424 #传输时间
# 重定向的次数
probe_http_redirects 0
# ssl 指示是否将 SSL 用于最终重定向
probe_http_ssl 0
# 返回的状态码
probe_http_status_code 200
# 未压缩的响应主体长度
probe_http_uncompressed_body_length 1766
# http 协议的版本
probe_http_version 1.1
# HELP probe_ip_addr_hash Specifies the hash of IP address. It's useful to detect if the IP address changes.
probe_ip_addr_hash 3.24030434e+09
# 使用的 ip 协议的版本号
probe_ip_protocol 4
# 是否探测成功
probe_success 1

4、自定义 HTTP 请求

http 服务通常会以不同的形式对外展现,有些可能就是一些简单的网页,而有些则可能是一些基于 REST 的 API 服务。 对于不同类型的 http 的探测需要管理员能够对 http 探针的行为进行更多的自定义设置,包括:http 请求方法、http 头信息、请求参数等。对于某些启用了安全认证的服务还需要能够对 http 探测设置相应的 auth 支持。对于 https 类型的服务还需要能够对证书进行自定义设置。
如下所示,这里通过 method 定义了探测时使用的请求方法,对于一些需要请求参数的服务,还可以通过 headers 定义相关的请求头信息,使用 body 定义请求内容:

1
2
3
4
5
6
7
8
yaml复制代码http_post_2xx:
prober: http
timeout: 5s
http:
method: POST
headers:
Content-Type: application/json
body: '{}'

如果 http 服务启用了安全认证,blackbox_exporter 内置了对 basic_auth 的支持,可以直接设置相关的认证信息即可:

1
2
3
4
5
6
7
8
9
10
yaml复制代码http_basic_auth_example:
prober: http
timeout: 5s
http:
method: POST
headers:
Host: "login.example.com"
basic_auth:
username: "username"
password: "mysecret"

对于使用了Bear Token 的服务也可以通过 bearer_token 配置项直接指定令牌字符串,或者通过 bearer_token_file 指定令牌文件。
对于一些启用了 https 的服务,但是需要自定义证书的服务,可以通过 tls_config 指定相关的证书信息:

1
2
3
4
5
6
yml复制代码 http_custom_ca_example:
prober: http
http:
method: GET
tls_config:
ca_file: "/certs/my_cert.crt"

5、自定义探针行为

在默认情况下 http 探针只会对 http 返回状态码进行校验,如果状态码为 2XX(200 <= StatusCode < 300)则表示探测成功,并且探针返回的指标 probe_success 值为1。
如果用户需要指定 http 返回状态码,或者对 http 版本有特殊要求,如下所示,可以使用 valid_http_versions 和valid_status_codes 进行定义:

1
2
3
4
5
6
yaml复制代码  http_2xx_example:
prober: http
timeout: 5s
http:
valid_http_versions: ["HTTP/1.1", "HTTP/2"]
valid_status_codes: []

默认情况下,Blockbox 返回的样本数据中也会包含指标 probe_http_ssl,用于表明当前探针是否使用了 ssl:

1
2
3
yml复制代码# HELP probe_http_ssl Indicates if SSL was used for the final redirect
# TYPE probe_http_ssl gauge
probe_http_ssl 0

而如果用户对于 http 服务是否启用 ssl 有强制的标准。则可以使用 fail_if_ssl 和 fail_if_not_ssl 进行配置。fail_if_ssl 为 true 时,表示如果站点启用了 ssl 则探针失败,反之成功。fail_if_not_ssl 刚好相反。

1
2
3
4
5
6
7
8
9
yaml复制代码  http_2xx_example:
prober: http
timeout: 5s
http:
valid_status_codes: []
method: GET
no_follow_redirects: false
fail_if_ssl: false
fail_if_not_ssl: false

除了基于 http 状态码,http 协议版本以及是否启用 ssl 作为控制探针探测行为成功与否的标准以外,还可以匹配 http 服务的响应内容。使用 fail_if_matches_regexp 和 fail_if_not_matches_regexp 用户可以定义一组正则表达式,用于验证 http 返回内容是否符合或者不符合正则表达式的内容。

1
2
3
4
5
6
7
8
9
yaml复制代码  http_2xx_example:
prober: http
timeout: 5s
http:
method: GET
fail_if_matches_regexp:
- "Could not connect to database"
fail_if_not_matches_regexp:
- "Download the latest version here"

最后需要提醒的时,默认情况 下http 探针会走 ipv6 的协议。 在大多数情况下,可以使用 preferred_ip_protocol=ip4 强制通过ipv4 的方式进行探测。在 Bloackbox 响应的监控样本中,也会通过指标 probe_ip_protocol,表明当前的协议使用情况:

1
2
3
yaml复制代码# HELP probe_ip_protocol Specifies whether probe ip protocol is IP4 or IP6
# TYPE probe_ip_protocol gauge
probe_ip_protocol 6

6、检查配置文件

检查配置文件是否书写正确

1
2
bash复制代码cd /data/prometheus
./promtool check config prometheus.yml

六、集成 Grafana

在 grafana 导入 blackbox_exporter 9965 号模板:grafana.com/grafana/das…

注意:此模板需要安装饼状图插件,下载地址:grafana.com/grafana/plu…

安装插件,重启grafana生效。

1
2
bash复制代码grafana-cli plugins install grafana-piechart-panel
service grafana-server restart

查看数据如下:

在这里插入图片描述

八、小结

黑盒监控相较于白盒监控最大的不同在于黑盒监控是以故障为导向当故障发生时,黑盒监控能快速发现故障,而白盒监控则侧重于主动发现或者预测潜在的问题。一个完善的监控目标是要能够从白盒的角度发现潜在问题,能够在黑盒的角度快速发现已经发生的问题。

示例资料:

  • github.com/zuozewei/bl…

参考资料:

  • [1]:cloud.tencent.com/developer/a…
  • [2]:www.cnblogs.com/xiao9873341…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数仓分层及命名规范

发表于 2021-11-25

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」。

结合生产进行总结。

一 数仓分层

1.数据引入层(ODS,Operational Data Store,又称数据基础层)

将原始数据几乎无处理地存放在数据仓库系统中,结构上与源系统基本保持一致,是数据仓库的数据准备区。

1
2
3
复制代码1)埋点数据
2)mysql静态同步数据
3)mysql离线同步数据
2.数据公共层(CDM,Common Dimenions Model)

存放明细事实数据、维表数据及公共指标汇总数据。

其中,明细事实数据、维表数据一般根据ODS层数据加工生成。公共指标汇总数据一般根据维表数据和明细事实数据加工生成。

CDM层又细分为维度层(DIM)、明细数据层(DWD)和汇总数据层(DWS),采用维度模型方法作为理论基础, 可以定义维度模型主键与事实模型中外键关系,减少数据冗余,也提高明细数据表的易用性。

在汇总数据层同样可以关联复用统计粒度中的维度,采取更多的宽表化手段构建公共指标数据层,提升公共指标的复用性,减少重复加工。

1
2
3
4
5
6
7
8
vbnet复制代码1)维度层(DIM,Dimension)    
以维度作为建模驱动,基于每个维度的业务含义,通过添加维度属性、关联维度等定义计算逻辑,完成属性定义的过程并建立一致的数据分析维表。为了避免在维度模型中冗余关联维度的属性,基于雪花模型构建维度表。
2)明细数据层(DWD,Data Warehouse Detail)
由ODS层数据接入,增量或全量导入,将明细事实表的某些重要属性字段做适当冗余,也即宽表化处理。
3)汇总数据层(DWS,Data Warehouse Summary)
以分析的主题对象作为建模驱动,基于上层的应用和产品的指标需求,构建公共粒度的汇总指标表。以宽表化手段物理化模型,构建命名规范、口径一致的统计指标,为上层提供公共指标,建立汇总宽表、明细事实表。
4)数据中间层(MID,Data Middle Conversion)
数据中间层,部分复杂指标计算中间层,一般体现在行转列,多维度中间存储等。
3.数据应用层(ADS,Application Data Store)

存放数据产品个性化的统计指标数据,根据CDM层与ODS层加工生成。

1
2
makefile复制代码1)BI指标: 基础指标 + 自定义指标 + 漏斗模型 + 月报
2)离线标签: 平台行为 + 点击行为 + 付费 + 基础属性

二 命名规范

1.ods: 平台名/库名 或 事件名/表名
1
ini复制代码 例: 埋点 [xxx]_[xxxx]     mysql [xxx]_[xxxx]
2.dim: 维度类型
1
ini复制代码 例: [dim]_[xxxx]
3.dwd: dwd_平台名/库名_事件名/表名_..
1
ini复制代码 例: dwd_[x]_[xxxx]
4.dws: dws_平台名_主题名
1
ini复制代码 例: dws_[x]_[xxxx]
5.ads: ads_平台名_应用名_指标名/模型名 ,平台拆开
1
2
ini复制代码 例: bi应用    ads_[x]_[bi]_[xxx]_[xxxx]      
画像应用 ads_[x]_[portrayal]_[xxx]_[xxxx]

三 分层意义

1.清晰数据结构

每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解;

2.数据血缘追踪

最终计算好呈现给业务的数据来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围;

3.减少重复开发

规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算;

4.把复杂问题简单化

将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解,而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。屏蔽原始数据的异常,屏蔽业务的影响,不必改一次业务就需要重新接入数据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能监控之 node_exporter+Prometheus

发表于 2021-11-25

「这是我参与11月更文挑战的第25天,活动详情查看:2021最后一次更文挑战」

一、概述

在 Prometheus 的架构设计中,Prometheus Server 并不直接服务监控特定的目标,其主要任务负责数据的收集,存储并且对外提供数据查询支持。因此为了能够能够监控到某些东西,如主机的 CPU 使用率,我们需要使用到 Exporter。Prometheus 周期性的从 Exporter 暴露的 HTTP 服务地址(通常是 /metrics)拉取监控样本数据。

从上面的描述中可以看出 Exporter 可以是一个相对开放的概念,其可以是一个独立运行的程序独立于监控目标以外,也可以是直接内置在监控目标中。只要能够向 Prometheus 提供标准格式的监控样本数据即可。

这里为了能够采集到主机的运行指标如 CPU, 内存,磁盘等信息。我们可以使用 node_exporter。

node_exporter 用于采集服务器层面的运行指标,包括机器的 loadavg、filesystem、meminfo等基础监控,类似于传统主机监控维度的 zabbix-agent
node-export 由 prometheus 官方提供、维护,不会捆绑安装,但基本上是必备的 exporter

二、功能

node_exporter 用于提供 *NIX 内核的硬件以及系统指标。

  • 如果是 windows 系统,可以使用 wmi_exporterr
  • 如果是采集 NVIDIA 的 GPU 指标,可以使用 prometheus-dcgm

根据不同的 *NIX 操作系统,node_exporter 采集指标的支持也是不一样的,如:

  • diskstats 支持 Darwin, Linux
  • cpu 支持Darwin, Dragonfly, FreeBSD, Linux, Solaris 等,

详细信息参考:node_exporter

我们可以使用 –collectors.enabled 参数指定node_exporter 收集的功能模块,或者用 –no-collector 指定不需要的模块,如果不指定,将使用默认配置。

三、安装

1、二进制包

node_exporter 同样采用 Golang 编写,并且不存在任何的第三方依赖,只需要下载,解压即可运行。可以从prometheus.io/download/ 获取最新的 node_exporter 版本的二进制包。

1
2
bash复制代码curl -OL https://github.com/prometheus/node_exporter/releases/download/v1.1.2/node_exporter-1.1.2.darwin-amd64.tar.gz
tar -xzf node_exporter-1.1.2.darwin-amd64.tar.gz

运行 node_exporter:

1
2
3
bash复制代码cd node_exporter-1.1.2.darwin-amd64
cp node_exporter-1.1.2.darwin-amd64/node_exporter /usr/local/bin/
node_exporter

启动成功后,可以看到以下输出:

1
bash复制代码INFO[0000] Listening on :9100                            source="node_exporter.go:76"

访问 http://localhost:9100/ 可以看到以下结果:

1
2
3
4
5
6
7
8
bash复制代码# curl http://localhost:9100
<html>
<head><title>Node Exporter</title></head>
<body>
<h1>Node Exporter</h1>
<p><a href="/metrics">Metrics</a></p>
</body>
</html>

2、docker容器

1
2
3
4
5
6
bash复制代码docker run -d \
--net="host" \
--pid="host" \
-v "/:/host:ro,rslave" \
quay.io/prometheus/node-exporter \
--path.rootfs /host

四、node_exporter 监控指标

如果是二进制或者 docke r部署,部署成功后可以访问:http://${IP}:9100/metrics,可以看到当前 node_exporter 获取到的当前主机的所有监控数据,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bash复制代码......
de_cpu_seconds_total{cpu="2",mode="nice"} 1.43
node_cpu_seconds_total{cpu="2",mode="softirq"} 77.66
node_cpu_seconds_total{cpu="2",mode="steal"} 618.42
node_cpu_seconds_total{cpu="2",mode="system"} 20981.5
node_cpu_seconds_total{cpu="2",mode="user"} 26925.45
node_cpu_seconds_total{cpu="3",mode="idle"} 2.52970118e+06
node_cpu_seconds_total{cpu="3",mode="iowait"} 58.83
node_cpu_seconds_total{cpu="3",mode="irq"} 0
node_cpu_seconds_total{cpu="3",mode="nice"} 1.57
node_cpu_seconds_total{cpu="3",mode="softirq"} 54.37
node_cpu_seconds_total{cpu="3",mode="steal"} 538.14
node_cpu_seconds_total{cpu="3",mode="system"} 18511.33
node_cpu_seconds_total{cpu="3",mode="user"} 24297.44
# HELP node_disk_io_now The number of I/Os currently in progress.
# TYPE node_disk_io_now gauge
node_disk_io_now{device="dm-0"} 0
node_disk_io_now{device="dm-1"} 0
node_disk_io_now{device="vda"} 0
# HELP node_disk_io_time_seconds_total Total seconds spent doing I/Os.
# TYPE node_disk_io_time_seconds_total counter
node_disk_io_time_seconds_total{device="dm-0"} 0.321
node_disk_io_time_seconds_total{device="dm-1"} 13765.443000000001
node_disk_io_time_seconds_total{device="vda"} 317.065
......

每一个监控指标之前都会有一段类似于如下形式的信息:

1
2
3
4
5
bash复制代码# HELP node_disk_io_time_seconds_total Total seconds spent doing I/Os.
# TYPE node_disk_io_time_seconds_total counter
node_disk_io_time_seconds_total{device="dm-0"} 0.321
node_disk_io_time_seconds_total{device="dm-1"} 13765.443000000001
node_disk_io_time_seconds_total{device="vda"} 317.065

其中 HELP 用于解释当前指标的含义,TYPE 则说明当前指标的数据类型。

除了这些以外,在当前页面中根据物理主机系统的不同,你还可能看到如下监控指标:

  • node_boot_time_seconds:系统启动时间
  • node_cpu_seconds_total:系统CPU使用量
  • nodedisk*:磁盘IO
  • nodefilesystem*:文件系统用量
  • node_load1:系统负载
  • node_memory*:内存使用量
  • node_network*:网络带宽
  • node_time:当前系统时间
  • go_*:node exporter中go相关指标
  • process_*:node exporter自身进程相关运行指标

五、配置 Prometheus

为了能够让 Prometheus Server 能够从当前 node exporter 获取到监控数据,这里需要修改 Prometheus 配置文件。编辑 prometheus.yml 并在 scrape_configs 节点下添加以下内容:

1
2
3
4
5
6
7
8
yaml复制代码scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
# 采集node exporter监控数据
- job_name: 'node'
static_configs:
- targets: ['172.16.106.84:9100']

重新启动 Prometheus Server
访问 http://${IP}:9090/,进入到 Prometheus Server。如果输入“up”并且点击执行按钮以后,可以看到如下结果:

在这里插入图片描述

其中“1”表示正常,反之“0”则为异常。

此时查看 targets 状态:

在这里插入图片描述

注意:对于而一组用于相同采集目的的实例,或者同一个采集进程的多个副本则通过一个一个任务(Job)进行管理。

1
2
3
bash复制代码* job: node
* instance 2: 1.2.3.4:9100
* instance 4: 5.6.7.8:9100

对于配置文件设置如下:

1
2
3
4
5
6
7
8
yaml复制代码 job_name: node
static_configs:
- targets: ['172.16.106.116:9100']
labels:
instance: vm-1
- targets: ['172.16.106.119:9100']
labels:
instance: vm-2

六、使用 PromQL 查询监控数据

Prometheus UI 是 Prometheus 内置的一个可视化管理界面,通过 Prometheus UI 用户能够轻松的了解 Prometheus 当前的配置,监控任务运行状态等。 通过 Graph 面板,用户还能直接使用 PromQL 实时查询监控数据,也可以使用 PromQL 表达式查询特定监控指标的监控数据。如下所示,查询主机负载变化情况,可以使用关键字 node_load1 可以查询出 Prometheus 采集到的主机负载的样本数据,这些样本数据按照时间先后顺序展示,形成了主机负载随时间变化的趋势图表:

在这里插入图片描述

PromQL 是 Prometheus 自定义的一套强大的数据查询语言,除了使用监控指标作为查询关键字以为,还内置了大量的函数,帮助用户进一步对时序数据进行处理。例如使用 rate() 函数,可以计算在单位时间内样本数据的变化情况即增长率,因此通过该函数我们可以近似的通过 CPU 使用时间计算 CPU 的利用率:

rate(nodecpusecondstotal[2m])rate(node_cpu_seconds_total[2m])rate(nodec​pus​econdst​otal[2m])

在这里插入图片描述

这时如果要忽略是哪一个 CPU 的,只需要使用 without 表达式,将标签 CPU 去除后聚合数据即可:

avgwithout(cpu)(rate(nodecpusecondstotal[2m]))avg without(cpu) (rate(node_cpu_seconds_total[2m]))avgwithout(cpu)(rate(nodec​pus​econdst​otal[2m]))
在这里插入图片描述

七、数据可视化

Prometheus UI 提供了快速验证 PromQL 以及临时可视化支持的能力,而在大多数场景下引入监控系统通常还需要构建可以长期使用的监控数据可视化面板(Dashboard)。这时用户可以考虑使用第三方的可视化工具如 Grafana,Grafana是一个开源的可视化平台,并且提供了对 Prometheus 的完整支持。

二进制包安装:

1
2
bash复制代码wget https://dl.grafana.com/oss/release/grafana-7.4.5.linux-amd64.tar.gz
tar -zxvf grafana-7.4.5.linux-amd64.tar.gz

docker 安装:

1
bash复制代码docker run -d --name=grafana -p 3000:3000 grafana/grafana

访问 http://localhost:3000 就可以进入到 Grafana 的界面中,默认情况下使用账户 admin/admin 进行登录。
在 Grafana 首页中显示默认的使用向导,包括:安装、添加数据源、创建 Dashboard、邀请成员、以及安装应用和插件等主要流程:

在这里插入图片描述

这里将添加 Prometheus 作为默认的数据源,如下图所示,指定数据源类型为 Prometheus 并且设置 Prometheus 的访问地址即可,在配置正确的情况下点击“Add”按钮,会提示连接成功的信息:

在这里插入图片描述

在完成数据源的添加之后就可以 在Grafana 中再配置一个 node_exporter 的模板,当然作为开源软件,Grafana 社区鼓励用户分享 Dashboard 通过 grafana.com/dashboards 网站,可以找到大量可直接使用的Dashboard:比如我这里选择了热门模板(ID:8919),展示效果如下:

在这里插入图片描述

在这里插入图片描述

八、扩展知识

1、推荐的exporter

node_exporterr 是Prometheus官方推荐的 exporter,类似的还有:

在这里插入图片描述

官方推荐的都会在 github.com/prometheus 下,在 exporter推荐页,也会有很多第三方的exporter,由个人或者组织开发上传,如果有自定义的采集需求,可以自己编写exporter。

2、注意版本

因为 node_exporter 是比较老的组件,有一些最佳实践并没有 merge 进去,比如符合 Prometheus 命名规范),因此建议使用较新版本,目前(2021.3)最新版本为 1.1.2

一些指标名字的变化(详细比对)

1
2
3
4
5
6
7
8
9
10
11
bash复制代码* node_cpu ->  node_cpu_seconds_total
* node_memory_MemTotal -> node_memory_MemTotal_bytes
* node_memory_MemFree -> node_memory_MemFree_bytes
* node_filesystem_avail -> node_filesystem_avail_bytes
* node_filesystem_size -> node_filesystem_size_bytes
* node_disk_io_time_ms -> node_disk_io_time_seconds_total
* node_disk_reads_completed -> node_disk_reads_completed_total
* node_disk_sectors_written -> node_disk_written_bytes_total
* node_time -> node_time_seconds
* node_boot_time -> node_boot_time_seconds
* node_intr -> node_intr_total

解决版本问题的方法有两种:

  • 一是在机器上启动两个版本的node-exporter,都让prometheus去采集。
  • 二是使用指标转换器,他会将旧指标名称转换为新指标
  • 对于 grafana 的展示,可以找同时支持两套指标的 dashboard 模板

3、实现原理

node-exporter的主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码package main

import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/user"
"sort"

"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/prometheus/exporter-toolkit/web"
"github.com/prometheus/node_exporter/collector"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

可以看到 exporter 的实现需要引入 github.com/prometheus/client_golang/prometheus 库,client_golang 是 prometheus 的官方 go 库,既可以用于集成现有应用,也可以作为连接 Prometheus HTTP API 的基础库。

比如定义了基础的数据类型以及对应的方法:

1
2
3
4
bash复制代码Counter:收集事件次数等单调递增的数据
Gauge:收集当前的状态,比如数据库连接数
Histogram:收集随机正态分布数据,比如响应延迟
Summary:收集随机正态分布数据,和 Histogram 是类似的

参考地址:github.com/prometheus/…

client_golang 库的详细解析可以参考:Prometheus 原理和源码分析

九、小结

或许有人觉得有了 Prometheus+Grafana+node_exporter 这样的组合工具之后,基本上都不再用手工执行什么命令了。但我们要了解的是,对于监控平台来说,它取的所有的数据必然是被监控者可以提供的数据,像 node_exporter 这样小巧的监控收集器,它可以获取的监控数据,并不是整个系统全部的性能数据,只是取到了常见的计数器而已。这些计数器不管是用命令查看,还是用这样炫酷的工具查看,它的值本身都不会变。所以不管是在监控平台上看到的数据,还是在命令行中看到的数据,我们最重要的是要知道含义以及这些值的变化对性能测试和分析的下一步骤的影响。

参考资料:

  • [1]:www.xuyasong.com/?p=1539
  • [2]:《性能测试实战30讲》

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文搞定:Linux共享内存原理

发表于 2021-11-25

在Linux系统中,每个进程都有独立的虚拟内存空间,也就是说不同的进程访问同一段虚拟内存地址所得到的数据是不一样的,这是因为不同进程相同的虚拟内存地址会映射到不同的物理内存地址上。

但有时候为了让不同进程之间进行通信,需要让不同进程共享相同的物理内存,Linux通过 共享内存 来实现这个功能。下面先来介绍一下Linux系统的共享内存的使用。

共享内存使用

1. 获取共享内存

要使用共享内存,首先需要使用 shmget() 函数获取共享内存,shmget() 函数的原型如下:

int shmget(key_t key, size_t size, int shmflg);

参数 key 一般由 ftok() 函数生成,用于标识系统的唯一IPC资源。

参数 size 指定创建的共享内存大小。

参数 shmflg 指定 shmget() 函数的动作,比如传入 IPC_CREAT 表示要创建新的共享内存。

函数调用成功时返回一个新建或已经存在的的共享内存标识符,取决于shmflg的参数。失败返回-1,并设置错误码。

2. 关联共享内存

shmget() 函数返回的是一个标识符,而不是可用的内存地址,所以还需要调用 shmat() 函数把共享内存关联到某个虚拟内存地址上。shmat() 函数的原型如下:

void

shmat(int shmid, const void

shmaddr, int shmflg);

参数 shmid 是 shmget() 函数返回的标识符。

参数 shmaddr 是要关联的虚拟内存地址,如果传入0,表示由系统自动选择合适的虚拟内存地址。

参数 shmflg 若指定了 SHM_RDONLY 位,则以只读方式连接此段,否则以读写方式连接此段。

函数调用成功返回一个可用的指针(虚拟内存地址),出错返回-1。

3. 取消关联共享内存

当一个进程不需要共享内存的时候,就需要取消共享内存与虚拟内存地址的关联。取消关联共享内存通过 shmdt() 函数实现,原型如下:

int shmdt(const void *shmaddr);

参数 shmaddr 是要取消关联的虚拟内存地址,也就是 shmat() 函数返回的值。

函数调用成功返回0,出错返回-1。

共享内存使用例子

下面通过一个例子来介绍一下共享内存的使用方法。在这个例子中,有两个进程,分别为 进程A 和 进程B,进程A 创建一块共享内存,然后写入数据,进程B 获取这块共享内存并且读取其内容。

进程A

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
arduino复制代码#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_PATH "/tmp/shm"
#define SHM_SIZE 128

int main(int argc, char *argv[])
{
int shmid;
char *addr;
key_t key = ftok(SHM_PATH, 0x6666);

shmid = shmget(key, SHM_SIZE, IPC_CREAT|IPC_EXCL|0666);
if (shmid < 0) {
printf("failed to create share memory\n");
return -1;
}

addr = shmat(shmid, NULL, 0);
if (addr <= 0) {
printf("failed to map share memory\n");
return -1;
}

sprintf(addr, "%s", "Hello World\n");

return 0;
}

进程B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
arduino复制代码#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_PATH "/tmp/shm"
#define SHM_SIZE 128

int main(int argc, char *argv[])
{
int shmid;
char *addr;
key_t key = ftok(SHM_PATH, 0x6666);

char buf[128];

shmid = shmget(key, SHM_SIZE, IPC_CREAT);
if (shmid < 0) {
printf("failed to get share memory\n");
return -1;
}

addr = shmat(shmid, NULL, 0);
if (addr <= 0) {
printf("failed to map share memory\n");
return -1;
}

strcpy(buf, addr, 128);
printf("%s", buf);

return 0;
}

测试时先运行进程A,然后再运行进程B,可以看到进程B会打印出 “Hello World”,说明共享内存已经创建成功并且读取。

共享内存实现原理

我们先通过一幅图来了解一下共享内存的大概原理,如下图:

通过上图可知,共享内存是通过将不同进程的虚拟内存地址映射到相同的物理内存地址来实现的,下面将会介绍Linux的实现方式。

在Linux内核中,每个共享内存都由一个名为 struct shmid_kernel 的结构体来管理,而且Linux限制了系统最大能创建的共享内存为128个。通过类型为 struct shmid_kernel 结构的数组来管理,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
arduino复制代码struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};

struct shmid_kernel
{
struct shmid_ds u;
/* the following are private */
unsigned long shm_npages; /* size of segment (pages) */
pte_t *shm_pages; /* array of ptrs to frames -> SHMMAX */
struct vm_area_struct *attaches; /* descriptors for attaches */
};

static struct shmid_kernel *shm_segs[SHMMNI]; // SHMMNI等于128

从注释可以知道 struct shmid_kernel 结构体各个字段的作用,比如 shm_npages 字段表示共享内存使用了多少个内存页。而 shm_pages 字段指向了共享内存映射的虚拟内存页表项数组等。

另外 struct shmid_ds 结构体用于管理共享内存的信息,而 shm_segs数组 用于管理系统中所有的共享内存。

shmget() 函数实现

通过前面的例子可知,要使用共享内存,首先需要调用 shmget() 函数来创建或者获取一块共享内存。shmget() 函数的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ini复制代码asmlinkage long sys_shmget (key_t key, int size, int shmflg)
{
struct shmid_kernel *shp;
int err, id = 0;

down(&current->mm->mmap_sem);
spin_lock(&shm_lock);
if (size < 0 || size > shmmax) {
err = -EINVAL;
} else if (key == IPC_PRIVATE) {
err = newseg(key, shmflg, size);
} else if ((id = findkey (key)) == -1) {
if (!(shmflg & IPC_CREAT))
err = -ENOENT;
else
err = newseg(key, shmflg, size);
} else if ((shmflg & IPC_CREAT) && (shmflg & IPC_EXCL)) {
err = -EEXIST;
} else {
shp = shm_segs[id];
if (shp->u.shm_perm.mode & SHM_DEST)
err = -EIDRM;
else if (size > shp->u.shm_segsz)
err = -EINVAL;
else if (ipcperms (&shp->u.shm_perm, shmflg))
err = -EACCES;
else
err = (int) shp->u.shm_perm.seq * SHMMNI + id;
}
spin_unlock(&shm_lock);
up(&current->mm->mmap_sem);
return err;
}

shmget() 函数的实现比较简单,首先调用 findkey() 函数查找值为key的共享内存是否已经被创建,findkey() 函数返回共享内存在 shm_segs数组 的索引。如果找到,那么直接返回共享内存的标识符即可。否则就调用 newseg() 函数创建新的共享内存。newseg() 函数的实现也比较简单,就是创建一个新的 struct shmid_kernel 结构体,然后设置其各个字段的值,并且保存到 shm_segs数组 中。

shmat() 函数实现

shmat() 函数用于将共享内存映射到本地虚拟内存地址,由于 shmat() 函数的实现比较复杂,所以我们分段来分析这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
objectivec复制代码asmlinkage long sys_shmat (int shmid, char *shmaddr, int shmflg, ulong *raddr)
{
struct shmid_kernel *shp;
struct vm_area_struct *shmd;
int err = -EINVAL;
unsigned int id;
unsigned long addr;
unsigned long len;

down(&current->mm->mmap_sem);
spin_lock(&shm_lock);
if (shmid < 0)
goto out;

shp = shm_segs[id = (unsigned int) shmid % SHMMNI];
if (shp == IPC_UNUSED || shp == IPC_NOID)
goto out;

上面这段代码主要通过 shmid 标识符来找到共享内存描述符,上面说过系统中所有的共享内存到保存在 shm_segs 数组中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码if (!(addr = (ulong) shmaddr)) {
if (shmflg & SHM_REMAP)
goto out;
err = -ENOMEM;
addr = 0;
again:
if (!(addr = get_unmapped_area(addr, shp->u.shm_segsz))) // 获取一个空闲的虚拟内存空间
goto out;
if(addr & (SHMLBA - 1)) {
addr = (addr + (SHMLBA - 1)) & ~(SHMLBA - 1);
goto again;
}
} else if (addr & (SHMLBA-1)) {
if (shmflg & SHM_RND)
addr &= ~(SHMLBA-1); /* round down */
else
goto out;
}

上面的代码主要找到一个可用的虚拟内存地址,如果在调用 shmat() 函数时没有指定了虚拟内存地址,那么就通过 get_unmapped_area() 函数来获取一个可用的虚拟内存地址。

1
2
3
4
5
6
7
8
9
10
11
ini复制代码spin_unlock(&shm_lock);
err = -ENOMEM;
shmd = kmem_cache_alloc(vm_area_cachep, SLAB_KERNEL);
spin_lock(&shm_lock);
if (!shmd)
goto out;
if ((shp != shm_segs[id]) || (shp->u.shm_perm.seq != (unsigned int) shmid / SHMMNI)) {
kmem_cache_free(vm_area_cachep, shmd);
err = -EIDRM;
goto out;
}

上面的代码主要通过调用 kmem_cache_alloc() 函数创建一个 vm_area_struct 结构,在内存管理一章知道,vm_area_struct 结构用于管理进程的虚拟内存空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ini复制代码shmd->vm_private_data = shm_segs + id;
shmd->vm_start = addr;
shmd->vm_end = addr + shp->shm_npages * PAGE_SIZE;
shmd->vm_mm = current->mm;
shmd->vm_page_prot = (shmflg & SHM_RDONLY) ? PAGE_READONLY : PAGE_SHARED;
shmd->vm_flags = VM_SHM | VM_MAYSHARE | VM_SHARED
| VM_MAYREAD | VM_MAYEXEC | VM_READ | VM_EXEC
| ((shmflg & SHM_RDONLY) ? 0 : VM_MAYWRITE | VM_WRITE);
shmd->vm_file = NULL;
shmd->vm_offset = 0;
shmd->vm_ops = &shm_vm_ops;

shp->u.shm_nattch++; /* prevent destruction */
spin_unlock(&shm_lock);
err = shm_map(shmd);
spin_lock(&shm_lock);
if (err)
goto failed_shm_map;

insert_attach(shp,shmd); /* insert shmd into shp->attaches */

shp->u.shm_lpid = current->pid;
shp->u.shm_atime = CURRENT_TIME;

*raddr = addr;
err = 0;
out:
spin_unlock(&shm_lock);
up(&current->mm->mmap_sem);
return err;
...
}

上面的代码主要是设置刚创建的 vm_area_struct 结构的各个字段,比较重要的是设置其 vm_ops 字段为 shm_vm_ops,shm_vm_ops 定义如下:

1
2
3
4
5
6
7
8
9
10
11
arduino复制代码static struct vm_operations_struct shm_vm_ops = {
shm_open, /* open - callback for a new vm-area open */
shm_close, /* close - callback for when the vm-area is released */
NULL, /* no need to sync pages at unmap */
NULL, /* protect */
NULL, /* sync */
NULL, /* advise */
shm_nopage, /* nopage */
NULL, /* wppage */
shm_swapout /* swapout */
};

shm_vm_ops 的 nopage 回调为 shm_nopage() 函数,也就是说,当发生页缺失异常时将会调用此函数来恢复内存的映射。

从上面的代码可看出,shmat() 函数只是申请了进程的虚拟内存空间,而共享内存的物理空间并没有申请,那么在什么时候申请物理内存呢?答案就是当进程发生缺页异常的时候会调用 shm_nopage() 函数来恢复进程的虚拟内存地址到物理内存地址的映射。

shm_nopage() 函数实现

shm_nopage() 函数是当发生内存缺页异常时被调用的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
scss复制代码static struct page * shm_nopage(struct vm_area_struct * shmd, unsigned long address, int no_share)
{
pte_t pte;
struct shmid_kernel *shp;
unsigned int idx;
struct page * page;

shp = *(struct shmid_kernel **) shmd->vm_private_data;
idx = (address - shmd->vm_start + shmd->vm_offset) >> PAGE_SHIFT;

spin_lock(&shm_lock);
again:
pte = shp->shm_pages[idx]; // 共享内存的页表项
if (!pte_present(pte)) { // 如果内存页不存在
if (pte_none(pte)) {
spin_unlock(&shm_lock);
page = get_free_highpage(GFP_HIGHUSER); // 申请一个新的物理内存页
if (!page)
goto oom;
clear_highpage(page);
spin_lock(&shm_lock);
if (pte_val(pte) != pte_val(shp->shm_pages[idx]))
goto changed;
} else {
...
}
shm_rss++;
pte = pte_mkdirty(mk_pte(page, PAGE_SHARED)); // 创建页表项
shp->shm_pages[idx] = pte; // 保存共享内存的页表项
} else
--current->maj_flt; /* was incremented in do_no_page */

done:
get_page(pte_page(pte));
spin_unlock(&shm_lock);
current->min_flt++;
return pte_page(pte);
...
}

shm_nopage() 函数的主要功能是当发生内存缺页时,申请新的物理内存页,并映射到共享内存中。由于使用共享内存时会映射到相同的物理内存页上,从而不同进程可以共用此块内存。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

AsposeWords for Java2111去除水印

发表于 2021-11-25

前言
工欲善其事,必先利其器

源码分析

1. 下载Aspose.Words for Java21.11官方jar包

2. 开始分析

  1. 调用授权方法
1
2
3
java复制代码InputStream is = new FileInputStream("..license.xml");
License license = new License();
license.setLicense(is);

license.xml文件内容这里是个过期的文件主要是格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
xml复制代码<License>
<Data>
<Products>
<Product>Aspose.Total for Java</Product>
<Product>Aspose.Words for Java</Product>
</Products>
<EditionType>Enterprise</EditionType>
<SubscriptionExpiry>20991231</SubscriptionExpiry>
<LicenseExpiry>20991231</LicenseExpiry>
<SerialNumber>8bfe198c-7f0c-4ef8-8ff0-acc3237bf0d7</SerialNumber>
</Data>
<Signature>
sNLLKGMUdF0r8O1kKilWAGdgfs2BvJb/2Xp8p5iuDVfZXmhppo+d0Ran1P9TKdjV4ABwAgKXxJ3jcQTqE/2IRfqwnPf8itN8aFZlV3TJPYeD3yWE7IT55Gz6EijUpC7aKeoohTb4w2fpox58wWoF3SNp6sK6jDfiAUGEHYJ9pjU=
</Signature>
</License>
  1. 分析License类的setLicense方法找到关键代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public void setLicense(String licenseName) throws Exception {
if (licenseName == null) {
throw new NullPointerException(zzVu.zzIR().zzZ42(new byte[]{105, 108, 101, 99, 115, 110, 78, 101, 109, 97, 101}));
} else {
(new zzXDb()).zzY0J(licenseName, zzWJD.zzWIQ());
}
}

public void setLicense(InputStream stream) throws Exception {
if (stream == null) {
throw new NullPointerException(zzVu.zzIR().zzZ42(new byte[]{116, 115, 101, 114, 109, 97}));
} else {
(new zzXDb()).zzY0J(stream);
}
}

setLicense的两个重载方法最终都调用了(new zzXDb()).zzY0J(stream);中的zzY0J方法,进入zzY0J方法观察代码发现重点在于void zzY0J方法下面的 (InputStream var1) throws Exception这个重载方法里面,但是里面代码很多不太好找到关键代码,所以转头去寻找关于验证对外调用的静态方法,最终找到了在zzY0J方法下面的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码static byte[] zzX8p() {
boolean var0 = zzWiV == null || zzWiV.zzWSL == zzYeQ.zzX0q || (new Date()).after(zzWiV.zzZ3l) || zzYKk.zzWy3() == 4096;
if (zzW5s == 0L) {
zzW5s ^= zzVWj;
}

boolean var1 = false;
if (zzZB8.zzxn() != null) {
var1 = zzZB8.zzZ7p() == zzu3.zzX0q;
byte[] var2 = var0 && var1 ? zzYeQ.zzX0q : zzYeQ.zzXgr;
return var2;
} else {
return null;
}
}

static byte[] zzWQR() {
boolean var0 = zzWiV == null || zzWiV.zzWSL == zzYeQ.zzX0q || (new Date()).after(zzWiV.zzZ3l) || zzYKk.zzWy3() == 4096;
boolean var1 = zzZB8.zzZ7p() == zzu3.zzX0q;
byte[] var2 = var0 && var1 ? zzYeQ.zzX0q : zzYeQ.zzXgr;
return var2;
}

这两个方法主要在于对外返回了一个byte数组,返回值是zzYeQ中的静态常量,所以重点就在于上面的判断语句boolean var0 = zzWiV == null || zzWiV.zzWSL == zzYeQ.zzX0q || (new Date()).after(zzWiV.zzZ3l) || zzYKk.zzWy3() == 4096;让它返回什么数据。

这里需要分析zzWiV.zzWSL zzWiV.zzZ3l zzYKk.zzWy3()这三个数据,在当前zzXDbclass文件中搜索找到在void zzY0J(InputStream var1) throws Exception方法中关键的关键位置赋值了

1
2
java复制代码this.zzWSL = zzYeQ.zzXgr;
zzWiV = this;

观察它上下位置代码发现看起来都是做验证错误的处理,所以可以尝试去掉上下的验证。

再来看zzWiV.zzZ3l变量属性为Date应该是时间什么的可以直接给个最大值。

然后是zzYKk.zzWy3()进入看到

1
2
3
java复制代码static int zzWy3() {
return zzYU8 == 128 && !zzyS ? 256 : 4096;
}

那么返回值就是256和4096二选一,尝试后选择返回256。

3. 分析结果

  • 修改void zzY0J(InputStream var1)方法体为
1
2
3
java复制代码this.zzZ3l = new java.util.Date(Long.MAX_VALUE);//Date赋值最大值
this.zzWSL = zzYeQ.zzXgr;//直接返回验证成功的执行
zzWiV = this;//直接返回验证成功的执行
  • 修改zzYKk类下的static int zzWy3()方法体为
1
java复制代码return 256;

执行操作

1. 添加Javassist修改class字节码文件

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.28.0-GA</version>
</dependency>

2. 添加修改方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码/**
* 修改words jar包里面的校验
*/
public static void modifyWordsJar() {
try {
//这一步是完整的jar包路径,选择自己解压的jar目录
ClassPool.getDefault().insertClassPath("D:\\aspose-words-21.11.0-java\\lib\\aspose-words-21.11.0-jdk17.jar");
//获取指定的class文件对象
CtClass zzZJJClass = ClassPool.getDefault().getCtClass("com.aspose.words.zzXDb");
//从class对象中解析获取指定的方法
CtMethod[] methodA = zzZJJClass.getDeclaredMethods("zzY0J");
//遍历重载的方法
for (CtMethod ctMethod : methodA) {
CtClass[] ps = ctMethod.getParameterTypes();
if (ctMethod.getName().equals("zzY0J")) {
System.out.println("ps[0].getName==" + ps[0].getName());
//替换指定方法的方法体
ctMethod.setBody("{this.zzZ3l = new java.util.Date(Long.MAX_VALUE);this.zzWSL = com.aspose.words.zzYeQ.zzXgr;zzWiV = this;}");
}
}
//这一步就是将破译完的代码放在桌面上
zzZJJClass.writeFile("C:\\Users\\roc\\Desktop\\");

//获取指定的class文件对象
CtClass zzZJJClassB = ClassPool.getDefault().getCtClass("com.aspose.words.zzYKk");
//从class对象中解析获取指定的方法
CtMethod methodB = zzZJJClassB.getDeclaredMethod("zzWy3");
//替换指定方法的方法体
methodB.setBody("{return 256;}");
//这一步就是将破译完的代码放在桌面上
zzZJJClassB.writeFile("C:\\Users\\roc\\Desktop\\");
} catch (Exception e) {
System.out.println("错误==" + e);
}
}

运行修改方法后会在桌面生成 com 修改后的文件夹

3. 修改jar包里面的数据

为了不修改原jar包建议复制一份重命名。

  1. 打开jar包将桌面com文件夹覆盖到jar包com文件夹

image.png
2. 删除jar包里面的.RSA和.SF文件

image.png

4. 重新导入修改后的jar包进行测试

  1. maven移除旧的jar包,导入修改后的jar包
  2. 调用测试方法进行测试转换后的文件是否去除水印和数量限制成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码String sourceFile = "D:\\b.doc";//输入的文件
String targetFile = "D:\\转换后.pdf";//输出的文件
/**
* Word转PDF操作
*
* @param sourceFile 源文件
* @param targetFile 目标文件
*/
public static void doc2pdf(String sourceFile, String targetFile) {
try {
long old = System.currentTimeMillis();
FileOutputStream os = new FileOutputStream(targetFile);
com.aspose.words.Document doc = new com.aspose.words.Document(sourceFile);
doc.save(os, com.aspose.words.SaveFormat.PDF);
os.close();
long now = System.currentTimeMillis();
System.out.println("共耗时:" + ((now - old) / 1000.0) + "秒"); //转化用时
} catch (Exception e) {
e.printStackTrace();
}
}

声明
本方法只做个人研究学习使用,切勿用于商用。
其他参考
Aspose.PDF for Java21.11去除水印和数量限制

Aspose.Cells for Java21.11去除水印和数量限制

Aspose.Slides for Java21.10去除水印和数量限制

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【源码共读】YARN HA方案ResourceManager

发表于 2021-11-25

「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」

YARN HA方案 EmbeddedElector +StandByTransitionRunnable

执行原理 : YARN HA ResourceManager的选举,首先由EmbeddedElector来做最多3次的同步尝试选举,如果没有选举成功,则交给StandByTransitionRunnable线程来维护选举

首先来研究YARN HA的实现方案:

先来简要的说明上图:

  • 灰色部分代表的是zookeeper中的部分,/是根目录,在选举方案中根目录下面有一个znode = yarn-leader-election,在该znode下面还有一个znode代表的clusterID
  • 当开启了HA模式的时候,集群启动,两个ResourceManager就会争相竞选,的在clusterID的znode下面创建ActiveStandbyElectorLock锁节点,两个ResourceManager谁首先创建了该节点谁就是active竞选成功,然后将自己的 ResourceManagerID 写到此节点中,并且创建另一个znode(ActiveBreadCrumb)来存储active节点的一些信息
  • 另一个竞选失败的RM将会监听ActiveStandbyElectorLock,当集群中active发展故障下线的时候,ActiveStandbyElectorLock锁节点就会被删除,然后standby节点就会开始争抢创建锁节点

下面从源码中来印证,开启了HA,集群启动时候,会走ResourceManager.serviceInit() 方法的elector服务来进行竞选,也就是 ActiveStandbyElectorBasedElectorService类,其serviceInit()方法之前的文章(Yarn ResourceManager启动流程源码解读)已经叙述,下面直接从serviceStart()方法入手:

进行选举的入口

localActiveNodeInfo = clusterid + rmid 对应zookeeper上面znode所存储的数据的内容

1
2
3
4
java复制代码protected void serviceStart() throws Exception {
   elector.joinElection(localActiveNodeInfo);
   super.serviceStart();
}

下面详看 joinElection(localActiveNodeInfo) 方法

首先是拷贝数据,,然后转到另一个方法joinElectionInternal()

1
2
ini复制代码appData = new byte[data.length];
joinElectionInternal()

详看 joinElectionInternal() 方法

确保Client的存在

1
2
3
4
5
6
ini复制代码if (zkClient == null) {
     if (!reEstablishSession()) {
       fatalError("Failed to reEstablish connection with ZooKeeper");
       return;
    }
}

通过异步的方式创建分布式锁节点

1
2
3
4
javascript复制代码createLockNodeAsync(){
   //创建一个选举的临时节点 EPHEMERAL
   zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,this, zkClient);
}

上述代码的cb为一个回调函数(第五个参数),当zookeeper创建节点后转到回调函数进行相关的处理

回调函数cb为:callback = ActiveStandbyElector.processResult()

下面详细的查看函数processResult()

  1. 当创建锁节点成功的时候,当前的节点就会变为active状态
1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码processResult(){
   if (isSuccess(code)) {
     // 当成功创建了znode的时候(ActiveStandbyElectorLock),状态就会编程active leader ,开启监控
     //todo 成为active
     if (becomeActive()) {
       monitorActiveStatus();
    } else {
       reJoinElectionAfterFailureToBecomeActive();
    }
     return;
  }
}

becomeActive() 方法的具体内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码ActiveStandbyElector.becomeActive(){
   //如果已经成为了active节点 说明该做的事情已经做完了 直接返回
   if (state == State.ACTIVE) {
     // already active
     return true;
  }
   
   //当没有成为active节点的时候
   //创建 ActiveBreadCrumb znode节点
   writeBreadCrumbNode(oldBreadcrumbStat);
   //进行状态的切换
   appClient.becomeActive();
   //对state成员变量赋值为ACTIVE
   state = State.ACTIVE;
}

跳转到 ActiveStandbyElectorBasedElectorService.becomeActive() 方法

将active节点才能启动的Service都进行启动

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码rm.getRMContext().getRMAdminService().transitionToActive(req){
   rm.transitionToActive(){
       public Void run() throws Exception {
         //todo 启动相关的服务
         startActiveServices(){
             if (activeServices != null) {
    clusterTimeStamp = System.currentTimeMillis();
    activeServices.start();
  }
        }
    }
  }
}

走到此处的时候,就跳转到另一个Service activeServices ,直接看该Service的serviceStart()方法即可


  1. 当没有成功创建的时候 isSuccess(code) = false

此时有两种情况:

1
2
markdown复制代码1. 节点已经存在,别人已经创建成功
2. 节点不存在,别人没有创建成功,但是我也没有创建成功

第一种,别人已经创建成功锁节点,isNodeExists(code) = true ,此时当前节点直接成为standby状态

1
2
3
4
5
6
7
scss复制代码if (isNodeExists(code)) {
     if (createRetryCount == 0) {
       becomeStandby();
    }
     monitorActiveStatus();
     return;
  }

becomeStandby() 方法的具体内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scss复制代码ActiveStandbyElectorBasedElectorService.becomeStandby(){
   rm.getRMContext().getRMAdminService().transitionToStandby(req){
       rm.transitionToStandby(true){
           //todo 获取状态
  HAServiceState state = rmContext.getHAServiceState();
  //todo 将状态设置为standby
  rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
           //如果获取到之前的状态为actice 则关闭那些active的服务
  if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
      //todo 关闭active service
    stopActiveServices();
    //todo initialize = true
    reinitialize(initialize){
               ClusterMetrics.destroy();
  QueueMetrics.clearQueueMetrics();
  getResourceScheduler().resetSchedulerMetrics();
  //todo true
  if (initialize) {
                   //重置 上下文环境
    resetRMContext();
                   //创建并且初始化新的activeService
    createAndInitActiveServices(true);
  }
          }
      }
  }
}

第二种,没有成功创建锁节点并且锁节点到目前为止还没有被创建成功

  • 当重试的次数小于最大的重试次数的时候就重新进行竞选,createLockNodeAsync() 就又返回到创建锁节点的地方
1
2
3
4
5
ini复制代码if (createRetryCount < maxRetryNum) {
       ++createRetryCount;
       createLockNodeAsync();
       return;
    }
  • 当重试的次数大于了 maxRetryNum 的时候

最后交给线程StandByTransitionRunnable来选举 使用一个守护线程来进行选举

不要阻塞ResourceManager主线程其它服务的启动

1
2
3
4
5
6
7
8
scss复制代码fatalError(errorMessage){
   appClient.notifyFatalError{
       notifyFatalError{
           // RMFatalEventDispatcher.handle
           rm.getRMContext().getDispatcher().getEventHandler().handle(new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,errorMessage));
      }
  }
}

上面的方法用到了AsyncDispatcher,跳转到 RMFatalEventDispatcher.handle()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typescript复制代码RMFatalEventDispatcher.handle(){
   //启动线程 StandByTransitionRunnable
   handleTransitionToStandByInNewThread(){
       standByTransitionThread.start(){
           public void run() {
               //获得选举的选举器 elector
               EmbeddedElector elector = rmContext.getLeaderElectorService();
               //重新加入选举
          elector.rejoinElection(){
                   //退出可能存在的选举
  elector.quitElection(false);
  //重新加入选举
  elector.joinElection(localActiveNodeInfo){
                       //拷贝数据
  appData = new byte[data.length];
                       //选举
  joinElectionInternal();
                  }
              }
          }
      }
  }
}

此处就转到了原来的方法 joinElectionInternal() 重新进行选举,之后的流程就完全相同了

至此,YARN HA 的ResourceManager的选举就全部结束


本人在学习的路上,上述文章如有错误还请指教批评。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

垃圾回收器 垃圾回收器 扩展 小结

发表于 2021-11-25

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」

垃圾回收器

《Java 虚拟机规范》并没有对垃圾收集器的具体实现做任何的规定,因此每家垃圾收集器的实现方式都不同,但比较常用的垃圾回收器是 OracleJDK 中自带的 HotSpot 虚拟机。HotSpot 中使用的垃圾收集器主要包括 7 个:Serial、ParNew、Parallel Scavenge、Serial Old、Parallel Old、CMS 和 G1(Garbage First)收集器。

Serial 收集器属于最早期的垃圾收集器,也是 JDK 1.3 版本之前唯一的垃圾收集器。它是单线程运行的垃圾收集器,其单线程是指在进行垃圾回收时所有的工作线程必须暂停,直到垃圾回收结束为止。
Serial 收集器的特点是简单和高效,并且本身的运行对内存要求不高,因此它在客户端模式下使用的比较多。

ParNew收集器实际上是 Serial 收集器的多线程并行版本。
Parallel Scavenge 收集器和 ParNew 收集器类似,它也是一个并行运行的垃圾回收器;不同的是,该收集器关注的侧重点是实现一个可以控制的吞吐量。而这个吞吐量计算的也很奇怪,它的计算公式是:用户运行代码的时间 / (用户运行代码的时间 + 垃圾回收执行的时间)。比如用户运行的时间是 8 分钟,垃圾回收运行的时间是 2 分钟,那么吞吐量就是 80%。Parallel Scavenge 收集器追求的目标就是将这个吞吐量的值,控制在一定的范围内。

Parallel Scavenge 收集器有两个重要的参数:

  • -XX:MaxGCPauseMillis 参数:它是用来控制垃圾回收的最大停顿时间;
  • -XX:GCTimeRatio 参数:它是用来直接设置吞吐量的值的。

Serial Old 收集器为 Serial 收集器的老年代版本,而 Parallel Old 收集器是 Parallel Scavenge 收集器的老年代版本。

CMS(Concurrent Mark Sweep)收集器与以吞吐量为目标的 Parallel Scavenge 收集器不同,它强调的是提供最短的停顿时间,因此可能会牺牲一定的吞吐量。它主要应用在 Java Web 项目中,它满足了系统需要短时间停顿的要求,以此来提高用户的交互体验。

Garbage First(简称 G1)收集器是历史发展的产物,也是一款更先进的垃圾收集器,主要面向服务端应用的垃圾收集器。它将内存划分为多个 Region 分区,回收时则以分区为单位进行回收,这样它就可以用相对较少的时间优先回收包含垃圾最多区块。从 JDK 9 之后也成了官方默认的垃圾收集器,官方也推荐使用 G1 来代替选择 CMS 收集器。

扩展

1.分代收集

说到垃圾收集器不得不提的一个理论就是“分代收集”,因为目前商用虚拟机的垃圾收集器都是基于分代收集的理论进行设计的,它是指将不同“年龄”的数据分配到不同的内存区域中进行存储,所谓的“年龄”指的是经历过垃圾收集的次数。这样我们就可以把那些朝生暮死的对象集中分配到一起,把不容易消亡的对象分配到一起,对于不容易死亡的对象我们就可以设置较短的垃圾收集频率,这样就能消耗更少的资源来实现更理想的功能了。

通常情况下分代收集算法会分为两个区域:新生代(Young Generation)和老年代(Old Generation),其中新生代用于存储刚刚创建的对象,这个区域内的对象存活率不高,而对于经过了一定次数的 GC 之后还存活下来的对象,就可以成功晋级到老生代了。

对于上面介绍的 7 个垃圾收集器来说,新生代垃圾收集器有:Serial、ParNew、Parallel Scavenge,老生代的垃圾收集器有:Serial Old、Parallel Old、CMS,而 G1 属于混合型的垃圾收集器

image.png

  1. CMS 收集器的具体执行流程

CMS 收集器是基于标记-清除算法实现的,我们之前有讲过关于标记-清除的算法,这里简单地回顾一下。标记-清除的算法是由标记阶段和清除阶段构成的,标记阶段会给所有的存活对象做上标记;而清除阶段会把被标记为死亡的对象进行回收,而死亡对象的判断是通过引用计数法或者是目前主流的可达性分析算法实现的。但是 CMS 的实现稍微复杂一些,它的整个过程可以分为四个阶段:

  • 初始标记(CMS initial mark)
  • 并发标记(CMS concurrent mark)
  • 重新标记(CMS remark)
  • 并发清除(CMS concurrent sweep)

首先,初始标记阶段的执行时间很短,它只是标记一下 GC Roots 的关联对象;并发阶段是从 GC Roots 关联的对象进行遍历判断并标识死亡对象,这个过程比较慢,但不需要停止用户线程,用户的线程可以和垃圾收集线程并发执行;而重新标记阶段则是为了判断并标记,刚刚并发阶段用户继续运行的那一部分对象,所以此阶段的执行时间也比较短;最后是并发清除阶段,也就是清除上面标记的死亡对象,由于 CMS 使用的是标记-清除算法,而非标记-整理算法,因此无须移动存活的对象,这个阶段垃圾收集线程也可以和用户线程并发执行。

CMS 的整个执行过程中只有执行时间很短的初始标记和重新标记需要 Stop The World(全局停顿)
因为 CMS 是一款基于标记清除算法实现的垃圾收集器,因此会在收集时产生大量的空间碎片,为了解决这个问题,CMS 收集器提供了一个 -XX:+UseCMS-CompactAtFullCollection 的参数(默认是开启的,此参数从 JDK9 开始废弃),用于在 CMS 收集器进行 Full GC 时开启内存碎片的合并和整理。

但又因为碎片整理的过程必须移动存活的对象,所以它和用户线程是无法并发执行的,为了解决这个问题 CMS 收集器又提供了另外一个参数-XX:CMSFullGCsBefore-Compaction,用于规定多少次(根据此参数的值决定)之后再进行一次碎片整理。

  1. ZGC

ZGC 收集器是 JDK 11 中新增的垃圾收集器,它是由 Oracle 官方开发的,并且支持 TB 级别的堆内存管理,而且 ZGC 收集器也非常高效,可以做到 10ms 以内完成垃圾收集。

在 ZGC 收集器中没有新生代和老生代的概念,它只有一代。ZGC 收集器采用的着色指针技术,利用指针中多余的信息位来实现着色标记,并且 ZGC 使用了读屏障来解决 GC 线程和应用线程可能存在的并发(修改对象状态的)问题,从而避免了Stop The World(全局停顿),因此使得 GC 的性能大幅提升。

ZGC 的执行流程和 CMS 比较相似,首先是进行 GC Roots 标记,然后再通过指针进行并发着色标记,之后便是对标记为死亡的对象进行回收(被标记为橘色的对象),最后是重定位,将 GC 之后存活的对象进行移动,以解决内存碎片的问题。

小结

上面介绍了 JDK 11 之前的 7 种垃圾收集器:Serial、Serial Old、ParNew、Parallel Scavenge、Parallel Old、CMS、G1,其中 CMS 收集器是 JDK 8 之前的主流收集器,而 JDK 9 之后的默认收集器为 G1,并且在文章的最后,介绍了性能更加强悍、综合表现更好的 ZGC 收集器

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

C++ 线程的使用

发表于 2021-11-25

C++11 之前,C++ 语言没有对并发编程提供语言级别的支持,这使得我们在编写可移植的并发程序时,存在诸多的不便。现在 C++11 中增加了线程以及线程相关的类,很方便地支持了并发编程,使得编写的多线程程序的可移植性得到了很大的提高。\

C++11 中提供的线程类叫做 std::thread,基于这个类创建一个新的线程非常的简单,只需要提供线程函数或者函数对象即可,并且可以同时指定线程函数的参数。我们首先来了解一下这个类提供的一些常用 API:

  1. 构造函数

1
vbnet复制代码// ①thread() noexcept;// ②thread( thread&& other ) noexcept;// ③template< class Function, class... Args >explicit thread( Function&& f, Args&&... args );// ④thread( const thread& ) = delete;

构造函数①:默认构造函,构造一个线程对象,在这个线程中不执行任何处理动作

构造函数②:移动构造函数,将 other 的线程所有权转移给新的 thread 对象。之后 other 不再表示执行线程。

构造函数③:创建线程对象,并在该线程中执行函数 f 中的业务逻辑,args 是要传递给函数 f 的参数

任务函数 f 的可选类型有很多,具体如下:

  • 普通函数,类成员函数,匿名函数,仿函数(这些都是可调用对象类型)
  • 可以是可调用对象包装器类型,也可以是使用绑定器绑定之后得到的类型(仿函数)

构造函数④:使用 =delete 显示删除拷贝构造,不允许线程对象之间的拷贝

2.1 get_id()

应用程序启动之后默认只有一个线程,这个线程一般称之为主线程或父线程,通过线程类创建出的线程一般称之为子线程,每个被创建出的线程实例都对应一个线程 ID,这个 ID 是唯一的,可以通过这个 ID 来区分和识别各个已经存在的线程实例,这个获取线程 ID 的函数叫做 get_id(),函数原型如下:

1
arduino复制代码std::thread::id get_id() const noexcept;

示例程序如下:

1
2
3
4
c复制代码#include <iostream>#include <thread>#include <chrono>using namespace std;
void func(int num, string str){ for (int i = 0; i < 10; ++i) { cout << "子线程: i = " << i << "num: " << num << ", str: " << str << endl; }}
void func1(){ for (int i = 0; i < 10; ++i) { cout << "子线程: i = " << i << endl; }}
int main(){ cout << "主线程的线程ID: " << this_thread::get_id() << endl; thread t(func, 520, "i love you"); thread t1(func1); cout << "线程t 的线程ID: " << t.get_id() << endl; cout << "线程t1的线程ID: " << t1.get_id() << endl;}
  1. thread t(func, 520, "i love you");:创建了子线程对象 t,func() 函数会在这个子线程中运行
  • func() 是一个回调函数,线程启动之后就会执行这个任务函数,程序猿只需要实现即可
  • func() 的参数是通过 thread 的参数进行传递的,520,i love you 都是调用 func() 需要的实参
  • 线程类的构造函数③ 是一个变参函数,因此无需担心线程任务函数的参数个数问题
  • 任务函数 func() 一般返回值指定为 void,因为子线程在调用这个函数的时候不会处理其返回值
  1. thread t1(func1);:子线程对象 t1 中的任务函数func1(),没有参数,因此在线程构造函数中就无需指定了 通过线程对象调用 get_id() 就可以知道这个子线程的线程 ID 了,t.get_id(),t1.get_id()。
  2. 基于命名空间 this_thread 得到当前线程的线程 ID

在上面的示例程序中有一个 bug,在主线程中依次创建出两个子线程,打印两个子线程的线程 ID,最后主线程执行完毕就退出了(主线程就是执行 main () 函数的那个线程)。默认情况下,主线程销毁时会将与其关联的两个子线程也一并销毁,但是这时有可能子线程中的任务还没有执行完毕,最后也就得不到我们想要的结果了。

当启动了一个线程(创建了一个 thread 对象)之后,在这个线程结束的时候(std::terminate ()),我们如何去回收线程所使用的资源呢?thread 库给我们两种选择:

  • 加入式(join())
  • 分离式(detach())

另外,我们必须要在线程对象销毁之前在二者之间作出选择,否则程序运行期间就会有 bug 产生。

2.2 join()

join() 字面意思是连接一个线程,意味着主动地等待线程的终止(线程阻塞)。在某个线程中通过子线程对象调用 join() 函数,调用这个函数的线程被阻塞,但是子线程对象中的任务函数会继续执行,当任务执行完毕之后 join() 会清理当前子线程中的相关资源然后返回,同时,调用该函数的线程解除阻塞继续向下执行。

再次强调,我们一定要搞清楚这个函数阻塞的是哪一个线程,函数在哪个线程中被执行,那么函数就阻塞哪个线程。该函数的函数原型如下:

1
csharp复制代码void join();

有了这样一个线程阻塞函数之后,就可以解决在上面测试程序中的 bug 了,如果要阻塞主线程的执行,只需要在主线程中通过子线程对象调用这个方法即可,当调用这个方法的子线程对象中的任务函数执行完毕之后,主线程的阻塞也就随之解除了。修改之后的示例代码如下:

1
c复制代码int main(){    cout << "主线程的线程ID: " << this_thread::get_id() << endl;    thread t(func, 520, "i love you");    thread t1(func1);    cout << "线程t 的线程ID: " << t.get_id() << endl;    cout << "线程t1的线程ID: " << t1.get_id() << endl;    t.join();    t1.join();}

当主线程运行到第八行 t.join();,根据子线程对象 t 的任务函数 func() 的执行情况,主线程会做如下处理:

  • 如果任务函数 func() 还没执行完毕,主线程阻塞,直到任务执行完毕,主线程解除阻塞,继续向下运行
  • 如果任务函数 func() 已经执行完毕,主线程不会阻塞,继续向下运行

同样,第 9 行的代码亦如此。

为了更好的理解 join() 的使用,再来给大家举一个例子,场景如下:

程序中一共有三个线程,其中两个子线程负责分段下载同一个文件,下载完毕之后,由主线程对这个文件进行下一步处理,那么示例程序就应该这么写:

1
2
3
4
5
c复制代码#include <iostream>#include <thread>#include <chrono>using namespace std;
void download1(){ // 模拟下载, 总共耗时500ms,阻塞线程500ms this_thread::sleep_for(chrono::milliseconds(500)); cout << "子线程1: " << this_thread::get_id() << ", 找到历史正文...." << endl;}
void download2(){ // 模拟下载, 总共耗时300ms,阻塞线程300ms this_thread::sleep_for(chrono::milliseconds(300)); cout << "子线程2: " << this_thread::get_id() << ", 找到历史正文...." << endl;}
void doSomething(){ cout << "集齐历史正文, 呼叫罗宾...." << endl; cout << "历史正文解析中...." << endl; cout << "起航,前往拉夫德尔...." << endl; cout << "找到OnePiece, 成为海贼王, 哈哈哈!!!" << endl; cout << "若干年后,草帽全员卒...." << endl; cout << "大海贼时代再次被开启...." << endl;}
int main(){ thread t1(download1); thread t2(download2); // 阻塞主线程,等待所有子线程任务执行完毕再继续向下执行 t1.join(); t2.join(); doSomething();}

示例程序输出的结果:

1
makefile复制代码子线程2: 72540, 找到历史正文....子线程1: 79776, 找到历史正文....集齐历史正文, 呼叫罗宾....历史正文解析中....起航,前往拉夫德尔....找到OnePiece, 成为海贼王, 哈哈哈!!!若干年后,草帽全员卒....大海贼时代再次被开启....

在上面示例程序中最核心的处理是在主线程调用 doSomething(); 之前在第 35、36行通过子线程对象调用了 join() 方法,这样就能够保证两个子线程的任务都执行完毕了,也就是文件内容已经全部下载完成,主线程再对文件进行后续处理,如果子线程的文件没有下载完毕,主线程就去处理文件,很显然从逻辑上讲是有问题的。

2.3 detach()

detach() 函数的作用是进行线程分离,分离主线程和创建出的子线程。在线程分离之后,主线程退出也会一并销毁创建出的所有子线程,在主线程退出之前,它可以脱离主线程继续独立的运行,任务执行完毕之后,这个子线程会自动释放自己占用的系统资源。(其实就是孩子翅膀硬了,和家里断绝关系,自己外出闯荡了,如果家里被诛九族还是会受牵连)。该函数函数原型如下:

1
csharp复制代码void detach();

线程分离函数没有参数也没有返回值,只需要在线程成功之后,通过线程对象调用该函数即可,继续将上面的测试程序修改一下:

1
c复制代码int main(){    cout << "主线程的线程ID: " << this_thread::get_id() << endl;    thread t(func, 520, "i love you");    thread t1(func1);    cout << "线程t 的线程ID: " << t.get_id() << endl;    cout << "线程t1的线程ID: " << t1.get_id() << endl;    t.detach();    t1.detach();    // 让主线程休眠, 等待子线程执行完毕    this_thread::sleep_for(chrono::seconds(5));}

注意事项:线程分离函数 detach () 不会阻塞线程,子线程和主线程分离之后,在主线程中就不能再对这个子线程做任何控制了,比如:通过 join () 阻塞主线程等待子线程中的任务执行完毕,或者调用 get_id () 获取子线程的线程 ID。有利就有弊,鱼和熊掌不可兼得,建议使用 join ()。

2.5 joinable()

joinable() 函数用于判断主线程和子线程是否处理关联(连接)状态,一般情况下,二者之间的关系处于关联状态,该函数返回一个布尔类型:

  • 返回值为 true:主线程和子线程之间有关联(连接)关系
  • 返回值为 false:主线程和子线程之间没有关联(连接)关系
1
arduino复制代码bool joinable() const noexcept;

示例代码如下:

1
2
3
4
5
6
c复制代码#include <iostream>#include <thread>#include <chrono>using namespace std;
void foo(){ this_thread::sleep_for(std::chrono::seconds(1));}
int main(){ thread t; cout << "before starting, joinable: " << t.joinable() << endl;
t = thread(foo); cout << "after starting, joinable: " << t.joinable() << endl;
t.join(); cout << "after joining, joinable: " << t.joinable() << endl;
thread t1(foo); cout << "after starting, joinable: " << t1.joinable() << endl; t1.detach(); cout << "after detaching, joinable: " << t1.joinable() << endl;}

示例代码打印的结果如下:

1
yaml复制代码before starting, joinable: 0after starting, joinable: 1after joining, joinable: 0after starting, joinable: 1after detaching, joinable: 0

基于示例代码打印的结果可以得到以下结论:

  • 在创建的子线程对象的时候,如果没有指定任务函数,那么子线程不会启动,主线程和这个子线程也不会进行连接
  • 在创建的子线程对象的时候,如果指定了任务函数,子线程启动并执行任务,主线程和这个子线程自动连接成功
  • 子线程调用了detach()函数之后,父子线程分离,同时二者的连接断开,调用joinable()返回false
  • 在子线程调用了join()函数,子线程中的任务函数继续执行,直到任务处理完毕,这时join()会清理(回收)当前子线程的相关资源,所以这个子线程和主线程的连接也就断开了,因此,调用join()之后再调用joinable()会返回false。

2.6 operator=

线程中的资源是不能被复制的,因此通过 = 操作符进行赋值操作最终并不会得到两个完全相同的对象。

1
2
3
ini复制代码

// move (1) thread& operator= (thread&& other) noexcept;// copy [deleted] (2) thread& operator= (const other&) = delete;

通过以上 = 操作符的重载声明可以得知:

  • 如果 other 是一个右值,会进行资源所有权的转移
  • 如果 other 不是右值,禁止拷贝,该函数被显示删除(=delete),不可用
  1. 静态函数

thread 线程类还提供了一个静态方法,用于获取当前计算机的 CPU 核心数,根据这个结果在程序中创建出数量相等的线程,每个线程独自占有一个 CPU 核心,这些线程就不用分时复用 CPU 时间片,此时程序的并发效率是最高的。

1
arduino复制代码static unsigned hardware_concurrency() noexcept;

示例代码如下:

1
2
c复制代码#include <iostream>#include <thread>using namespace std;
int main(){ int num = thread::hardware_concurrency(); cout << "CPU number: " << num << endl;}
  1. C 线程库

C 语言提供的线程库不论在 window 还是 Linux 操作系统中都是可以使用的,看明白了这些 C 语言中的线程函数之后会发现它和上面的 C++ 线程类使用很类似(其实就是基于面向对象的思想进行了封装),但 C++ 的线程类用起来更简单一些,感兴趣的可以一看。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…197198199…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%