开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

NIO编程(八)—— 多线程优化服务端

发表于 2021-11-16

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战

多线程架构

之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。

服务端线程可以建立多个线程,将这些线程分成两组:

  • 其中一个线程专门处理 accept 事件,称为Boss线程
  • 创建 cpu 核心数的线程,每个线程配一个Selector,轮流处理 read 事件,称为Worker线程

他们之间的的关系可以通过下面这张图进行理解:

在这里插入图片描述

Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。当Boss线程检测到有客户端的连接请求,就会把这个连接返回的SocketChannel注册到某一个Worker线程上。当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。

Worker类的实现

上面分析了Worker类是一个监听并且处理读写事件的新线程,所以在Worker类中需要一个Thread对象用来启动线程,还需要一个Selector用来监听事件管理channel,此外为Worker设定一个name。这些可以在构造函数中进行初始化。

之后需要实现一个register函数,这个函数接收一个SocketChannel对象,Worker类的register函数将这个SocketChannel对象注册到Worker类的selector上。

下面就是完整的Worker类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码static class Worker implements Runnable{

private String name;
private Thread thread;
private Selector selector;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) throws IOException {
this.name = name;
thread = new Thread(this, name);
thread.start();
selector = Selector.open();
}


//初始化线程和selector
public void register(SocketChannel sc) throws IOException {
//将任务添加到队列中
queue.add(()->{
try {
sc.register(this.selector,SelectionKey.OP_READ,null);//boss
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();
}


@Override
public void run() {
while(true){
try{
selector.select();//worker-0 一开始会阻塞,下面执行不了,所以要先wakeup一下来注册
Runnable task = queue.poll();
if(task!=null)
{
task.run();//执行了sc.register(this.selector,SelectionKey.OP_READ,null);//boss
}
Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = ((SocketChannel) key.channel());
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);//buffer可视化
key.cancel();
}
}
}catch (Exception e)
{

}
}
}
}

在这段代码中,还有一个需要注意的问题:必须保证sc.register(this.selector,SelectionKey.OP_READ,null)不会因为selector.select()被阻塞,否则就不能给socketchannel注册上。所以这里采用的方法是使用消息队列,当有线程想要注册到selector上时,就先使用selector.wakeup()唤醒,紧接着在selector.select()下面进行注册,即拿出消息队列中的注册任务执行。这种方式就能够保证当注册通道时不会被select方法阻塞住。

多Worker

上面讲解了Worker类,在实际服务端代码里,肯定是使用多个Worker去管理多个channel,那么可以使用一个worker数组,每一个channel注册worker数组中的某一个worker,此外,可以使用一个变量去给channel轮流分配worker,具体是这样实现的:

  1. 创建worker数组,并对每一项进行初始化,此外使用一个workerindex表示给下一个通道分配的worker在数组的下标,需要注意的是,因为是多线程环境,所以不能使用int类型,使用的是AtomicInteger类型。
1
2
3
4
5
6
java复制代码Worker[] workers = new Worker[5];
for (int i =0 ;i< workers.length;i++)
{
workers[i] = new Worker("worker-"+i);
}
AtomicInteger workerindex = new AtomicInteger();

2、给通道注册worker,这里需要模数组长度以达到循环使用的目的,做到了负载均衡。

1
java复制代码workers[workerindex.getAndIncrement()% workers.length].register(sc);

服务端代码

下面给出服务端所有的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
java复制代码@Slf4j
public class ThreadNIOServer {
public static void main(String[] args) {
try(ServerSocketChannel server = ServerSocketChannel.open()) {
Thread.currentThread().setName("BOSS");
server.bind(new InetSocketAddress(8080));
server.configureBlocking(false);

Selector boss = Selector.open();
server.register(boss, SelectionKey.OP_ACCEPT);

//1、创建固定数量的worker
Worker[] workers = new Worker[5];
for (int i =0 ;i< workers.length;i++)
{
workers[i] = new Worker("worker-"+i);
}
AtomicInteger workerindex = new AtomicInteger();


while(true)
{
boss.select();

Set<SelectionKey> selectionKeys = boss.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext())
{
SelectionKey key = iter.next();
iter.remove();

if(key.isAcceptable())
{
SocketChannel sc = server.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
//2、关联socket channel和worker
log.debug("before connect...");
workers[workerindex.getAndIncrement()% workers.length].register(sc);

log.debug("after connect...");
}
else if(key.isReadable())
{

}
}
}
}catch (Exception e){

}
}


static class Worker implements Runnable{

private String name;
private Thread thread;
private Selector selector;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) throws IOException {
this.name = name;
thread = new Thread(this, name);
thread.start();
selector = Selector.open();
}


//初始化线程和selector
public void register(SocketChannel sc) throws IOException {
//将任务添加到队列中
queue.add(()->{
try {
sc.register(this.selector,SelectionKey.OP_READ,null);//boss
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});

selector.wakeup();
}


@Override
public void run() {
while(true){
try{
selector.select();//worker-0 一开始会阻塞,下面执行不了,所以要先wakeup一下来注册
Runnable task = queue.poll();
if(task!=null)
{
task.run();//执行了sc.register(this.selector,SelectionKey.OP_READ,null);//boss
}
Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = ((SocketChannel) key.channel());
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);//buffer可视化
key.cancel();
}
}
}catch (Exception e)
{

}
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

发表于 2021-11-16

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

书本简介:

Python是目前流行的脚本语言之一。《Python宝典》由浅入深、循序渐进地为读者讲解了如何使用Python进行编程开发。全书内容共分三篇,分为入门篇、高级篇和案例篇。入门篇包括Python的认识和安装、开发工具简介、Python基本语法、数据结构与算法、多媒体编程、系统应用、图像处理和GUI编程等内容。高级篇包括用Python劋作数据库、进行Web开发、网络编程、科学计算程多线程编程等内容。案例篇选择了3个案例演示了Python在Windows系统优化、大数据处理和游戏开发方面的应用。本书针对Python的常用扩展模块给出了详细的语法介绍,并且给出了典型案例,通过对本书的学习,读者能够很快地使用Python进行编程开发。

获取方式:

《python宝典》

书本内容:

第1章初识Python

1.1 Python是什么

1.2 Python有什么优点

1.3其他程序设计语言中的Python

1.4 快速搭建Python开发环境

1.4.1哪些系统中可使用Python

1.4.2 Python的下载和安装

1.4.3用VS2008编译Python源码

1.4.4 Python开发工具: Vim

1.4.5 Python开发工具: Emacs

1.4.6 Python开发I具: PythonWin

1.4.7其他的Python开发工具

1.5 第一个Python程序

1.5.1从”Hello, Python!”开始

1.5.2 Python的交互解释器

1.6本章小结

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

第2章 Python起步必备

2.1 Python代码的组织形式

2.1.1用缩进来分层

2.1.2两种代码注释的方式

2.1.3 Python语句的断行

2.2 Python的基本输入输出函数

2.2.1接收输入的input函数

2.2.2输出内容的print函数

2.3 Python对中文的支持

2.3.1 Python 3之前版本如何使用中文

2.3.2更全面的中文支持

2.4 简单实用的Python计算器

2.4.1直接进行算术运算

2.4.2 math模块提供丰富的数学函数

2.4.3 Python对大整数的支持

2.5本章小结

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

第3章 Python数据类型与基本语句

3.1 Python数据类型:数字

3.1.1整型和浮点型

3.1.2运算符

3.2 Python数据类型:字符串

3.2.1 Python中的字符串

3.2.2字符串中的转义字符

3.2.3操作字符串

3.2.4字符串的索引和分片

3.2.5格式化字符串

3.2.6字符串、数字类型的转换

3.2.7原始字符串(Raw String)

3.3 Python数据类型:列表和元组

3.3.1创建和操作列表

3.3.2创建和操作元组

3.4 Python数据类型:字典

3.5 Python数据类型:文件

13.6 Python的流程控制语句

3.6.1分支结构: if语句

3.6.2循环结构: for语句

3.6.3循环结构: while语句

3.7本章小结

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

第4章可复用的函数与模块

4.1 Python自定义函数

4.1.1函数声明

4.1.2函数调用

4.2 参数让函数更有价值

4.2.1有默认值的参数

4.2.2参数的传递方式

4.2.3如何传递任意数量的参数

4.2.4用参数返回计算结果

4.3变量的作用域

4.4最简单的函数:用lambda声明函数

4.5 可重用结构: Python模块

4.5.1 Python模块的基本用法

《python宝典》:由浅入深、循序渐进地为读者讲解编程开发

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用外置的Servlet容器及其原理

发表于 2021-11-16

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战
我们上一章就说过我们可以使用外置的Servlet容器,我们原本创建Spring Boot项目想要打包,部署到我们服务器上是打成Jar包的方式,但是我们也可以使用war的方式,两种打包方式都有各自的好处,我们下面进行分析。

嵌入式Servlet容器:应用打成可执行的jar

优点:简单、便携;

缺点:默认不支持JSP、优化定制比较复杂(使用定制器【ServerProperties、自定义EmbeddedServletContainerCustomizer】,自己编写嵌入式Servlet容器的创建工厂【EmbeddedServletContainerFactory】);

外置的Servlet容器:外面安装Tomcat—应用war包的方式打包;

我们之前使用的是jar的方式来创建我们的项目,这回我们使用war的方式来创建项目

1.新建一个项目,打包方式选择war

2.选择模块时,我们还是选择一个Web。

创建好像项目时是没有webapp的

3.我们按下Ctrl+Shift+Alt+S

选择一个正确的位置

4.配置好我们的Tomcat

5.部署工件

6.创建一个jsp页面

7.启动

jar包:执行SpringBoot主类的main方法,启动ioc容器,创建嵌入式的Servlet容器;

\

war包:启动服务器,服务器启动SpringBoot应用【SpringBootServletInitializer】,启动ioc容器;

\

servlet3.0(Spring注解版):

\

8.2.4 Shared libraries / runtimes pluggability:

\

规则:

1)、服务器启动(web应用启动)会创建当前web应用里面每一个jar包里面ServletContainerInitializer实例:

2)、ServletContainerInitializer的实现放在jar包的META-INF/services文件夹下,有一个名为javax.servlet.ServletContainerInitializer的文件,内容就是ServletContainerInitializer的实现类的全类名

3)、还可以使用@HandlesTypes,在应用启动的时候加载我们感兴趣的类;

流程:

1)、启动Tomcat

2)、org\springframework\spring-web\4.3.14.RELEASE\spring-web-4.3.14.RELEASE.jar!\META-INF\services\javax.servlet.ServletContainerInitializer:

Spring的web模块里面有这个文件:org.springframework.web.SpringServletContainerInitializer

3)、SpringServletContainerInitializer将@HandlesTypes(WebApplicationInitializer.class)标注的所有这个类型的类都传入到onStartup方法的Set<Class<?>>;为这些WebApplicationInitializer类型的类创建实例;

4)、每一个WebApplicationInitializer都调用自己的onStartup;

5)、相当于我们的SpringBootServletInitializer的类会被创建对象,并执行onStartup方法

6)、SpringBootServletInitializer实例执行onStartup的时候会createRootApplicationContext;创建容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
scss复制代码protected WebApplicationContext createRootApplicationContext(
ServletContext servletContext) {
//1、创建SpringApplicationBuilder
SpringApplicationBuilder builder = createSpringApplicationBuilder();
StandardServletEnvironment environment = new StandardServletEnvironment();
environment.initPropertySources(servletContext, null);
builder.environment(environment);
builder.main(getClass());
ApplicationContext parent = getExistingRootWebApplicationContext(servletContext);
if (parent != null) {
this.logger.info("Root context already created (using as parent).");
servletContext.setAttribute(
WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, null);
builder.initializers(new ParentContextApplicationContextInitializer(parent));
}
builder.initializers(
new ServletContextApplicationContextInitializer(servletContext));
builder.contextClass(AnnotationConfigEmbeddedWebApplicationContext.class);

//调用configure方法,子类重写了这个方法,将SpringBoot的主程序类传入了进来
builder = configure(builder);

//使用builder创建一个Spring应用
SpringApplication application = builder.build();
if (application.getSources().isEmpty() && AnnotationUtils
.findAnnotation(getClass(), Configuration.class) != null) {
application.getSources().add(getClass());
}
Assert.state(!application.getSources().isEmpty(),
"No SpringApplication sources have been defined. Either override the "
+ "configure method or add an @Configuration annotation");
// Ensure error pages are registered
if (this.registerErrorPageFilter) {
application.getSources().add(ErrorPageFilterConfiguration.class);
}
//启动Spring应用
return run(application);
}

7)、Spring的应用就启动并且创建IOC容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ini复制代码public ConfigurableApplicationContext run(String... args) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ConfigurableApplicationContext context = null;
FailureAnalyzers analyzers = null;
configureHeadlessProperty();
SpringApplicationRunListeners listeners = getRunListeners(args);
listeners.starting();
try {
ApplicationArguments applicationArguments = new DefaultApplicationArguments(
args);
ConfigurableEnvironment environment = prepareEnvironment(listeners,
applicationArguments);
Banner printedBanner = printBanner(environment);
context = createApplicationContext();
analyzers = new FailureAnalyzers(context);
prepareContext(context, environment, listeners, applicationArguments,
printedBanner);

//刷新IOC容器
refreshContext(context);
afterRefresh(context, applicationArguments);
listeners.finished(context, null);
stopWatch.stop();
if (this.logStartupInfo) {
new StartupInfoLogger(this.mainApplicationClass)
.logStarted(getApplicationLog(), stopWatch);
}
return context;
}
catch (Throwable ex) {
handleRunFailure(context, listeners, analyzers, ex);
throw new IllegalStateException(ex);
}
}

\

启动Servlet容器,再启动SpringBoot应用

\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于SSM实现微博系统

发表于 2021-11-16

​

系统编号:BS-PT-033

后端技术:Spring+Springmvc+Mybatis

前端技术:BootStrap

开 发工具:IDEA / Eclipse

数据库:Mysql

应用服务器:Tomcat8.5.31

JAVA版本:JDK1.8

VX: baozai_7788

本系统基于SSM框架开发而面,实现了一个微博系统,功能完整,界面简洁大方,适合做毕业设计使用。

主要实现了用户注册、登陆、发微博、转发微博、点赞、评论、收藏、关注博主、添加好友、查看朋友圈、回复评论等功能。

系统部分功能展示:

前端功能展示:

http://localhost:8080/queryAllWeiboFollow.action?pageNo=1

用户登陆注册:

:

)​

)​

发微博

)​

查看微博

)​

评论

)​

系统通知:别人点赞、评论、转发时系统会进行提醒

)​

)​

转发微博

)​

可以查看我的收藏、我的赞

)​

)​

查看自己发的微博记录

)​

修改个人信息

)​

微博后台管理

)​

微博管理

)​

用户管理

)​

后台用户资料修改

)​

以上是本微博系统的基本功能展示,如果您的毕业设计选题符合,可以采用做毕业设计使用。VX: baozai_7788

\

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis6源码系列(一)- 内存管理zmalloc(下)

发表于 2021-11-16

Redis中对内存的管理功能由 zmalloc 完成,对应 zmalloc.h/zmalloc.c 文件;头文件 zmalloc.h 中包含了相关的宏定义和函数声明,具体的实现在 zmalloc.c 文件中。

zmalloc本质上是对 jemalloc、tcmalloc、libc(ptmalloc2)等内存分配器(算法库)的简单抽象封装,提供了统一的内存管理函数,屏蔽底层不同分配器的差异。

1、函数定义

在头文件 zmalloc.h 中,定义了Redis内存分配的主要功能函数,这些函数就包括内存申请、释放和统计等功能:

1
2
3
4
5
6
7
8
9
10
11
arduino复制代码// 申请内存
void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
// 释放内存
void zfree(void *ptr);
// 获取内存大小
size_t zmalloc_used_memory(void);
// 内存溢出处理
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
// 其他函数...

2、宏和全局变量

zmalloc定义了几个宏变量、函数和全局变量,用于记录一些状态信息

PREFIX_SIZE

C语言标准库函数malloc在申请内存时,会记录申请的内存块大小,并把大小数值存储到分配的内存块中,用于外部获取已分配空间的大小。存储大小数值的内存空间是申请大小的额外空间,PREFIX_SIZE 就是表示这块额外内存空间的大小。

zmalloc根据实际使用的内存分配器判断是否需要申请额外的内存空间,然后通过 HAVE_MALLOC_SIZE 变量进行标识。

tcmalloc和Mac平台下的malloc函数族提供了计算已分配空间大小的函数,Redis不需要多申请一个PREFIX_SIZE大小的内存空间来记录分配的内存块大小,此时PREFIX_SIZE值为0。否则根据Redis服务器所在的系统平台,使用sizeof(long long)或sizeof(size_t)大小的额外空间记录申请的空间大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
arduino复制代码#ifdef HAVE_MALLOC_SIZE
#define PREFIX_SIZE (0)
#define ASSERT_NO_SIZE_OVERFLOW(sz)
#else
#if defined(__sun) || defined(__sparc) || defined(__sparc__)
#define PREFIX_SIZE (sizeof(long long))
#else
// size_t 是一种无符号的整型数,它在头文件中typedef为unsigned int类型
// size_t 随着系统体系结构变化,在32位操作系统上宽度可能至少为32位
#define PREFIX_SIZE (sizeof(size_t))
#endif
// PREFIX_SIZE + size 大于 size
#define ASSERT_NO_SIZE_OVERFLOW(sz) assert((sz) + PREFIX_SIZE > (sz))
#endif

ASSERT_NO_SIZE_OVERFLOW

在上面的宏定义中还包含用于检查申请内存大小是否有效的函数ASSERT_NO_SIZE_OVERFLOW,断言 申请的内存大小size加上PREFIX_SIZE的和 大于 size本身

used_memory

zmalloc使用used_memory变量来统计当前已分配的总内存大小,同时定义了两个原子操作,用于更新该变量的值:

1
2
3
4
5
6
scss复制代码// 原子操作,used_memory增加分配的内存空间size大小
#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n))
// 原子操作,used_memory减少分配的内存空间size大小
#define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n))
​
static redisAtomic size_t used_memory = 0;

3、zmalloc函数

zmalloc函数用于申请指定大小内存,函数实现由2个部分组成:尝试申请内存、失败处理

1
2
3
4
5
6
7
arduino复制代码void *zmalloc(size_t size) {
   // 申请内存
   void *ptr = ztrymalloc_usable(size, NULL);
   // ptr为Null时调用异常处理函数
   if (!ptr) zmalloc_oom_handler(size);
   return ptr;
}

尝试申请内存

ztrymalloc_usable函数尝试申请内存,会先检查申请的内存块大小,然后调用malloc函数进行内存分配;申请成功后统计内存大小,如果申请失败则返回NULL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
arduino复制代码void *ztrymalloc_usable(size_t size, size_t *usable) {
   // 检查申请的内存大小
   ASSERT_NO_SIZE_OVERFLOW(size);
   // 调用malloc函数进行内存申请,申请的内存块大小为 size + PREFIX_SIZE
   // 这里额外的 PREFIX_SIZE 大小的内存块,用于存放此次分配到的内存块的大小信息
   void *ptr = malloc(MALLOC_MIN_SIZE(size)+PREFIX_SIZE);
​
   if (!ptr) return NULL;
#ifdef HAVE_MALLOC_SIZE
   // 调用函数,获取内存大小
   size = zmalloc_size(ptr);
   // 更新已使用内存 used_memory 的值
   update_zmalloc_stat_alloc(size);
   if (usable) *usable = size;
   return ptr;
#else
   // 获取当前ptr指针对应的内存空间大小
   *((size_t*)ptr) = size;
   // 更新已使用内存 used_memory 的值
   update_zmalloc_stat_alloc(size+PREFIX_SIZE);
   if (usable) *usable = size;
   return (char*)ptr+PREFIX_SIZE;
#endif
}

实际的内存分配是由 malloc函数 完成的,malloc函数是对实际内存分配器的内存分配函数的抽象封装,用于屏蔽不同内存分配器函数的差异。例如使用tcmalloc时,调用malloc函数实际就是在调用tc_malloc函数。

malloc.png

除了malloc函数,calloc、realloc和free等函数也采用了相同的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码#if defined(USE_TCMALLOC)
// 使用tcmalloc时,将tc_malloc函数重命名为malloc
#define malloc(size) tc_malloc(size)
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
// 使用jemalloc时,将je_malloc函数重命名为malloc
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#define mallocx(size,flags) je_mallocx(size,flags)
#define dallocx(ptr,flags) je_dallocx(ptr,flags)
#endif

在调用malloc函数之前,还有个最小申请大小的处理;如果申请的内存大小size小于0,则返回储存long类型数值所需要的空间大小

1
2
3
4
scss复制代码/* When using the libc allocator, use a minimum allocation size to match the
* jemalloc behavior that doesn't return NULL in this case.
*/
#define MALLOC_MIN_SIZE(x) ((x) > 0 ? (x) : sizeof(long))

OOM处理

在申请内存失败时,会调用 zmalloc_oom_handler函数 进行处理。zmalloc_oom_handler 函数的默认实现是打印“Out of memory”异常信息,并终止服务进程

1
2
3
4
5
6
7
8
9
arduino复制代码static void zmalloc_default_oom(size_t size) {
   fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
       size);
   fflush(stderr);
   // 异常终止进程
   abort();
}
​
static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;

头文件 zmalloc.h 定义了可以指定oom异常处理逻辑的函数 zmalloc_set_oom_handler,允许通过传入函数对异常进行处理

1
2
3
javascript复制代码void zmalloc_set_oom_handler(void (*oom_handler)(size_t)) {
   zmalloc_oom_handler = oom_handler;
}

在 server.c 中有对zmalloc_set_oom_handler函数的使用,服务启动时传入redisOutOfMemoryHandler函数,将异常日志打印到日志文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码int main(int argc, char **argv) {
  ...
   setlocale(LC_COLLATE,"");
   tzset(); /* Populates 'timezone' global. */
   // 调用函数指定oom异常处理逻辑
   zmalloc_set_oom_handler(redisOutOfMemoryHandler);
   srand(time(NULL)^getpid());
  ...
​
// 函数实现将错误信息打印到日志文件    
void redisOutOfMemoryHandler(size_t allocation_size) {
   serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
       allocation_size);
   // 打印异常并终止进程    
   serverPanic("Redis aborting for OUT OF MEMORY. Allocating %zu bytes!",
       allocation_size);
}

整理下代码逻辑,zmalloc函数内部逻辑大致流程如下:

zmalloc流程.png

4、zcalloc函数

zcalloc函数也用于申请分配内存,zcalloc函数跟zmalloc函数 唯一区别是在实际分配内存空间时,调用的是calloc函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
arduino复制代码/* Allocate memory and zero it or panic */
void *zcalloc(size_t size) {
   // 尝试分配内存
   void *ptr = ztrycalloc_usable(size, NULL);
   // oom异常处理
   if (!ptr) zmalloc_oom_handler(size);
   return ptr;
}
​
/* Try allocating memory and zero it, and return NULL if failed.
* '*usable' is set to the usable size if non NULL. */
void *ztrycalloc_usable(size_t size, size_t *usable) {
   // 检查size大小
   ASSERT_NO_SIZE_OVERFLOW(size);
   // 申请分配内存,这里调用的是 calloc 函数
   void *ptr = calloc(1, MALLOC_MIN_SIZE(size)+PREFIX_SIZE);
   if (ptr == NULL) return NULL;
​
// 统计已分配内存大小
#ifdef HAVE_MALLOC_SIZE
   size = zmalloc_size(ptr);
   update_zmalloc_stat_alloc(size);
   if (usable) *usable = size;
   return ptr;
#else
   *((size_t*)ptr) = size;
   update_zmalloc_stat_alloc(size+PREFIX_SIZE);
   if (usable) *usable = size;
   return (char*)ptr+PREFIX_SIZE;
#endif
}

5、zfree函数

zfree函数用于内存回收,释放由zmalloc、zcalloc函数申请分配的内存空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
arduino复制代码void zfree(void *ptr) {
#ifndef HAVE_MALLOC_SIZE
   void *realptr;
   size_t oldsize;
#endif
   // ptr指针为空时直接返回
   if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
    // 更新已使用大小used_memory,释放的内存块大小由zmalloc_size函数提供
   update_zmalloc_stat_free(zmalloc_size(ptr));
   // 调用 free 函数释放内存
   free(ptr);
#else
   // 获取内存块起始地址
   realptr = (char*)ptr-PREFIX_SIZE;
   oldsize = *((size_t*)realptr);
   // 更新已使用大小used_memory
   // 释放的内存块大小为ptr指向的内存块大小加上PREFIX_SIZE
   update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
   // 调用 free 函数释放内存
   free(realptr);
#endif
}

zfree函数内部的实现逻辑,也区分不同的底层库。 例如使用tcmalloc,HAVE_MALLOC_SIZE变量的值为true,此时直接调用zmalloc_size函数获取ptr指针指向的内存空间大小,然后直接释放ptr即可。否则需要将ptr指针向前偏移PREFIX_SIZE字节的长度,获取到内存块实际的起始地址进行释放;计算释放的内存空间大小,也需要加上PREFIX_SIZE字节。

6、选择内存分配器

内存的分配、释放都依赖于底层使用的内存分配器(算法库),那么Redis是怎么指定底层使用的具体内存分配器的呢?

在 README.md 文件中有对内存分配器(Allocator)的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vbnet复制代码Allocator
---------
​
Selecting a non-default memory allocator when building Redis is done by setting
the `MALLOC` environment variable. Redis is compiled and linked against libc
malloc by default, with the exception of jemalloc being the default on Linux
systems. This default was picked because jemalloc has proven to have fewer
fragmentation problems than libc malloc.
​
To force compiling against libc malloc, use:
​
  % make MALLOC=libc
​
To compile against jemalloc on Mac OS X systems, use:
​
  % make MALLOC=jemalloc

大抵意思是Redis在Linux系统上默认为jemalloc,但是可以通过设置“MALLOC”环境变量进行指定。

启动一个Redis服务验证一下,使用 info memory 命令查看运行中的Redis服务内存信息:

1
2
csharp复制代码[root@localhost redis-6.2.6]# ./src/redis-cli info memory |grep mem_allocator
mem_allocator:jemalloc-5.1.0

可以确定,默认情况下Redis使用的内存分配器是jemalloc。接着尝试指定内存分配器,Redis的内存分配器在程序编译时进行指定,所以需要编译redis源码;在使用make命令编译时,直接指定使用的内存分配器:

1
2
csharp复制代码[root@localhost redis-6.2.6]# make MALLOC=libc
...

运行编译好的Redis服务后,通过 redis-cli 直接查看,可以发现使用的内存分配器为 libc:

1
2
csharp复制代码[root@localhost redis-6.2.6]# ./src/redis-cli info memory |grep mem_allocator
mem_allocator:libc

源码中初始化选择内存分配器的逻辑,在 zmalloc.h 中通过判断变量引入不同的头文件来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
arduino复制代码// 使用tcmalloc时,引入google/tcmalloc.h文件
#if defined(USE_TCMALLOC)
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
#include <google/tcmalloc.h>
#if (TC_VERSION_MAJOR == 1 && TC_VERSION_MINOR >= 6) || (TC_VERSION_MAJOR > 1)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) tc_malloc_size(p)
#else
#error "Newer version of tcmalloc required"
#endif
​
// 使用jemalloc时,,引入jemalloc/jemalloc.h文件
#elif defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) je_malloc_usable_size(p)
#else
#error "Newer version of jemalloc required"
#endif
​
// 使用Mac时,引入 malloc/malloc.h 文件
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) malloc_size(p)
#endif
​
// 否则使用libc,此时未声明 HAVE_MALLOC_SIZE
#ifndef ZMALLOC_LIB
#define ZMALLOC_LIB "libc"

通过条件编译逻辑可以知道,zmalloc根据Makefile定义的不同变量进行判断,查看 Makefile 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
perl复制代码# Default allocator defaults to Jemalloc if it's not an ARM
MALLOC=libc
ifneq ($(uname_M),armv6l)
ifneq ($(uname_M),armv7l)
ifeq ($(uname_S),Linux)
  MALLOC=jemalloc
endif
endif
endif
​
# To get ARM stack traces if Redis crashes we need a special C flag.
ifneq (,$(filter aarch64 armv,$(uname_M)))
       CFLAGS+=-funwind-tables
else
ifneq (,$(findstring armv,$(uname_M)))
       CFLAGS+=-funwind-tables
endif
endif
​
# Backwards compatibility for selecting an allocator
ifeq ($(USE_TCMALLOC),yes)
  MALLOC=tcmalloc
endif
​
ifeq ($(USE_TCMALLOC_MINIMAL),yes)
  MALLOC=tcmalloc_minimal
endif
​
ifeq ($(USE_JEMALLOC),yes)
  MALLOC=jemalloc
endif
​
ifeq ($(USE_JEMALLOC),no)
  MALLOC=libc
endif
...
ifeq ($(MALLOC),tcmalloc)
  FINAL_CFLAGS+= -DUSE_TCMALLOC
  FINAL_LIBS+= -ltcmalloc
endif
​
ifeq ($(MALLOC),tcmalloc_minimal)
  FINAL_CFLAGS+= -DUSE_TCMALLOC
  FINAL_LIBS+= -ltcmalloc_minimal
endif
​
ifeq ($(MALLOC),jemalloc)
  DEPENDENCY_TARGETS+= jemalloc
  FINAL_CFLAGS+= -DUSE_JEMALLOC -I../deps/jemalloc/include
  FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS)
endif

Makefile 文件中判断到 $(USE_JEMALLOC) 参数的值为“no”时,将会使用 libc 默认的内存分配器,那似乎可以来点这样的操作:

1
2
3
4
5
6
7
bash复制代码[root@localhost redis-6.2.6]# make USE_JEMALLOC=no
cd src && make all
make[1]: Entering directory `/home/redis-6.2.6/src'
  CC Makefile.dep
  ...
Hint: It's a good idea to run 'make test' ;)
make[1]: Leaving directory `/home/dpm/redis-6.2.6/src'

运行编译好的Redis服务后,通过 redis-cli 直接查看,可以发现使用的内存分配器为 libc:

1
2
csharp复制代码[root@localhost redis-6.2.6]# ./src/redis-cli info memory |grep mem_allocator
mem_allocator:libc

7、内存大小与碎片率

在查看Redis服务器内存信息的时候,会发现里面包含有内存碎片率mem_fragmentation_ratio信息,但是在zmalloc中只有一个全局变量used_memory用于统计已分配内存大小。那么碎片率是怎么来的?

1
2
3
4
5
6
7
8
makefile复制代码[root@localhost redis-6.2.6]# redis-cli info memory
# Memory
used_memory:934384
used_memory_rss:2830336
...
mem_fragmentation_ratio:3.20
mem_fragmentation_bytes:1946360
...

获取内存大小

先来看看获取内存大小的函数 zmalloc_used_memory:

1
2
3
4
5
arduino复制代码size_t zmalloc_used_memory(void) {
   size_t um;
   atomicGet(used_memory,um);
   return um;
}

zmalloc_used_memory函数直接获取used_memory的值进行返回,这里使用了个原子的赋值操作,除此之外也就没有其他什么逻辑了。

我们知道used_memory是Redis自身统计维护的内存大小总数,实际上操作系统分配给Redis进程的内存大小未必就是这个数。例如在调用free函数释放内存空间时,zmalloc将used_memory的值减去了被释放的空间大小,但是free函数内部的实现为了减少调用系统调用接口,可能并没有实际释放这部分内存空间,而是由进程继续持有着这块空间,用于下次malloc函数申请内存时复用。

zmalloc定义了从操作系统角度获取当前Redis进程分配内存大小的函数 zmalloc_get_rss。既然从操作系统的角度获取进程的内存占用大小,那么就需要区分不同的操作系统实现。

我们正常需要获取进程的内存分配的情况,可以通过虚拟文件 /proc/$pid/stat 得到。 /proc 以文件系统的形式为内核和进程提供通信的接口,实际上并不存储在磁盘上,而是系统内存的映射。

先查询Redis服务的进程号,然后直接看进程号对应的 /stat 文件:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码// 1、获取进程号
[root@localhost ~]# ps -ef|grep redis
root     17046     1  0 Nov08 ?        00:06:36 ./src/redis-server *:6379
// 或者直接查看redis的info信息
[root@localhost ~]# ./redis-6.2.6/src/redis-cli info |grep process_id
process_id:17046
// 2、获取进程信息
[root@localhost ~]# cat /proc/17046/stat
17046 (redis-server) S 1 17046 14637 0 -1 4202752 778 0 0 0 19039 20606 0 0 20 0 4 0
182674605 146497536 692 18446744073709551615 4194304 5576996 140724681065824
140724681065280 139868670661187 0 0 4097 17642 18446744073709551615 0 0  17 1 0 0 0 0 0

/state文件的第24个数据项就是进程当前驻留物理地址空间的大小,单位是page(物理内存页)。这里可以看到,进程当前占用了692个内存页。

系统内存页的大小,可以通过getconf命令获取:

1
2
csharp复制代码[root@localhost ~]# getconf PAGESIZE
4096

单个页面大小为 4096 字节,692个内存页也就是 2834432 字节。看下Redis的内存信息 used_memory_rss ,跟计算出来的数据是一致的。

1
2
3
makefile复制代码[root@localhost redis-6.2.6]# ./src/redis-cli info memory |grep mem |grep rss
used_memory_rss:2834432
used_memory_rss_human:2.70M

再来看下zmalloc_get_rss函数的实现,能够发现逻辑几乎是一致的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
ini复制代码#if defined(HAVE_PROC_STAT)
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
​
size_t zmalloc_get_rss(void) {
   int page = sysconf(_SC_PAGESIZE);
   size_t rss;
   char buf[4096];
   char filename[256];
   int fd, count;
   char *p, *x;
​
   snprintf(filename,256,"/proc/%ld/stat",(long) getpid());
   if ((fd = open(filename,O_RDONLY)) == -1) return 0;
   if (read(fd,buf,4096) <= 0) {
       close(fd);
       return 0;
  }
   close(fd);
​
   p = buf;
   count = 23; /* RSS is the 24th field in /proc/<pid>/stat */
   while(p && count--) {
       p = strchr(p,' ');
       if (p) p++;
  }
   if (!p) return 0;
   x = strchr(p,' ');
   if (!x) return 0;
   *x = '\0';
​
   rss = strtoll(p,NULL,10);
   rss *= page;
   return rss;
}

如果有 /proc/pid/stat∗文件,则从该文件中获取。在不同的操作系统下,可能并不一定有∗/proc/pid/stat\ 文件,则从该文件中获取。在不同的操作系统下,可能并不一定有 */proc/pid/stat∗文件,则从该文件中获取。在不同的操作系统下,可能并不一定有∗/proc/pid/stat* 文件,所以zmalloc还提供了其他的实现。例如读取 /proc/$pid/psinfo 文件、调用task_info函数 ,在上面这些信息都没有的情况下,就返回 used_memory 。完整的函数定义逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
c复制代码/* Get the RSS information in an OS-specific way.
*
* WARNING: the function zmalloc_get_rss() is not designed to be fast
* and may not be called in the busy loops where Redis tries to release
* memory expiring or swapping out objects.
*
* For this kind of "fast RSS reporting" usages use instead the
* function RedisEstimateRSS() that is a much faster (and less precise)
* version of the function. */
​
#if defined(HAVE_PROC_STAT)
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
​
size_t zmalloc_get_rss(void) {
   int page = sysconf(_SC_PAGESIZE);
   size_t rss;
   char buf[4096];
   char filename[256];
   int fd, count;
   char *p, *x;
​
   snprintf(filename,256,"/proc/%ld/stat",(long) getpid());
   if ((fd = open(filename,O_RDONLY)) == -1) return 0;
   if (read(fd,buf,4096) <= 0) {
       close(fd);
       return 0;
  }
   close(fd);
​
   p = buf;
   count = 23; /* RSS is the 24th field in /proc/<pid>/stat */
   while(p && count--) {
       p = strchr(p,' ');
       if (p) p++;
  }
   if (!p) return 0;
   x = strchr(p,' ');
   if (!x) return 0;
   *x = '\0';
​
   rss = strtoll(p,NULL,10);
   rss *= page;
   return rss;
}
#elif defined(HAVE_TASKINFO)
#include <sys/types.h>
#include <sys/sysctl.h>
#include <mach/task.h>
#include <mach/mach_init.h>
​
size_t zmalloc_get_rss(void) {
   task_t task = MACH_PORT_NULL;
   struct task_basic_info t_info;
   mach_msg_type_number_t t_info_count = TASK_BASIC_INFO_COUNT;
​
   if (task_for_pid(current_task(), getpid(), &task) != KERN_SUCCESS)
       return 0;
   task_info(task, TASK_BASIC_INFO, (task_info_t)&t_info, &t_info_count);
​
   return t_info.resident_size;
}
#elif defined(__FreeBSD__) || defined(__DragonFly__)
#include <sys/types.h>
#include <sys/sysctl.h>
#include <sys/user.h>
​
size_t zmalloc_get_rss(void) {
   struct kinfo_proc info;
   size_t infolen = sizeof(info);
   int mib[4];
   mib[0] = CTL_KERN;
   mib[1] = KERN_PROC;
   mib[2] = KERN_PROC_PID;
   mib[3] = getpid();
​
   if (sysctl(mib, 4, &info, &infolen, NULL, 0) == 0)
#if defined(__FreeBSD__)
       return (size_t)info.ki_rssize * getpagesize();
#else
       return (size_t)info.kp_vm_rssize * getpagesize();
#endif
​
   return 0L;
}
#elif defined(__NetBSD__)
#include <sys/types.h>
#include <sys/sysctl.h>
​
size_t zmalloc_get_rss(void) {
   struct kinfo_proc2 info;
   size_t infolen = sizeof(info);
   int mib[6];
   mib[0] = CTL_KERN;
   mib[1] = KERN_PROC;
   mib[2] = KERN_PROC_PID;
   mib[3] = getpid();
   mib[4] = sizeof(info);
   mib[5] = 1;
   if (sysctl(mib, 4, &info, &infolen, NULL, 0) == 0)
       return (size_t)info.p_vm_rssize * getpagesize();
​
   return 0L;
}
#elif defined(HAVE_PSINFO)
#include <unistd.h>
#include <sys/procfs.h>
#include <fcntl.h>
​
size_t zmalloc_get_rss(void) {
   struct prpsinfo info;
   char filename[256];
   int fd;
​
   snprintf(filename,256,"/proc/%ld/psinfo",(long) getpid());
​
   if ((fd = open(filename,O_RDONLY)) == -1) return 0;
   if (ioctl(fd, PIOCPSINFO, &info) == -1) {
       close(fd);
  return 0;
  }
​
   close(fd);
   return info.pr_rssize;
}
#else
size_t zmalloc_get_rss(void) {
   /* If we can't get the RSS in an OS-specific way for this system just
    * return the memory usage we estimated in zmalloc()..
    *
    * Fragmentation will appear to be always 1 (no fragmentation)
    * of course... */
   return zmalloc_used_memory();
}
#endif

内存碎片率

知道了Redis向系统申请的内存大小,也知道了系统实际分配给Redis进程的内存大小,似乎就不难得出内存碎片率了:

1
bash复制代码mem_fragmentation_ratio(内存碎片率)= used_memory_rss/used_memory

来看下Redis的实现,内存碎片率 mem_fragmentation_ratio 信息对应 redisMemOverhead 对象的total_frag属性,计算逻辑在 object.c 文件的 getMemoryOverheadData函数 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ini复制代码struct redisMemOverhead *getMemoryOverheadData(void) {
   int j;
   size_t mem_total = 0;
   size_t mem = 0;
   size_t zmalloc_used = zmalloc_used_memory();
   struct redisMemOverhead *mh = zcalloc(sizeof(*mh));
   
   mh->total_allocated = zmalloc_used;
   mh->startup_allocated = server.initial_memory_usage;
   mh->peak_allocated = server.stat_peak_memory;
   // 内存碎片率 = 系统分配内存大小 / Redis服务器统计内存大小
   mh->total_frag =
      (float)server.cron_malloc_stats.process_rss / server.cron_malloc_stats.zmalloc_used;
   // 内存碎片空间大小 = 系统分配内存大小 - Redis服务器统计内存大小    
   mh->total_frag_bytes =
       server.cron_malloc_stats.process_rss - server.cron_malloc_stats.zmalloc_used;

管理碎片率

一般认为,合理的内存碎片率应该控制在 1~1.5 之间。

(1)碎片率过低

如果内存碎片率低于1,那么说明系统分配给Redis的内存不能满足实际需求,此时的Redis实例可能会把部分数据交换到磁盘上。因为磁盘I/O的读写速度远远慢与内存读写,频繁的磁盘换出换入操作会给Redis带来性能问题。

内存碎片率过低的问题,根据情况可以通过 扩展物理内存 、调整Redis实例的maxmemory配置 或者是 禁用SWAP 来解决。

Redis的配置文件 redis.conf 提供了实例可用最大内存配置项maxmemory,搭配maxmemory-policy配置项控制Redis可使用内存空间的大小,以避免将数据交换到磁盘交换区:

1
2
3
4
5
python复制代码# Redis使用内存到达指定值时,根据policy删除keys
# 如果无法删除keys腾出空间,Redis会拒绝执行需要申请内存空间的命令,并返回错误信息,例如set、lpush等
maxmemory <bytes>
# 最大内存的处理策略,在达到maxmemory时根据此配置项进行处理
maxmemory-policy noeviction

Redis服务的生产环境通常是建议禁用SWAP的,进程的内存使用情况可以通过查看 /proc/$pid/smap 文件辅助判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码// 系统swap使用情况
[root@localhost redis-6.2.6]# free -h
            total       used       free     shared   buffers     cached
Mem:           23G       22G       887M       200K       585M       4.6G
-/+ buffers/cache:       17G       6.0G            
Swap:         7.7G       74M       7.7G
// 进程swap使用情况
[root@localhost redis-6.2.6]# cat /proc/17046/smaps |egrep '^(Swap|Size)'
Size:               1352 kB
Swap:                  0 kB
Size:                 48 kB
Swap:                  0 kB
Size:                116 kB
...

数据中的Swap部分表示的就是进程被swap到交换空间的大小(不包含mmap内存)。

(2)碎片率过高

相反地,如果内存碎片率大于1.5,那么说明此时的Redis有较大的内存浪费。

问题产生的原因,是Redis释放了内存,但是内存分配器(Allocator)并没有将这部分内存返还给操作系统。从我们对内存分配的了解可知,这是并不是Redis的特性,而是malloc函数导致的。

Redis提供了自动内存碎片整理功能(defragmentation),通过 redis.conf 文件的配置项 activedefrag 启用:

1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码# 是否启用碎片整理功能
# 默认no,运行时通过 CONFIG SET activedefrag yes 可启用
activedefrag no
# 最小碎片量,达到阈值开始内存碎片整理;默认100mb
active-defrag-ignore-bytes 100mb
# 最小碎片百分比,内存碎片超过指定百分比时开始整理;默认10%
active-defrag-threshold-lower 10
# 最大碎片百分比,内存碎片超过指定百分比时尽最大努力整理
active-defrag-threshold-upper 100
# 内存自动整理占用资源最小百分比
active-defrag-cycle-min 1
# 内存自动整理占用资源最大百分比
active-defrag-cycle-max 25

内存碎片整理功能defragmentation允许Redis实例在运行中对碎片空间进行回收,而不需要重启服务。不过需要注意的是,Redis只有在使用jemalloc内存分配器时才支持该功能。

注:更多内存碎片整理相关的内容,可以查看 Redis6源码系列(二)- 自动碎片整理defrag

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JAVA异常工具类ExceptionUtils的使用

发表于 2021-11-16

org.apache.commons.lang3.exception.ExceptionUtils工具类的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
csharp复制代码import com.tellme.utils.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;

@Slf4j
public class TestExceptionUtils {

public static void main(String[] args) {
try {
say();
} catch (Exception e) {
log.error("", e);
System.out.println("----------");
//RuntimeException:xx (打印的是直接异常)
System.out.println("ExceptionUtils.getMessage(e):" + ExceptionUtils.getMessage(e));
//ArithmeticException: / by zero (打印的是根异常,但是若是抛出异常时,没将跟异常向外抛出,那么捕获到的依旧是RuntimeException:xx)
System.out.println("ExceptionUtils.getRootCauseMessage(e):" + ExceptionUtils.getRootCauseMessage(e));
//打印的是异常堆栈信息
System.out.println("ExceptionUtils.getStackTrace(e):" + ExceptionUtils.getStackTrace(e));
//java.lang.ArithmeticException: / by zero (打印的是根异常,但是若是抛出异常时,没将跟异常向外抛出,那么返回null)
System.out.println("ExceptionUtils.getRootCause(e):" + ExceptionUtils.getRootCause(e));
//异常链中异常的数量:2
System.out.println("ExceptionUtils.getThrowableCount(e):" + ExceptionUtils.getThrowableCount(e));
//java.lang.ArithmeticException: / by zero 数组,第一位的的是根异常信息
System.out.println("ExceptionUtils.getRootCauseStackTrace(e)[0]:" + ExceptionUtils.getRootCauseStackTrace(e)[0]);
//java.lang.RuntimeException: xx 数组,第一位的是直接异常信息
System.out.println("ExceptionUtils.getStackFrames(e)[0]:" + ExceptionUtils.getStackFrames(e)[0]);
//异常链的数组:[java.lang.RuntimeException: xx, java.lang.ArithmeticException: / by zero]
System.out.println("ExceptionUtils.getThrowableList(e):" + ExceptionUtils.getThrowableList(e));
//自定义的异常链信息(打印直接异常的详细信息):ExceptionUtil.getLogErrorMessage(e):xx||RuntimeException||com.tellme.test.TestExceptionUtils.say(TestExceptionUtils.java:38)
System.out.println("ExceptionUtil.getLogErrorMessage(e):" + ExceptionUtil.getLogErrorMessage(e));
}
}

private static void say() {
try {
int i = 1 / 0;
} catch (Exception e) {
//将e传入,即保持了异常链的信息。否则的话,异常链便断了。
throw new RuntimeException("xx", e);
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

简单谈谈ConcurrentHashMap 简单谈谈Conc

发表于 2021-11-16

简单谈谈ConcurrentHashMap

前言

本文学习与B站ConcurrentHashMap底层原理剖析

本文只简单讲解ConcurrentHashMap的一些面试题,用于理解ConcurrentHashMap,并不会特别深入

如果对于HashMap不熟悉的读者,在阅读本文前可以先阅读我的另一篇博客《简单谈谈HashMap》

ConcurrentHashMap的实现原理是什么?

jdk7原理

JDK7中的ConcurrentHashMap由Segment和HashEntry组成,即ConcurrentHashMap把哈希桶数组切分成小数组(Segment) ,每个小数组有n个HashEntry组成。

分为了多个小数组Segment,并为每个Segment都上了锁,我们称之为分段锁

将数据分为一段一段的存储,然后给每一 段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数

据也能被其他线程访问,实现并发访问。

jdk8原理

ConcurrentHashMap采用了和1.8的hashmap相同的结构:数组+链表+红黑树

在锁的实现上,抛弃了原有的Segment分段锁,采用CAS + synchronized实现更加细粒度的锁。将锁的级别控制在了更细粒度的哈希桶数组元素级别,只需要锁住这个链表头节点(红黑树的根节点), 就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。

get方法需要加锁吗?为什么?

get方法不需要加锁

使用了volatile关键字,保证了内存的可见性

多线程的环境下修改了节点的value和新增了节点对于其他线程是可见的

(在1.7中的HashEntry也是用的volatile修饰value和next,道理一样的)

1
2
3
4
5
6
java复制代码    static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

ConcurrentHashMap不支持key或value为空的原因是什么?

在很多java资料中,都有提到 ConcurrentHashmap HashMap和Hashtable都是key-value存储结构,但他们有一个不同点是 ConcurrentHashmap、Hashtable不支持key或者value为null,而HashMap是支持的

ConcurrentHashmap和Hashtable都是支持并发的,这样会有一个问题,当你通过get(k)获取对应的value时,如果获取到的是null时,你无法判断,它是put(k,v)的时候value为null,还是这个key从来没有做过映射。

HashMap是非并发的,可以通过contains(key)来做这个判断来进一步分析

而支持并发的ConcurrentHashmap和Hashtable在调用m.contains(key)和m.get(key)时,m可能已经不同了

ConcurrentHashMap迭代器是强一致性还是弱一 致性?

与HashMap迭代器是强一致性不同, ConcurrentHashMap 迭代器是弱一致性。

ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素

但在遍历过程中,内部元素可能发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来

而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。

这样迭代器线程可以使用原来老的数据,而写线程也可以并发的完成改变,保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。

JDK 7和JDK 8中ConcurrentHashMap的区别是什么?

底层数据结构

  • JDK7底层数据结构是使用Segment组织的数组+链表
  • JDK8中取而代之的是数组+链表+红黑树的结构,在链表节点数量大于8 (且数据总量大于等于64)时,会将链表转化为红黑树进行存储

查询时间复杂度

  • JDK7的遍历链表O(n)
  • JDK8 变成遍历红黑树O(logN)

保证线程安全机制

  • JDK7采用Segment的分段锁机制实现线程安全,其中Segment继承自ReentrantLock
  • JDK8采用CAS+synchronized保证线程安全

锁的粒度

  • JDK7是对需要进行数据操作的Segment加锁
  • JDK8调整为对每个数组元素的头节点加锁

JDK 8中为什么使用同步锁synchronized替换ReentrantLock?

synchronized性能提升

在JDK6中对synchronized锁的实现引入了大量的优化,会从无锁->偏向锁->轻量级锁->重量级锁一步步转换就是锁膨胀的优化,以及有锁的粗化锁消除自适应自旋等优化

提升并发度和减少内存开销

CAS + synchronized方式时加锁的对象是每个链条的头结点,相对Segment再次提高了并发度

如果使用可重入锁达到同样的效果,则需要大量继承自ReentrantLock的对象,造成巨大内存浪费。

ConcurrentHashMap的并发度是怎么设计的?

并发度可以理解为程序运行时能够同时更新ConcurrentHashMap且不产生锁竞争的最大线程数

在JDK7中,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[的数组长度,默认是16,这个值可以在构造函数中设置。

如果自己设置了并发度,ConcurrentHashMap会使用大于等于该值的最小的2的幂指数作为实际并发度

如果并发度设置的过小,会带来严重的锁竞争问题

如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,引起程序性能下降

因为有着种种限制和问题,在jdk8中进行了优化,抛弃了segment和分段锁,转而锁头结点,结构为数组+链表+红黑树

ConcurrentHashMap和Hashtable的效率哪个更高?为什么?

ConcurrentHashMap比Hashtable效率更高

因为Hashtable给整个哈希表加锁从而实现线程安全

而ConcurrentHashMap的锁粒度更低

  • 1.7的分段锁
  • 1.8的CAS+Synchronized锁头结点

多线程下安全的操作Map还有其它方法吗?

使用Hashtable代替

new一个Hashtable用作map,对全表上锁,效率不高,参考上一条问题

1
java复制代码Map<String, String> hashtable = new Hashtable<>();

使用Collections.synchronizedMap转为SynchronizedMap

Collections的synchronizedMap将HashMap转为线程安全的SynchronizedMap

本质上也是对全表上锁,效率不如ConcurrentHashMap

1
2
java复制代码        HashMap<String, String> map = new HashMap<>();
Map<String, String> synchronizedMap = Collections.synchronizedMap(map);

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

简单谈谈Juc并发编程——上 简单谈谈Juc并发编程——上

发表于 2021-11-16

简单谈谈Juc并发编程——上

前言

本课程学习与B站狂神说Java的JUC并发编程

本课程的代码都放在了我的个人gitee仓库上了

什么是JUC?

  • java.util.concurrent juc
  • java.util.concurrent.atomic 原子性
  • java.util.concurrent.locks 锁

平时业务中可能用Thread

或者像Runnable接口实现,没有返回值,而且效率相对于callable较低

java.util.concurrent
Interface Callable

进程与线程

我们都知道计算机的核心是CPU,它承担了所有的计算任务,而操作系统是计算机的管理者,它负责任务的调度,资源的分配和管理,统领整个计算机硬件

  • 进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体
  • 线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间)

进程:一个程序,QQ.exe Music.exe 程序的集合;
一个进程往往可以包含多个线程,至少包含一个!

Java默认有几个线程? 2 个 main、GC
线程:开了一个进程 Typora,写字,自动保存(线程负责的)

对于Java而言:Thread、Runnable、Callable
Java 真的可以开启线程吗? 开不了

1
2
java复制代码// 本地方法,调用了底层的C++,因为java运行在jvm虚拟机上,Java无法直接操作硬件
private native void start0();

线程有几个状态?

Thread.State可以看到,是一个枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码   public enum State {
//尚未启动的线程的线程状态
//新生
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待,死等
WAITING,
//超时等待,过期不候
TIMED_WAITING,
//终止
TERMINATED;
}

并发和并行

  • 并发是指两个或多个事件在同一时间间隔发生->交替进行
    • 一核cpu,模拟出来多个线程,快速交替运行
  • 并行是指两个或者多个事件在同一时刻发生 ->同时进行
    • 多核cpu,多个线程同时执行,线程池

并发编程的目标是充分的利用cpu的每一个核,以达到最高的处理性能

1
2
java复制代码    //获取cpu的核数
System.out.println(Runtime.getRuntime().availableProcessors());

wait/sleep 区别

1、来自不同的类

wait => Object

sleep => Thread

2、关于锁的释放

wait 会释放锁

sleep 睡觉了,抱着锁睡觉,不会释放!

3、使用的范围是不同的

wait必须在synchronized同步代码块中使用

sleep可以在任何地方睡

4、是否需要捕获异常(存疑)

throws InterruptedException

wait 也需要捕获异常(实测提示需要捕获异常,且不捕获会报错!)

sleep 必须要捕获异常

Lock锁

只要是并发编程,就一定需要有锁!

传统的synchronized锁

此处不谈线程池,讲普通的方法

解耦线程类,不必要再去写一个单独的线程类继承Runnable接口

而是使用lambda表达式()->{}实现Runnable接口来创建线程

1
2
3
java复制代码        new Thread(()->{
//do something
},"Name").start();

然后synchronized锁方法上,锁住这个对象

1
2
3
4
java复制代码        public synchronized void sale(){
if(num<=0)return;
System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票");
}

还是老生常谈的卖票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public static void main(String[] args) throws InterruptedException {

Ticket ticket = new Ticket();

new Thread(()->{
for(int i=0;i<100;i++){
ticket.sale();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();

new Thread(()->{
for(int i=0;i<100;i++){
ticket.sale();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();

new Thread(()->{
for(int i=0;i<100;i++){
ticket.sale();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
}
static class Ticket{
private int num=100;
public synchronized void sale(){
if(num<=0)return;
System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票");
}
}

Lock锁

java.util.concurrent.locks.Lock的Lock是一个接口

建议的做法是始终立即跟随lock与try块的通话

最常见的是在之前/之后的上锁lock.lock(); 和解锁lock.unlock()

它有几个实现类:

  • ReentrantLock可重入锁
  • ReentrantReadWriteLock.ReadLock读锁
  • ReentrantReadWriteLock.WriteLock写锁

我们来看看可重入锁ReentrantLock的构造器

1
2
3
4
5
6
7
8
java复制代码   //默认创建非公平锁Nonfair
public ReentrantLock() {
sync = new NonfairSync();
}
//boolean参数为true创建公平锁Fair,反之创建非公平锁Nonfair
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

公平锁:公平:需要先来后到
非公平锁:不公平:可以插队 (默认)

使用三部曲

  1. Lock lock=new ReentrantLock();实例化锁对象
  2. lock.lock();上锁
  3. 在finally中lock.unlock();解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码    static class Ticket{
private int num=100;
//可重入锁
Lock lock=new ReentrantLock();
public void sale(){
//上锁
lock.lock();

try{
//do something 业务代码
if(num<=0)return;
System.out.println(Thread.currentThread().getName()+" 买到第"+(num--)+"张票,剩下"+num+"张票");
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

synchronized和lock的区别

  1. Synchronized 内置的Java关键字, Lock 是一个Java类
  2. Synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到了锁
  3. Synchronized 会自动释放锁,lock 必须要手动释放锁!如果不释放锁,死锁
    • 如果在Synchronized中出现异常,会自动释放锁
  4. Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;
    • lock.tryLock()尝试获取锁 ,获取不到就自己掉头走了不会等下去
  5. Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以判断锁,非公平(可以自己设置)
  6. Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!

生产者、消费者问题

面试:单例模式、排序算法、生产消费者、死锁

老版的synchronized实现

当num为0时,消费者等待,生产者生成消息

当num>=0时,生产者等待,消费者进行消费

我们先来看一下这段问题代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码    public static void main(String[] args) throws InterruptedException {
Data data = new Data();
new Thread(()->{
for(int i=0;i<100;i++){
try {
data.pro();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ProducerA").start();

new Thread(()->{
for(int i=0;i<100;i++){
try {
data.con();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ConsumerA").start();

}
static class Data{
private int num=0;

public synchronized void pro() throws InterruptedException {
if(num!=0){
System.out.println(Thread.currentThread().getName()+"正在等待");
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num);
this.notifyAll();
}
public synchronized void con() throws InterruptedException {
if(num==0){
System.out.println(Thread.currentThread().getName()+"正在等待");
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num);
this.notifyAll();
}
}

这个时候代码会正确运行嘛,结论是会的

那如果我们放置多个producer和consumer呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码        new Thread(()->{
for(int i=0;i<100;i++){
try {
data.pro();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ProducerA").start();

new Thread(()->{
for(int i=0;i<100;i++){
try {
data.pro();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ProducerB").start();

new Thread(()->{
for(int i=0;i<100;i++){
try {
data.con();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ConsumerA").start();

new Thread(()->{
for(int i=0;i<100;i++){
try {
data.con();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"ConsumerB").start();

可以看见有很大几率会出问题

ConsumerA 消费者消费了一条消息,此时num=-92
ConsumerA 消费者消费了一条消息,此时num=-93
ConsumerA 消费者消费了一条消息,此时num=-94
ConsumerA 消费者消费了一条消息,此时num=-95
ConsumerA 消费者消费了一条消息,此时num=-96
ConsumerB 消费者消费了一条消息,此时num=-97
ProducerB 生产者生产了一条消息,此时num=-96

正在等待

这里出现的就是虚假唤醒

查看Object的wait方法的api文档可以看见

线程也可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒

比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了 ,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁

虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待。

换句话说,等待应该总是出现在循环中

为什么if块会存在虚假唤醒的情况?

在if块中使用wait方法,是非常危险的,因为一旦线程被唤醒,并得到锁,就不会再判断if条件,而执行if语句块外的代码

所以建议,凡是先要做条件判断,再wait的地方,都使用while循环来做

1
2
3
4
5
java复制代码	synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
}

所以我们将原有的代码将if改为while

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码        public synchronized void pro() throws InterruptedException {
while (num!=0){
System.out.println(Thread.currentThread().getName()+"正在等待");
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num);
this.notifyAll();
}
public synchronized void con() throws InterruptedException {
while (num==0){
System.out.println(Thread.currentThread().getName()+"正在等待");
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num);
this.notifyAll();
}

juc版的生产者和消费者实现

使用Lock和Condition两个接口

其中lock对象我们这使用ReentrantLock实例化

condition对象使用lock.newCondition()获取

Condition实现可以提供Object监视器方法的行为和语义,例如有保证的通知顺序,或者在执行通知时不需要

锁定

一个Condition实例本质上绑定到一个锁。 要获得特定Condition实例的Condition实例,请使用其

newCondition()方法

Condition因素出Object监视器方法( wait,notify 和 notifyAll )成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock个实现。

Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码    static class Data{
private int num=0;

//获取锁,取代了wait和notify
Lock lock=new ReentrantLock();
//获取condition,取代synchronized
Condition condition = lock.newCondition();

public void pro(){
lock.lock();
try{

while (num!=0){
System.out.println(Thread.currentThread().getName()+"正在等待");
//等待
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName()+" 生产者生产了一条消息,此时num="+num);
//唤醒
condition.signalAll();

}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}

}
public void con(){
lock.lock();
try{
while (num==0){
System.out.println(Thread.currentThread().getName()+"正在等待");
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+" 消费者消费了一条消息,此时num="+num);
condition.signalAll();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}

}
}

也许会有人问:既然synchronized更简洁,这里反而还多加了一层condition,岂不是更麻烦了?

当然不是

Lock+Condition与synchronized的区别

设置多个Condition监视器可以实现精准的通知和唤醒线程

个人理解:不用就等待,需要则唤醒

Condition监视器的精准唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
java复制代码import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @Author if
* @Description: 使用Condition实现精准唤醒
* @Date 2021-11-05 上午 12:04
*/
public class Test03 {
public static void main(String[] args) {
Data data=new Data();
new Thread(()->{
for(int i=0;i<10;i++){
data.soutA();
}
},"A").start();
new Thread(()->{
for(int i=0;i<10;i++){
data.soutB();
}
},"B").start();
new Thread(()->{
for(int i=0;i<10;i++){
data.soutC();
}
},"C").start();
}
static class Data{

private Lock lock=new ReentrantLock();
private Condition condition1=lock.newCondition();
private Condition condition2=lock.newCondition();
private Condition condition3=lock.newCondition();
private int num=1;

public void soutA(){
lock.lock();
try{
//num不为1时,condition1等待
while(num!=1){
condition1.await();
}
System.out.println("AAAAAA");
num=2;
//A输出完后,唤醒condition2来输出B
condition2.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

public void soutB(){
lock.lock();
try{
//num不为2时,condition2等待
while(num!=2){
condition2.await();
}
System.out.println("BBBBBB");
num=3;
//B输出完后,唤醒condition3来输出C
condition3.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

public void soutC(){
lock.lock();
try{
//num不为3时,condition3等待
while(num!=3){
condition3.await();
}
System.out.println("CCCCCC");
num=1;
//B输出完后,唤醒condition1来输出A
condition1.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}

结果

AAAAAA
BBBBBB
CCCCCC
AAAAAA
BBBBBB
CCCCCC

其实condition还有awaitNanos超时等待和awaitUntil超时时间等待,下文ArrayBlockingQueue会讲到

八锁问题

1.锁对象的同步锁synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码import java.util.concurrent.TimeUnit;

/**
* @Author if
* @Description:
*
* 问题:main主线程中 没有 添加TimeUnit.SECONDS.sleep(1);之前,输出顺序?
* 答题:看cpu的时间片分配,随机的(多开几个线程测试就行)
*
* 问题:mail方法中 没有 添加TimeUnit.SECONDS.sleep(4);之前,输出顺序?
* 答题:先发邮件再打电话
*
* 问题:mail方法中添加了TimeUnit.SECONDS.sleep(4);后,输出顺序?
* 答题:还是先发邮件再打电话,因为synchronized不会释放锁,直到代码结束才释放
*
* @Date 2021-11-05 上午 12:20
*/
public class Test1 {

public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.mail();
},"A").start();

try{
//main线程休眠
TimeUnit.SECONDS.sleep(1);
}catch(Exception e){
e.printStackTrace();
}

new Thread(()->{
phone.call();
},"B").start();
}

static class Phone{
//synchronized锁住调用者(实例化的对象)
public synchronized void mail(){
try{
//休眠了锁也没被释放
TimeUnit.SECONDS.sleep(4);
}catch(Exception e){
e.printStackTrace();
}
System.out.println("发邮件");
}
public synchronized void call(){
System.out.println("打电话");
}
}
}

2.synchronized和普通方法不同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码import java.util.concurrent.TimeUnit;

/**
* @Author if
* @Description:
*
* 问题:如果我将mail方法上synchronized,call方法不上,此时调用顺序
* 答题:先打电话再发邮件,因为call不需要获得锁就能执行,只是需要等待main线程的1秒睡眠
*
* @Date 2021-11-05 上午 12:29
*/
public class Test2 {
public static void main(String[] args) {
Phone phone = new Phone();
/**
* 问题:假如现在有两个phone,且mail和call方法都是synchronized时,调用俩对象的方法时的执行顺序
* 答题:当然是按正常顺序来了,因为synchronized锁的调用者,即实例化对象,而这有两个不同的对象
*/
// Phone phone1 = new Phone();
// Phone phone2 = new Phone();
// new Thread(()->{
// phone1.mail();
// },"A").start();
// try{
// //main线程休眠
// TimeUnit.SECONDS.sleep(1);
// }catch(Exception e){
// e.printStackTrace();
// }
// new Thread(()->{
// phone2.call();
// },"B").start();


new Thread(()->{
phone.mail();
},"A").start();

try{
//main线程休眠
TimeUnit.SECONDS.sleep(1);
}catch(Exception e){
e.printStackTrace();
}

new Thread(()->{
phone.call();
},"B").start();
}

static class Phone{
//synchronized锁住调用者(实例化的对象)
public synchronized void mail(){
try{
//休眠了锁也没被释放
TimeUnit.SECONDS.sleep(4);
}catch(Exception e){
e.printStackTrace();
}
System.out.println("发邮件");
}
//这里没有上同步锁,当然不受影响了
public void call(){
System.out.println("打电话");
}
}
}

3.锁class的同步锁synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码import java.util.concurrent.TimeUnit;

/**
* @Author if
* @Description:
*
* 问题:mail和call方法改为static静态方法后,调用顺序
* 答题:先短信再电话,因为此时锁住的Phone.class,而不是实例化对象
*
* 问题:实例化两个对象,phone1和phone2,调用顺序
* 答题:还是先短信再电话,锁住的是Phone.class,而不是实例化对象
*
*
* @Date 2021-11-05 上午 12:37
*/
public class Test3 {
public static void main(String[] args) {

Phone phone = new Phone();

new Thread(()->{
phone.mail();
},"A").start();

try{
//main线程休眠
TimeUnit.SECONDS.sleep(1);
}catch(Exception e){
e.printStackTrace();
}

new Thread(()->{
phone.call();
},"B").start();


Phone phone1 = new Phone();
Phone phone2 = new Phone();

new Thread(()->{
phone1.mail();
},"A").start();

try{
//main线程休眠
TimeUnit.SECONDS.sleep(1);
}catch(Exception e){
e.printStackTrace();
}

new Thread(()->{
phone2.call();
},"B").start();

}

static class Phone{
//synchronized锁住class,和对象相不相同没有关系,因为多个对象的class都是这同一个
public static synchronized void mail(){
try{
TimeUnit.SECONDS.sleep(4);
}catch(Exception e){
e.printStackTrace();
}
System.out.println("发邮件");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
}

4.静态synchronized和普通synchronized锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码import java.util.concurrent.TimeUnit;

/**
* @Author if
* @Description:
*
* 问题:此时mail是静态synchronized方法,call是普通synchronized方法,问执行顺序?
* 答题:
* 先打电话再发邮件,因为两把锁不同,mail虽然先start,但是他锁住class且睡眠4秒
* 此时main线程的1秒已经结束,开始call的start,而call锁的对象锁,和mail的class锁不同
* 所以不需要同步,则执行call
*
* @Date 2021-11-05 上午 12:46
*/
public class Test4 {
public static void main(String[] args) {

Phone phone = new Phone();

new Thread(()->{
phone.mail();
},"A").start();

try{
//main线程休眠
TimeUnit.SECONDS.sleep(1);
}catch(Exception e){
e.printStackTrace();
}

new Thread(()->{
phone.call();
},"B").start();


}

static class Phone{

public static synchronized void mail(){
try{
TimeUnit.SECONDS.sleep(4);
}catch(Exception e){
e.printStackTrace();
}
System.out.println("发邮件");
}
public synchronized void call(){
System.out.println("打电话");
}
}
}

总结

synchronized在代码块或者普通方法中,锁住的是方法的调用者(实例化对象)

在静态方法中,锁住的类的class

对象锁和class锁不同,所以不需要同步

不安全的List类

我们之前使用的集合都是在单线程情况下,所以没有出现问题,但是其实很多都是不安全的

例如我们平时经常使用的ArrayList

1
2
3
4
5
6
7
8
9
java复制代码        //多线程下的ArrayList插入报错
// 并发修改异常:java.util.ConcurrentModificationException
List<String> stringList=new ArrayList<>();
for(int i=1;i<=100;i++){
new Thread(()->{
stringList.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(stringList);
},i+"").start();
}

单线程玩多了,乍一看没什么问题,可是这是在多线程的情况下,就会出现并发修改异常

如何优化让他变成线程安全的呢?

1.使用Vector代替

Vector的增删改查都加上了同步锁synchronized,使得线程安全

但是效率怎么样呢?我们下文再说

1
2
3
4
5
6
7
8
java复制代码        //多线程下的Vector没有报错,因为底层加了synchronized
List<String> stringVector=new Vector<>();
for(int i=1;i<=100;i++){
new Thread(()->{
stringVector.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(stringVector);
},i+"").start();
}

2.使用Collections转synchronizedList

使用Collections.synchronizedList()方法将普通list转为线程安全的list

1
2
3
4
5
6
7
8
9
10
java复制代码        //如果我想用安全的list呢
//使用Collections.synchronizedList将普通list转为线程安全的list
List<String> stringList=new ArrayList<>();
List<String> synchronizedList = Collections.synchronizedList(stringList);
for(int i=1;i<=100;i++){
new Thread(()->{
synchronizedList.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(synchronizedList);
},i+"").start();
}

如何既保证线程安全,效率也高呢

使用JUC的CopyOnWriteArrayList

JUC:使用CopyOnWriteArrayList,解决并发

COW :写入时复制,一种优化策略

list是唯一固定的,多个线程读取时是固定的,但是写入时有可能会覆盖

COW写入时避免了覆盖,防止了数据问题

1
2
3
4
5
6
7
8
9
10
java复制代码        /**
* JUC:使用CopyOnWriteArrayList,解决并发
*/
List<String> list = new CopyOnWriteArrayList<>();
for(int i=1;i<=100;i++){
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},i+"").start();
}

怎么解决的?

写入时先复制一份长度+1的数组,然后末尾插入数据,再把数组赋给原数组完成插入

插入源码为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

CopyOnWriteArrayList比vector好在哪?

Vector 的增删改查方法都加上了synchronized锁,保证同步的情况下,每个方法都要去获得锁,所以性能会下降

CopyOnWriteArrayList 方法只是在增删改方法上增加了ReentrantLock锁

但是他的读方法不加锁,==读写分离==,所以在读的方面就要比Vector性能要好

CopyOnWriteArrayList适合读多写少的并发情况

不安全的Set类

和上面list差不多,我就不多做讲解了,直接贴代码

多线程下报错

1
2
3
4
5
6
7
8
java复制代码        //并发修改异常:java.util.ConcurrentModificationException
HashSet<String> set = new HashSet<>();
for(int i=1;i<=100;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},i+"").start();
}

解决方案

1.转synchronizedSet

1
2
3
4
5
6
7
8
java复制代码        HashSet<String> set = new HashSet<>();
Set<String> synchronizedSet = Collections.synchronizedSet(set);
for(int i=1;i<=100;i++){
new Thread(()->{
synchronizedSet.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(synchronizedSet);
},i+"").start();
}

2.使用CopyOnWriteArraySet

1
2
3
4
5
6
7
java复制代码        Set<String> set = new CopyOnWriteArraySet<>();
for(int i=1;i<=100;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},i+"").start();
}

简单说明一下HashSet的实现

这里提一嘴HashSet的实现,说了不一定加分,但是说不出来一定扣分

本质就是就是new的HashMap,然后new的Object当做HashMap的value,add的参数当做key

因为是hash算法,所以HashSet是无序的

因为key不能重复,所以HashSet的的元素是不能重复的

1
2
3
4
5
6
7
8
9
10
java复制代码    private transient HashMap<E,Object> map;
private static final Object PRESENT = new Object();

public HashSet() {
map = new HashMap<>();
}

public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

不安全的Map类

单线程中我们经常使用的HashMap在多线程下也是不安全的

1
2
3
4
5
6
7
8
java复制代码        //并发修改异常:ConcurrentModificationException
HashMap<String, String> map = new HashMap<>();
for(int i=1;i<=100;i++){
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}).start();
}

1.用Hashtable代替

1
2
3
4
5
6
7
java复制代码        Map<String, String> hashtable = new Hashtable<>();
for (int i = 1; i <= 100; i++) {
new Thread(() -> {
hashtable.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(hashtable);
}).start();
}

和之前的Vector代替ArrayList一样,用synchronized简单粗暴的加上同步保证线程安全,只是效率可能会低一些

2.转synchronizedMap

1
2
3
4
5
6
7
8
java复制代码        HashMap<String, String> map = new HashMap<>();
Map<String, String> synchronizedMap = Collections.synchronizedMap(map);
for (int i = 1; i <= 100; i++) {
new Thread(() -> {
synchronizedMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(synchronizedMap);
}).start();
}

3.使用ConcurrentHashMap

使用java.util.concurrent.ConcurrentHashMap

并发的HashMap,在保证了线程安全的情况下也保证了效率的高效,推荐使用

对ConcurrentHashMap不熟悉的小伙伴可以看看我的《简单谈谈ConcurrentHashMap》

1
2
3
4
5
6
7
java复制代码        Map<String, String> concurrentHashMap = new ConcurrentHashMap<>();
for (int i = 1; i <= 100; i++) {
new Thread(() -> {
concurrentHashMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(concurrentHashMap);
}).start();
}

走进Callable

返回结果并可能引发异常的任务。 实现者定义一个没有参数的单一方法,称为call

Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的

然而,A Runnable不返回结果,也不能抛出被检查的异常

  • 可以有返回值
  • 可以抛出异常
  • 方法不同
    • Runnable是run()
    • Callable是call()

老版本创建线程的两种方式

1.extends Thread

1
2
3
4
5
6
7
8
9
10
11
java复制代码public static void main(String[] args) {
//老版方式:线程类继承Thread重写run方法,然后启动
new MyThread().start();
}

static class MyThread extends Thread{
@Override
public void run() {
System.out.println("class MyThread extends Thread");
}
}

2.实现Runnable接口

1
2
3
4
5
6
7
8
9
10
java复制代码public static void main(String[] args) {
//老版方式:线程类实现Runnable的run,将实例化对象放入Thread参数启动线程
new Thread(new MyRun()).start();
}
static class MyRun implements Runnable{
@Override
public void run() {
System.out.println("class MyRun implements Runnable");
}
}

使用Callable创建线程

我们这里需要一个适配类FutureTask

这个类实现了RunnableFuture类,FutureTask<V> implements RunnableFuture<V>

RunnableFuture类继承了Runnable,RunnableFuture<V> extends Runnable, Future<V>

所以Thread(Runnable target)可以将其传入

注意:futureTask.get()可以获取返回结果,但是可能会抛异常,需要捕获或抛出

因为要等待执行完毕才返回,所以有可能会阻塞,最好把它放在最后,或者异步通信来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public static void main(String[] args) {
/**
* 使用Callable
* 线程类实现Callable实现call()方法,这个方法可以有返回值,就是定义的泛型类型
* 创建FutureTask,将线程类对象传入
* 将FutureTask对象传入Thread再启动
*
* Thread(Runnable target)
* FutureTask<V> implements RunnableFuture<V>
* RunnableFuture<V> extends Runnable, Future<V>
*/
MyCall callable = new MyCall();
//适配类
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
//获取返回结果,可能会抛异常
//可能会阻塞,因为要等待执行完毕,所以最好把他放在最后,或者异步通信来处理
try {
Integer integer = futureTask.get();
System.out.println(integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

}

static class MyCall implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("class MyCall implements Callable<Integer>");
return 1024;
}
}

FutureTask的状态

如果我们此时用同一个FutureTask传入两条线程,会输出两次结果吗?

1
2
3
4
5
6
7
8
9
java复制代码        MyCall callable = new MyCall();
//适配类
FutureTask<Integer> futureTask = new FutureTask<>(callable);
//如果此时用同一个futureTask对象开启两条线程会有什么结果呢?
//答案是:只会有一个线程进行运行,且只输出一次结果
//因为FutureTask的state的状态从初始化NEW变了完成状态COMPLETING
//然后在run方法中判断不为NEW则直接返回不执行了
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();

答案是:不会,只会执行一次!

为什么?

我们看看源码

可以看到FutureTask有一个state表示状态变量,还有很多int类型的常量表示具体状态

这里我们暂时只关注NEW和COMPLETING

1
2
3
4
5
6
7
8
9
java复制代码    public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

在构造器中,默认给state为NEW

1
2
3
4
5
6
java复制代码    public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

第一条线程进入

在run方法中,执行了callable的call方法后,会将判断变量ran设置为trueif (ran) set(result);

而在set方法中UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)

将NEW状态变为了COMPLETING

也就是说此条FutureTask已经完成了他的使命,变为COMPLETING完成状态

当下一条线程进来判断state != NEW时,直接return

所以执行了一次之后,其他的线程都无法继续执行run,也就是Callable的call方法了

所以可以得出结论:正常情况下,一个FutureTask只能执行一次call

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码    public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

常用的辅助类

CountDownLatch减法计数器

java.util.concurrent.CountDownLatch

减法计数器

允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助类

可用于某些线程的强制执行

CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。

A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。

CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码import java.util.concurrent.CountDownLatch;

/**
* @Author if
* @Description: 倒计时锁存器(减法计数器)
* 每次调用countDown方法进行-1
* 当总数不为0时,会一直阻塞下去
* 可以用于线程的强制执行(因为不执行会阻塞)
* @Date 2021-11-06 上午 12:00
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//count总数,用于倒计时
//配合await方法,在倒计时结束前不会再向下执行代码
CountDownLatch countDownLatch = new CountDownLatch(12);

for(int i=1;i<=6;i++){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第"+i+"次执行countDown");
countDownLatch.countDown();
// new Thread(()->{
// System.out.println(Thread.currentThread().getName()+"线程走了");
// //数量-1
// countDownLatch.countDown();
// },i+"").start();
}
//等待计数器归零,然后再向下执行
countDownLatch.await();
System.out.println("关门");
}
}

CyclicBarrier加法计数器

java.util.concurrent.CyclicBarrier

加法计数器

允许一组线程全部等待彼此达到共同屏障点的同步辅助类

可以用于某些线程的强制等待

循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* @Author if
* @Description: 循环屏障(加法计数器)
* 使用await方法阻塞当前线程
* 当执行的await方法次数达不到构造器传入的参数parties时,会一直阻塞下去
* @Date 2021-11-06 上午 12:08
*/
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
//集齐7颗龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("集齐7颗龙珠召唤神龙成功");
});
for(int i=1;i<=7;i++){
Thread.sleep(500);
int finalI = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集了第"+finalI+"颗龙珠");
try {
//执行等待,直到等待了7次
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},i+"").start();
}
}
}

Semaphore信号量

一个计数信号量,在概念上,信号量维持一组许可证。

如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方

但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行

信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源

我们这假设有3个车位和6辆车需要停车,所以只能有3台车能停进去,其他的车需要等待车位空出才能停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* @Author if
* @Description: 信号量
* 参数permits许可证,可以用于限制线程数
* @Date 2021-11-06 上午 12:17
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//permits许可证
//我们这假设有3个车位和6辆车需要停车
Semaphore semaphore = new Semaphore(3);

for(int i=1;i<=6;i++){
new Thread(()->{
try{
//acquire得到许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"线程抢到车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"线程离开车位");
}catch(Exception e){
e.printStackTrace();
}finally{
//release释放许可证
semaphore.release();
}
},i+"").start();
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Laravel 表单验证 使用技巧 表单验证 Last bu

发表于 2021-11-16

「这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」

表单验证

怎么设置动态的验证规则?

如果我们的限制规则是动态依赖于其他条件的,那么我们可以动态地创建规则数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
php复制代码    public function save(Request $request)
{
$validationArray = [
'title' => 'required',
'name' => 'required',
'logo' => 'file|max:1024',
'position' => 'required',
];

if (!Auth::check()) {
$validationArray = array_merge($validationArray, [
'email' => 'required|email|unique:users',
'password' => 'required|confirmed|min:5',
'name' => 'required'
]);
}
//
}

自定义抛出422状态码

如果我们不使用 validate () 或 Form Request,但仍然需要使用相同的 422 状态码和错误结构抛出错误,那么可以手动抛出 throw ValidationException::withMessages()

这个功能真的是用不到的时候没感觉,用到的时候觉得真优雅~

1
2
3
4
5
php复制代码if (! $user || ! Hash::check($request->password, $user->password)) {
throw ValidationException::withMessages([
'email' => ['邮箱格式不正确'],
]);
}

图片验证

我们在验证上传的图片时,可以指定所需的尺寸

1
css复制代码['image' => 'dimensions:max_width=1096,max_height=1096']

自定义验证错误的信息

我们只需在 resources/lang/xx/validation.php 文件创建适当的数组结构,就可以定义每个 字段、规则和语言的验证错误消息。

1
2
3
4
5
dart复制代码'custom' => [
'email' => [
'required' => '电子邮箱必填',
],
],

某些条件的验证规则

如果验证规则依赖于某些条件,则可以通过将 withValidator() 添加到 FormRequest 类中来修改规则,并在那里指定自定义逻辑。

例如,如果我们只想为某些用户角色添加验证规则。

1
2
3
4
5
6
7
8
9
10
11
scala复制代码use Illuminate\Validation\Validator;

class XxxxxRequest extends FormRequest {

public function withValidator(Validator $validator) {
if (auth()->user()->is_admin) {
$validator->addRules(['xxxx' => 'required']);
}
}

}

更改默认验证消息

如果要更改特定字段和特定验证规则的默认验证错误消息,只需将 messages() 方法添加到 FormRequest 类中。

1
2
3
4
5
6
7
8
9
10
11
12
php复制代码class UserRequest extends FormRequest
{
public function rules()
{
return ['name' => 'required'];
}

public function messages()
{
return ['name.required' => '请填写姓名'];
}
}

预验证

我们想在默认的 Laravel 验证之前修改某个字段,(类似常用的beforeXxx(){}、afterXxx(){})

FormRequest 类中有一个方法 prepareForValidation () , 能实现我们的需求:

在验证之前修改字段

1
2
3
4
5
6
php复制代码protected function prepareForValidation()
{
$this->merge([
'myBlog' => Illuminate\Support\Str::slug($this->myBlog),
]);
}

第一次验证错误时就终止程序

默认情况下,将在列表中返回 Laravel 验证错误,检查所有验证规则。

但是如果我们想要在第一个错误之后终止这个过程,使用验证规则叫做 bail:

1
2
3
4
dart复制代码$request->validate([
'title' => 'bail|required|unique:posts|max:255',
'body' => 'required',
]);

Last but not least

技术交流群请到 这里来。 或者添加我的微信 wangzhongyang0601 ,一起学习一起进步。

感谢大家的点赞、评论、关注;谢谢大佬们的支持,感谢 ღ( ´・ᴗ・` )比心

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

kafka 好用的几个链接

发表于 2021-11-16

blog.csdn.net/qq_39188150…

blog.csdn.net/weixin_4364…

download.csdn.net/download/we…

zhuanlan.zhihu.com/p/47636038

blog.csdn.net/gaoying_blo…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…323324325…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%