开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

进大厂为何需要学习Zookeeper?

发表于 2021-09-06

大家好,我是冰河~~

最近,有很多小伙伴让我更新一些Zookeeper的文章,正好也趁着清明假期把之前自己工作过程当中总结的Zookeeper知识点梳理了一番,打算写一个【精通Zookeeper系列】,希望能够帮助小伙伴们更好的理解和掌握Zookeeper。

今天是【精通Zookeeper系列】的开篇,我们先来简单的介绍下如何内容。

为何要学Zookeeper?

好了,我们直入主题吧,很多小伙伴可能都会问一个问题:为什么要学习Zookeeper呢?

Zookeeper是一个开源的分布式协同服务系统,在业界的应用非常广泛,虽然最近几年有Consul、etcd、Nacos等分布式协同系统问世,但是Zookeeper依然是最主流的分布式协同服务系统。Zookeeper也是一个设计的非常成功的软件系统,从Zookeeper最初按照预想的需求场景进行设计实现到现在,Zookeeper的对外API,在基本上没有改变的前提下,在越来越广泛的领域得到应用,通过学习Zookeeper,小伙伴们可以熟悉Zookeeper中成功的设计和研发思路,提升自己的系统设计水平。另外,Zookeeper是使用Java语言开发的,通过学习Zookeeper,小伙伴们还可以积累使用Java研发系统级软件的实战经验。

另外,Zookeeper的应用场景十分广发,例如:大数据领域中,Hadoop集群、Storm集群、Kafka集群、Spark集群、Flink集群、Flume集群等主流的大数据分析平台,在集群化的场景中,推荐使用Zookeeper作为集群环境中的分布式协同服务。在分布式和微服务领域中,Dubbo、SpringCloud、分布式锁、分布式序列号服务、RPC服务等框架和技术,也能够通过Zookeeper进行实现。

很多大厂在面试过程中,也要求深入掌握Zookeeper技术,所以,学好Zookeeper还是很有必要的。

Zookeeper很难吗?

说实话,大多数技术人员,都可以很快的上手Zookeeper,但是大部分都是局限于基于现有的Zookeeper协同服务示例做一些简单的修改和定制。如果要具备为自己的业务场景设计Zookeeper协同服务的能力,就需要深刻理解Zookeeper的内部工作原理,还要做大量的协同服务设计的积累和总结。理解Zookeeper的内部工作原理,需要学习大量的计算机理论知识,这些理论知识包括:分布式系统、数据库系统和操作系统。找到和学习这些知识点,需要耗费大量的时间。

在即将要连载的【精通Zookeeper系列】的文章中,会专门介绍学习Zookeeper需要学习的计算机理论知识,帮助小伙伴们迅速建立起对Zookeeper内部原理和机制的理解。

学习Zookeeper收获大不大?

在【精通Zookeeper系列】中,我们首先介绍Zookeeper的基础知识,然后就是高阶的原理和源代码剖析等内容。在介绍基础知识的时候,会结合代码多介绍些实战性的案例,也会介绍一些分布式系统的基本概念。高阶的内容主要介绍Zookeeper的内部原理和相关的源码实现。到时也会对Zookeeper和etcd进行简单的对比,以帮助小伙伴们更好的理解和掌握Zookeeper。

除了掌握Zookeeper本身的技术之外,冰河也要让你学到以下知识:

  • 如何设计一个本地数据节点
  • 分布式环境中节点之间如何通讯
  • 如何从0到1设计一个RPC子系统
  • 如何使用数据一致性协议保证数据的高可用
  • 如何在数据一致性和系统性能之间做取舍

以上这些设计理念基本每个分布式系统都会涉及到,掌握这些分布式系统能够让你更好的理解分布式系统的架构设计,也可以将这些设计理念应用到自己设计和研发的系统当中。

进大厂Zookeeper要学到什么程度?

对于进大厂Zookeeper要学到什么程度?这个问题小伙伴们应该还是比较关心的,简单点说,进大厂,你只是会简单的使用Zookeeper还不行,你要理解Zookeeper的工作原理和底层源码机制。在之前的文章中,我就说过:大厂对于技术的要求高,不是他们故意刁难人。而是大厂的用户量级很大,业务体量很高,如果你不深刻理解原理和源码机制,一旦线上生产环境由于高并发、大流量等场景出现一些偶然的系统问题,你可能就会半天定位不到问题,甚至会一脸懵逼的看着问题反复出现。所以,要想进大厂,就要深刻理解Zookeeper的原理和源码机制。

精通Zookeeper系列更新哪些内容?

说了这么多,那【精通Zookeeper系列】到底要更新哪些内容呢?

总体上说,这个专题要更新六大部分的内容,如下所示。

在【精通Zookeeper系列】中,我主要按照图示的分类进行介绍,当然,每一部分都可能会写一到多篇文章。

好了,今天就到这儿吧,我是冰河,大家有啥问题可以在下方留言,一起交流技术,一起进阶,一起进大厂~~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

大厂的 SDK 写法,偷学到了!

发表于 2021-09-06

自己动手写 SDK 的经验技巧分享

大家好,我是鱼皮。

最近因为工作需要,自己动手写了一些项目的通用 SDK。在编写的过程中,我阅读和参考了不少公司中其他大佬写的 SDK,也总结了一些开发 SDK 的经验和技巧,给大家分享下~

在此之前,必须先给大家解释一下啥是 SDK。

啥是 SDK ?

SDK(Software Development Kit)即 软件开发工具包 ,就是帮助我们开发出软件的工具集合,除了代码之外,一般还要搭配文档、示例等。

一般 SDK 都是需要 引入 到项目中使用的。比如学 Java 的朋友最早接触的 JDK,就是用来开发 Java 软件的工具包,使用时需要编写 类似 import java.util.* 的语法来引入。此外,大部分的 SDK,都是需要通过人工或项目管理工具,将其文件下载到指定路径才能引入。

引入 SDK

使用 SDK 有什么好处呢?

举个例子,假设公司有很多系统都需要实现文件上传功能。之前看过我文章的朋友应该知道,一个优秀的文件上传功能并不好做,要考虑很多点,比如分块、断点续传、秒传、文件存储、文件管理等。

文件上传设计:mp.weixin.qq.com/s/3QXe4MSOb…

显然,我们不需要给每个系统都去开发文件上传,而是只需要有一个团队舍身而出,编写一套 通用的 文件上传 SDK,然后让需要实现同样功能的系统引用就行了,这样就 大大减少了工作量、提高了开发效率 。

通用 SDK

有点前人造车,后人享乐的意思~

编写 SDK 又称 造轮子 ,好的轮子不仅能够帮助团队省时省力,还能够减少一些项目在相同功能上的差异。就不要说同一个功能,小王写的要运行 1 秒,小李写的要运行 1 小时!

而假设每个系统都去开发同样的功能,那就是 重复造轮子 ,在大多数情况下,不是明智之举。

理解了啥是 SDK 后,来看看如何写出优秀的 SDK 吧~

手写 SDK 经验总结

好的 SDK 应该具有简单易用、通俗易懂、便于扩展、高效稳定等特点。

易用性

如今,现成的轮子实在太多了!如何让你的轮子脱颖而出呢?那就要先提升 SDK 的易用性。

我自己在技术选型时,就会倾向于优先选择简单易用的 SDK,最好是几行代码就能轻松使用,而不是必须要我读完老长一份文档,再写个几十行代码才能生效。

就和产品说明书一样,太复杂直接把人劝退。

我们可以通过以下几点提高易用性:

统一调用

将复杂的功能进行封装,对外提供统一的调用入口,尽量屏蔽一些实现细节,减少用户调用的流程和对参数的理解成本。

举个例子,下面是两种日期处理函数。用户不需要关心他们是如何实现的,只需要知道怎么用、传递哪些参数、得到哪些返回值就行了。

1
2
3
4
5
6
7
java复制代码// 第 1 种:需要 new 对象
DateUtils dateUtils = new DateUtils();
dateUtils.setDate('2021-08-31');
Date date = dateUtils.parse();

// 第 2 种:直接调用
Date date = DateUtils.parse('2021-08-13');

那大家觉得哪种更易用呢?

集中配置

将复杂的参数配置化,不需要让用户到处写参数,而是通过一个配置文件统一管理。

Java 主流开发框架 SpringBoot 就是典型的例子,假如用户想改变内嵌服务器启动的端口、亦或是改变数据库的连接地址,不需要写代码,而是改一下配置文件就行了:

1
2
3
4
5
6
yaml复制代码# 服务器配置
server:
port: 8081
# 数据库配置
db:
ip: 10.0.0.1

此外,这样也便于维护项目和实现多环境。

良好命名

给 SDK 的函数取名称时,尽量让它符合用户的习惯。

比如具有解析功能的函数,可以叫 “parseXXX”;判断是否为空的函数,可以叫 “xx.isEmpty” 等。最好能做到让用户不看文档,只通过函数名称和参数,就知道你这个函数是做什么的。

好坏命名

因此,想要写出好的 SDK,首先要多用、多参考别人的 SDK,养成习惯后你就会发现,大家起名儿都差不多。

但也要注意一点,如果可以,尽量不要让你 SDK 中的类名(函数名)和别人的完全一致,否则可能给用户带来困扰:这么多同名的函数,我该用哪个呢?哪个是你开发的 SDK 呢?

可理解性

有时,用户可能不满足于简单地使用你的 SDK,而是希望阅读你的 SDK 源码来进一步了解,用的才更放心。

因此,除了易用外,还要让你的 SDK 便于理解,不能金玉其外败絮其中。

这个就和编码习惯有很大的关系了,无论是写 SDK 也好,还是做项目也罢,都要做到以下几点:

结构清晰

把代码按照功能或类别进行整理,放到指定的目录下。常见的做法有分包、分层等,让人一眼就知道每个目录下的文件的作用。

比如下面这个经典的 Java 项目结构,service 目录是编写业务逻辑的、constant 是存放常量的、utils 是存放工具类的等等,都很清晰:

Java MVC 项目结构

统一风格

统一风格的目标很简单:让项目看起来是同一个人写出来的。

比如代码缩进都用 4 个空格、命名都用驼峰式等。尤其是功能相似的代码,一定要保持命名和用法的统一!比如解析文本的函数,不要一会叫 “parseXXX”、一会儿又叫 “jiexiXXX”,给用户都整乐了~

但实际上,团队开发中,很难做到这点。因此才需要有一套通用的代码规范,大家都去遵守规范,才能让项目更好理解、更便于维护。

编写注释

最好给 SDK 中的每个类和函数的 开头 都加上注释,这样用户在使用 SDK 时,甚至都不需要看文档,直接看代码注释就能知道它是干嘛的、怎么用。

随便打开 Java SDK 或者网上知名 SDK 中的一个函数,一般都能看到这些注释,包括对函数功能的描述、参数含义、返回值含义等:

添加注释

说明文档

除了注释外,还要编写一个说明文档(用户手册),包括如何引入 SDK 、有哪些功能、应该怎么使用等等,甚至还可以补充一些关键的实现细节、以及常见的问题列表。

这点也会极大地影响用户的选择。就我个人而言,没有文档的 SDK 我一般是不会选用的,万一出了事我找谁呢?

可扩展性

编写 SDK 的一大难点是:不仅要考虑到大部分通用的使用场景,还要满足小部分用户定制化的需求。

因此,SDK 的可扩展性是很重要的,但怎么提升呢?

轻量依赖

一方面,我们可以尽量减少 SDK 本身对其他类库的依赖。

举个例子,假如你要做一个很轻小的工具类,可能只有几十 KB,那就没有必要再引入一个几百 KB 的依赖库了,得不偿失!别人也不乐意用啊!

轻量依赖不仅可以减少 SDK 的体积,更关键的是可以减少依赖冲突的可能性。我自己也曾经遇到过很多次这样的尴尬局面:引入一个工具类后,整个项目就跑不起来了!

自定义实现

为了提高 SDK 的通用性和灵活性,在设计 SDK 时,除了提供默认实现外,建议提供一个通用接口或抽象类,让用户来继承,编写自己的实现方式。

举个例子,假设我们要编写一个日期解析类,默认的解析规则是按照短横分割字符串:

1
2
java复制代码// 按照 '-' 切分
date = DateUtils.parse('2021-01-18')

如果只能做到这点,那这个 SDK 就很死板。因为用户可能想按照冒号或其他规则来解析。

怎么实现呢?

我们可以允许用户自己传入分割字符:

1
2
3
java复制代码// 按照 '-' 切分
parseRule = ':'
date = DateUtils.parse('2021-01-18', parseRule)

还可以让用户自己来控制解析的方式:

1
2
3
4
5
6
7
java复制代码// 自定义解析器
interface MyParser extends Parser {
// 需要用户自己实现
void doParse();
}
// 指定解析器
date = DateUtils.parse('2021-01-18', MyParser.class);

这两种方式在 SDK 的设计中屡见不鲜,此外还可以让用户自行编写或指定配置文件,也能提高灵活性。

高效稳定

其实,开发 SDK,尤其是在大厂开发 SDK,是个很 “坑” 的工作,我相信做过的朋友会感同身受。

因为随着使用你 SDK 的用户越来越多,可能会发现各种各样莫名其妙的问题;而且 SDK 作为相对底层的依赖,对使用方的影响也是无法估量的。所以,不想经常加班改 Bug 的话,就要保证你 SDK 的稳定性。

我们需要注意以下几点:

1. 测试

为了保证每个功能都是正常的,我们可以编写 单元测试(UT)来最大程度上地覆盖 SDK 的功能和代码。

尤其是每次改动代码后、发布新版本之前,都要再完整地执行一遍测试,不要盲目自信。

接口测试报告

此外,还可以通过 压力测试 来估算 SDK 的执行效率,比如每秒最多同时执行 3 次、每次要执行 500 毫秒等。建议将这些信息补充到说明文档中,给用户一些预期。当然也可以尝试通过压测来优化 SDK 的性能。

2. 兼容性

重要的函数和接口尽量减少改动,尤其是函数名、入参和返回值!

举个例子,SDK 0.1 版本时,函数的定义是这样的:

1
java复制代码boolean isValid(String str)

结果突然在 0.2 版本时改成了:

1
java复制代码String checkValid(StringBuilder str)

这样就会导致用户升级时一脸懵逼,怎么报了一堆找不到函数呢?

因此,对于比较大的改动,可以新写一个函数,并且给老函数打上类似 @Deprecated 的注释,表示已过时,引导用户去用新的。

此外,还可以在 版本号 上做做文章,小改动时只改变小版本号,比如 0.0.1 到 0.0.2;大改动时才改变大版本号,比如 1.0 到 2.0。这样可以给用户一个预期:这次改动很大,可能会存在不兼容。

3. 暴露异常

要让用户感知到 SDK 代码中可能抛出的异常,交给他们去进行相应的处理,防止出现一些意料之外的错误。

此外,SDK 要合理地打印日志,尤其是异常日志,在出了问题时要让调用者知道是出了什么问题,便于排查。


以上就是本期分享,建议学编程的同学多自己动手写 SDK,并且按照本文的总结去优化它,对提升编程能力真的很有帮助!

最近整理了我原创的 140 篇编程经验和技术文章,欢迎大家阅读,一起成长!⛽️

指路:t.1yb.co/ARnD

我是鱼皮,最后求个 点赞 ,这将是我持续创作的最大动力,谢谢 🙏

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

5W字高质量java并发系列详解教程(下)-附PDF下载 第

发表于 2021-09-06

本文结合上一篇文章5W字高质量java并发系列详解教程(上),继续进行高质量java并发系列的介绍。

本文PDF下载链接concurrent-all-in-one.pdf

本文的例子可以参考github.com/ddean2009/l…

第十四章 java中的daemon thread

java中有两种类型的thread,user threads 和 daemon threads。

User threads是高优先级的thread,JVM将会等待所有的User Threads运行完毕之后才会结束运行。

daemon threads是低优先级的thread,它的作用是为User Thread提供服务。 因为daemon threads的低优先级,并且仅为user thread提供服务,所以当所有的user thread都结束之后,JVM会自动退出,不管是否还有daemon threads在运行中。

因为这个特性,所以我们通常在daemon threads中处理无限循环的操作,因为这样不会影响user threads的运行。

daemon threads并不推荐使用在I/O操作中。

但是有些不当的操作也可能导致daemon threads阻塞JVM关闭,比如在daemon thread中调用join()方法。

我们看下怎么创建daemon thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class DaemonThread extends Thread{

public void run(){
while(true){
log.info("Thread A run");
try {
log.info("Thread A is daemon {}" ,Thread.currentThread().isDaemon());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public static void main(String[] args) throws InterruptedException {
DaemonThread daemonThread = new DaemonThread();
daemonThread.setDaemon(true);
daemonThread.start();
}
}

创建 daemon thread很简单,只需要在创建之后,设置其daemon属性为true即可。

注意setDaemon(true)必须在thread start()之前执行,否则会抛出IllegalThreadStateException

上面的例子将会立刻退出。

如果我们将daemonThread.setDaemon(true);去掉,则因为user thread一直执行,JVM将会一直运行下去,不会退出。

这是在main中运行的情况,如果我们在一个@Test中运行,会发生什么现象呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class DaemonThread extends Thread{

public void run(){
while(true){
log.info("Thread A run");
try {
log.info("Thread A is daemon {}" ,Thread.currentThread().isDaemon());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Test
public void testDaemon() throws InterruptedException {
DaemonThread daemonThread = new DaemonThread();
daemonThread.start();
}
}

我们将main方法改成了@Test执行。执行之后我们会发现,不管是不是daemon thread, Test都会立刻结束。

再看一个daemon线程中启动一个user thread的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class DaemonBThread extends Thread{

Thread worker = new Thread(()->{
while(true){
log.info("Thread B run");
log.info("Thread B is daemon {}",Thread.currentThread().isDaemon());
}
});
public void run(){
log.info("Thread A run");
worker.start();
}

public static void main(String[] args) {
DaemonBThread daemonThread = new DaemonBThread();
daemonThread.setDaemon(true);
daemonThread.start();
}
}

这个例子中,daemonThread启动了一个user thread,运行之后我们会发现,即使有user thread正在运行,JVM也会立刻结束执行。

第十五章 java中ThreadPool的介绍和使用

Thread Pool简介

在Java中,threads是和系统的threads相对应的,用来处理一系列的系统资源。不管在windows和linux下面,能开启的线程个数都是有限的,如果你在java程序中无限制的创建thread,那么将会遇到无线程可创建的情况。

CPU的核数是有限的,如果同时有多个线程正在运行中,那么CPU将会根据线程的优先级进行轮循,给每个线程分配特定的CPU时间。所以线程也不是越多越好。

在java中,代表管理ThreadPool的接口有两个:ExecutorService和Executor。

我们运行线程的步骤一般是这样的:1. 创建一个ExecutorService。 2.将任务提交给ExecutorService。3.ExecutorService调度线程来运行任务。

画个图来表示:

threadPool.png

下面我讲一下,怎么在java中使用ThreadPool。

Executors, Executor 和 ExecutorService

Executors 提供了一系列简便的方法,来帮助我们创建ThreadPool。

Executor接口定义了一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public interface Executor {

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}

ExecutorService继承了Executor,提供了更多的线程池的操作。是对Executor的补充。

根据接口实现分离的原则,我们通常在java代码中使用ExecutorService或者Executor,而不是具体的实现类。

我们看下怎么通过Executors来创建一个Executor和ExecutorService:

1
2
3
4
5
6
7
java复制代码        Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> log.info("in Executor"));


ExecutorService executorService= Executors.newCachedThreadPool();
executorService.submit(()->log.info("in ExecutorService"));
executorService.shutdown();

关于ExecutorService的细节,我们这里就多讲了,感兴趣的朋友可以参考之前我写的ExecutorService的详细文章。

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService接口的一个实现,它可以为线程池添加更加精细的配置,具体而言它可以控制这三个参数:corePoolSize, maximumPoolSize, 和 keepAliveTime。

PoolSize就是线程池里面的线程个数,corePoolSize表示的是线程池里面初始化和保持的最小的线程个数。

如果当前等待线程太多,可以设置maximumPoolSize来提供最大的线程池个数,从而线程池会创建更多的线程以供任务执行。

keepAliveTime是多余的线程未分配任务将会等待的时间。超出该时间,线程将会被线程池回收。

我们看下怎么创建一个ThreadPoolExecutor:

1
2
3
4
5
java复制代码        ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.submit(()->log.info("submit through threadPoolExecutor"));
threadPoolExecutor.shutdown();

上面的例子中我们通过ThreadPoolExecutor的构造函数来创建ThreadPoolExecutor。

通常来说Executors已经内置了ThreadPoolExecutor的很多实现,我们来看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码ThreadPoolExecutor executor1 =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor1.submit(() -> {
Thread.sleep(1000);
return null;
});
executor1.submit(() -> {
Thread.sleep(1000);
return null;
});
executor1.submit(() -> {
Thread.sleep(1000);
return null;
});
log.info("executor1 poolsize {}",executor1.getPoolSize());
log.info("executor1 queuesize {}", executor1.getQueue().size());
executor1.shutdown();

上的例子中我们Executors.newFixedThreadPool(2)来创建一个ThreadPoolExecutor。

上面的例子中我们提交了3个task。但是我们pool size只有2。所以还有一个1个不能立刻被执行,需要在queue中等待。

我们再看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码ThreadPoolExecutor executor2 =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor2.submit(() -> {
Thread.sleep(1000);
return null;
});
executor2.submit(() -> {
Thread.sleep(1000);
return null;
});
executor2.submit(() -> {
Thread.sleep(1000);
return null;
});

log.info("executor2 poolsize {}", executor2.getPoolSize());
log.info("executor2 queue size {}", executor2.getQueue().size());
executor2.shutdown();

上面的例子中我们使用Executors.newCachedThreadPool()来创建一个ThreadPoolExecutor。 运行之后我们可以看到poolsize是3,而queue size是0。这表明newCachedThreadPool会自动增加pool size。

如果thread在60秒钟之类没有被激活,则会被收回。

这里的Queue是一个SynchronousQueue,因为插入和取出基本上是同时进行的,所以这里的queue size基本都是0.

ScheduledThreadPoolExecutor

还有个很常用的ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor, 并且实现了ScheduledExecutorService接口。

1
2
3
java复制代码public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService

我们看下怎么使用:

1
2
3
4
java复制代码ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
log.info("Hello World");
}, 500, TimeUnit.MILLISECONDS);

上面的例子中,我们定义了一个定时任务将会在500毫秒之后执行。

之前我们也讲到了ScheduledExecutorService还有两个非常常用的方法:

  • scheduleAtFixedRate - 以开始时间为间隔。
  • scheduleWithFixedDelay - 以结束时间为间隔。
1
2
3
4
5
6
7
8
9
10
11
java复制代码
CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor2.scheduleAtFixedRate(() -> {
log.info("in ScheduledFuture");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

ForkJoinPool

ForkJoinPool是在java 7 中引入的新框架,我们将会在后面的文章中详细讲解。 这里做个简单的介绍。

ForkJoinPool主要用来生成大量的任务来做算法运算。如果用线程来做的话,会消耗大量的线程。但是在fork/join框架中就不会出现这个问题。

在fork/join中,任何task都可以生成大量的子task,然后通过使用join()等待子task结束。

这里我们举一个例子:

1
2
3
4
5
6
7
8
9
10
11
java复制代码static class TreeNode {

int value;

Set<TreeNode> children;

TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}

定义一个TreeNode,然后遍历所有的value,将其加起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public  class CountingTask extends RecursiveTask<Integer> {

private final TreeNode node;

public CountingTask(TreeNode node) {
this.node = node;
}

@Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork()).mapToInt(ForkJoinTask::join).sum();
}
}

下面是调用的代码:

1
2
3
4
5
6
7
8
java复制代码    public static void main(String[] args) {
TreeNode tree = new TreeNode(5,
new TreeNode(3), new TreeNode(2,
new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
}

第十六章 java 中的fork join框架

fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

还记得之前的文章我们讲到了thread pool的基本结构吗?

  1. ExecutorService - ForkJoinPool 用来调用任务执行。
  2. workerThread - ForkJoinWorkerThread 工作线程,用来执行具体的任务。
  3. task - ForkJoinTask 用来定义要执行的任务。

下面我们从这三个方面来详细讲解fork join框架。

ForkJoinPool

ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。

1
java复制代码public class ForkJoinPool extends AbstractExecutorService

一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。

什么是work-stealing呢?

默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。

下面看下怎么创建一个ForkJoinPool。

最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。

1
java复制代码ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一种方式是使用构造函数:

1
java复制代码ForkJoinPool forkJoinPool = new ForkJoinPool(2);

这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。

1
2
java复制代码public class ForkJoinWorkerThread extends Thread
}

和一般的线程不一样的是它定义了两个变量:

1
2
java复制代码    final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。

再看一下它的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码   public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}

简单点讲就是从Queue中取出任务执行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveAction和RecursiveTask。

他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask可以有返回值。

既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码public class CustomRecursiveAction extends RecursiveAction {

private String workload = "";
private static final int THRESHOLD = 4;

private static Logger logger =
Logger.getAnonymousLogger();

public CustomRecursiveAction(String workload) {
this.workload = workload;
}

@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}

private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();

String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());

subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));

return subtasks;
}

private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}

上面的例子使用了二分法来打印字符串。

我们再看一个RecursiveTask的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;

private static final int THRESHOLD = 20;

public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}

@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}

private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}

private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}

和上面的例子很像,不过这里我们需要有返回值。

在ForkJoinPool中提交Task

有了上面的两个任务,我们就可以在ForkJoinPool中提交了:

1
2
3
4
5
java复制代码int[] intArray= {12,12,13,14,15};
CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);

上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。

如果不使用invoke,我们也可以将其替换成fork()和join():

1
2
3
java复制代码customRecursiveTask.fork();
int result2= customRecursiveTask.join();
System.out.println(result2);

fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。

第十七章 java并发中CountDownLatch的使用

在java并发中,控制共享变量的访问非常重要,有时候我们也想控制并发线程的执行顺序,比如:等待所有线程都执行完毕之后再执行另外的线程,或者等所有线程都准备好了才开始所有线程的执行等。

这个时候我们就可以使用到CountDownLatch。

简单点讲,CountDownLatch存有一个放在QueuedSynchronizer中的计数器。当调用countdown() 方法时,该计数器将会减一。然后再调用await()来等待计数器归零。

1
2
3
4
5
6
7
8
9
10
java复制代码
private static final class Sync extends AbstractQueuedSynchronizer {
...
}

private final Sync sync;

public void countDown() {
sync.releaseShared(1);
}
1
2
3
4
5
6
7
8
java复制代码    public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

下面我们举两个使用的例子:

主线程等待子线程全都结束之后再开始运行

这里我们定义子线程类,在子线程类里面,我们传入一个CountDownLatch用来计数,然后在子线程结束之前,调用该CountDownLatch的countDown方法。最后在主线程中调用await()方法来等待子线程结束执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Slf4j
public class MainThreadWaitUsage implements Runnable {

private List<String> outputScraper;
private CountDownLatch countDownLatch;

public MainThreadWaitUsage(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}

看下怎么调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    @Test
public void testCountDownLatch()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new MainThreadWaitUsage(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");
log.info(outputScraper.toString());

}

执行结果如下:

1
txt复制代码07:37:27.388 [main] INFO MainThreadWaitUsageTest - [Counted down, Counted down, Counted down, Counted down, Counted down, Latch released]

等待所有线程都准备好再一起执行

上面的例子中,我们是主线程等待子线程,那么在这个例子中,我们将会看看怎么子线程一起等待到准备好的状态,再一起执行。

思路也很简单,在子线程开始之后,将等待的子线程计数器减一,在主线程中await该计数器,等计数器归零之后,主线程再通知子线程运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class ThreadWaitThreadUsage implements Runnable {

private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;

public ThreadWaitThreadUsage(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {

this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}

@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}

看下怎么调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码    @Test
public void testCountDownLatch()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new ThreadWaitThreadUsage(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");

log.info(outputScraper.toString());

}

输出结果如下:

1
txt复制代码07:41:47.861 [main] INFO ThreadWaitThreadUsageTest - [Workers ready, Counted down, Counted down, Counted down, Counted down, Counted down, Workers complete]

停止CountdownLatch的await

如果我们调用await()方法,该方法将会等待一直到count=0才结束。但是如果在线程执行过程中出现了异常,可能导致countdown方法执行不了。那么await()方法可能会出现无限等待的情况。

这个时候我们可以使用:

1
2
3
4
java复制代码    public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

第十八章 java中CyclicBarrier的使用

CyclicBarrier是java 5中引入的线程安全的组件。它有一个barrier的概念,主要用来等待所有的线程都执行完毕,然后再去执行特定的操作。

假如我们有很多个线程,每个线程都计算出了一些数据,然后我们需要等待所有的线程都执行完毕,再把各个线程计算出来的数据加起来,的到最终的结果,那么我们就可以使用CyclicBarrier。

CyclicBarrier的方法

我们先看下CyclicBarrier的构造函数:

1
2
3
4
5
6
7
8
9
10
java复制代码    public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}

CyclicBarrier有两个构造函数,第一个只接受一个参数,表示需要统一行动的线程个数。第二个参数叫做barrierAction,表示出发barrier是需要执行的方法。

其中barrierAction是一个Runnable,我们可以在其中定义最后需要执行的工作。

再看下重要await方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

await也有两个方法,一个是带时间参数的,一个是不带时间参数的。

await本质上调用了lock.newCondition().await()方法。

因为有多个parties,下面我们考虑两种情况。

  1. 该线程不是最后一个调用await的线程

在这种情况下,该线程将会进入等待状态,直到下面的情况发送:

  • 最后一个线程调用await()
  • 其他线程中断了当前线程
  • 其他线程中断了其他正在等待的线程
  • 其他线程在等待barrier的时候超时
  • 其他线程在该barrier上调用的reset()方法

如果该线程在调用await()的时候已经设置了interrupted的状态,或者在等待的时候被interrupted,那么将会抛出InterruptedException异常,并清除中断状态。(这里和Thread的interrupt()方法保持一致)

如果任何线程正在等待状态中,这时候barrier被重置。或者在线程调用await方法或者正在等待中,barrier被broken,那么将会抛出BrokenBarrierException。

如果任何线程在等待的时候被中断,那么所有其他等待的线程将会抛出BrokenBarrierException,barrier将会被置为broken状态。

  1. 如果该线程是最后一个调用await方法的

在这种情况,如果barrierAction不为空,那么该线程将会在其他线程继续执行前调用这个barrierAction。

如果该操作抛出异常,那么barrier的状态将会被置为broken状态。

再看看这个reset() 方法:

1
2
3
4
5
6
7
8
9
10
java复制代码    public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

该方法将会将barrier置为broken状态,并且开启一个新的generation,来进行下一轮的操作。

CyclicBarrier的使用

我们在子线程中生成一个随机的整数队列,当所有的线程都生成完毕之后,我们再将生成的整数全都加起来。看下怎么实现。

定义生成整数队列的子线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public class CyclicBarrierUsage implements Runnable {

private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults;
private Random random = new Random();

public CyclicBarrierUsage(CyclicBarrier cyclicBarrier,List<List<Integer>> partialResults){
this.cyclicBarrier=cyclicBarrier;
this.partialResults=partialResults;
}

@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();

// Crunch some numbers and store the partial result
for (int i = 0; i < 10; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}

partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}

}

上面的子线程接收外部传入的cyclicBarrier和保存数据的partialResults,并在运行完毕调用cyclicBarrier.await()来等待其他线程执行完毕。

看下CyclicBarrier的构建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码CyclicBarrier cyclicBarrier=new CyclicBarrier(5,()->{
String thisThreadName = Thread.currentThread().getName();

System.out.println(
thisThreadName + ": Computing sum of 5 workers, having 10 results each.");
int sum = 0;

for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
});

在CyclicBarrier中,我们定义了一个BarrierAction来做最后数据的汇总处理。

运行:

1
2
3
4
5
java复制代码        for (int i = 0; i < 5; i++) {
Thread worker = new Thread(new CyclicBarrierUsage(cyclicBarrier,partialResults));
worker.setName("Thread " + i);
worker.start();
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
txt复制代码Spawning 5 worker threads to compute 10 partial results each
Thread 0: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 3
Thread 1: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 8
Thread 0: Crunching some numbers! Final result - 4
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 7
Thread 0: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 6
Thread 2: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 1
Thread 1: Crunching some numbers! Final result - 5
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 2: Crunching some numbers! Final result - 8
Thread 1: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 4
Thread 0 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 9
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 3
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 1
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 0
Thread 3: Crunching some numbers! Final result - 5
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 1
Thread 3 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 7
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 8
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 3
Thread 4 waiting for others to reach barrier.
Thread 4: Computing sum of 5 workers, having 10 results each.
Adding 5 3 7 4 6 9 0 2 0 7
Adding 1 9 7 6 5 1 1 8 4 0
Adding 1 8 3 3 6 5 2 7 6 9
Adding 9 3 8 8 1 8 0 5 9 1
Adding 2 2 5 5 3 7 4 8 4 3
Thread 4: Final result = 230

Process finished with exit code 0

第十九章 在java中使用JMH(Java Microbenchmark Harness)做性能测试

JMH的全称是Java Microbenchmark Harness,是一个open JDK中用来做性能测试的套件。该套件已经被包含在了JDK 12中。

本文将会讲解如何使用JMH来在java中做性能测试。

如果你使用的不是JDK 12,那么需要添加如下依赖:

1
2
3
4
5
6
7
8
9
10
xml复制代码<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.19</version>
</dependency>

使用JMH做性能测试

如果我们想测试某个方法的性能,一般来说就是重复执行某个方法n次,求出总的执行时间,然后求平均值。

但是这样通常会有一些问题,比如程序的头几次执行通常会比较慢,因为JVM会对多次执行的代码进行优化。另外得出的统计结果也不够直观,需要我们自行解析。

如果使用JMH可以轻松解决这些问题。

在JMH中,将要测试的方法添加@Benchmark注解即可:

1
2
3
4
java复制代码    @Benchmark
public void measureThroughput() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(100);
}

看下怎么调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码    public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(BenchMarkUsage.class.getSimpleName())
// .include(BenchMarkUsage.class.getSimpleName()+".*measureThroughput*")
// 预热3轮
.warmupIterations(3)
// 度量5轮
.measurementIterations(5)
.forks(1)
.build();

new Runner(opt).run();
}

上面的例子,我们通过OptionsBuilder的include方法添加了需要进行测试的类。

默认情况下,该类的所有@Benchmark方法都将会被测试,如果我们只想测试其中的某个方法,我们可以在类后面加上方法的名字:

1
java复制代码.include(BenchMarkUsage.class.getSimpleName()+".*measureAll*")

上面的代码支持通配符。

warmupIterations(3)意思是在真正的执行前,先热身三次。

measurementIterations(5)表示我们将方法运行5次来测试性能。

forks(1)表示启动一个进程来执行这个任务。

上面是最基本的运行,我们看下运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
txt复制代码# JMH version: 1.19
# VM version: JDK 1.8.0_171, VM 25.171-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java
# VM options: -javaagent:/Applications/IntelliJ IDEA 2.app/Contents/lib/idea_rt.jar=55941:/Applications/IntelliJ IDEA 2.app/Contents/bin -Dfile.encoding=UTF-8
# Warmup: 3 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.flydean.BenchMarkUsage.measureThroughput

# Run progress: 26.66% complete, ETA 00:01:42
# Fork: 1 of 1
# Warmup Iteration 1: 9.727 ops/s
# Warmup Iteration 2: 9.684 ops/s
# Warmup Iteration 3: 9.678 ops/s
Iteration 1: 9.652 ops/s
Iteration 2: 9.678 ops/s
Iteration 3: 9.733 ops/s
Iteration 4: 9.651 ops/s
Iteration 5: 9.678 ops/s


Result "com.flydean.BenchMarkUsage.measureThroughput":
9.678 ±(99.9%) 0.129 ops/s [Average]
(min, avg, max) = (9.651, 9.678, 9.733), stdev = 0.034
CI (99.9%): [9.549, 9.808] (assumes normal distribution)

ops/s 是每秒的OPS次数。程序会给出运行的最小值,平均值和最大值。同时给出标准差stdev和置信区间CI。

BenchmarkMode

上面的例子中, 我们只用了最简单的@Benchmark。如果想实现更加复杂和自定义的BenchMark,我们可以使用@BenchmarkMode。

先举个例子:

1
2
3
4
5
6
java复制代码    @Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void measureThroughput() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(100);
}

上面的例子中,我们指定了@BenchmarkMode(Mode.Throughput),Throughput的意思是整体吞吐量,表示给定的时间内执行的次数。

这里我们通过 @OutputTimeUnit(TimeUnit.SECONDS)来指定时间单位。

Mode除了Throughput还有如下几种模式:

  • AverageTime - 调用的平均时间
  • SampleTime - 随机取样,最后输出取样结果的分布
  • SingleShotTime - 只会执行一次,通常用来测试冷启动时候的性能。
  • All - 所有的benchmark modes。

Fork和Warmup

上面的例子中我们通过代码来显式的制定Fork和Warmup,我们也可以使用注解来实现:

1
2
java复制代码    @Fork(value = 1, warmups = 2)
@Warmup(iterations = 5)

上面的例子中value表示该benchMark执行多少次,warmups表示fork多少个进程来执行。iterations表示warmup的iterations个数。

如果你同时在代码中和注解中都配置了相关的信息,那么注解将会覆盖掉代码中的显示配置。

State和Scope

如果我们在多线程环境中使用beachMark,那么多线程中用到的类变量是共享还是每个线程一个呢?

这个时候我们就要用到@State注解。

1
2
3
java复制代码@State(Scope.Benchmark)
public class StateUsage {
}

Scope有三种:

  • Scope.Thread:默认的State,每个测试线程分配一个实例;
  • Scope.Benchmark:所有测试线程共享一个实例,用于测试有状态实例在多线程共享下的性能;
  • Scope.Group:每个线程组共享一个实例;

第二十章 java中ThreadLocalRandom的使用

在java中我们通常会需要使用到java.util.Random来便利的生产随机数。但是Random是线程安全的,如果要在线程环境中的话就有可能产生性能瓶颈。

我们以Random中常用的nextInt方法为例来具体看一下:

1
2
3
java复制代码    public int nextInt() {
return next(32);
}

nextInt方法实际上调用了下面的方法:

1
2
3
4
5
6
7
8
9
java复制代码    protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}

从代码中我们可以看到,方法内部使用了AtomicLong,并调用了它的compareAndSet方法来保证线程安全性。所以这个是一个线程安全的方法。

其实在多个线程环境中,Random根本就需要共享实例,那么该怎么处理呢?

在JDK 7 中引入了一个ThreadLocalRandom的类。ThreadLocal大家都知道就是线程的本地变量,而ThreadLocalRandom就是线程本地的Random。

我们看下怎么调用:

1
java复制代码ThreadLocalRandom.current().nextInt();

我们来为这两个类分别写一个benchMark测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class RandomUsage {

public void testRandom() throws InterruptedException {
ExecutorService executorService=Executors.newFixedThreadPool(2);
Random random = new Random();
List<Callable<Integer>> callables = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
callables.add(() -> {
return random.nextInt();
});
}
executorService.invokeAll(callables);
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(RandomUsage.class.getSimpleName())
// 预热5轮
.warmupIterations(5)
// 度量10轮
.measurementIterations(10)
.forks(1)
.build();

new Runner(opt).run();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class ThreadLocalRandomUsage {

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void testThreadLocalRandom() throws InterruptedException {
ExecutorService executorService=Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
callables.add(() -> {
return ThreadLocalRandom.current().nextInt();
});
}
executorService.invokeAll(callables);
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(ThreadLocalRandomUsage.class.getSimpleName())
// 预热5轮
.warmupIterations(5)
// 度量10轮
.measurementIterations(10)
.forks(1)
.build();

new Runner(opt).run();
}
}

分析运行结果,我们可以看出ThreadLocalRandom在多线程环境中会比Random要快。

第二十一章 java中FutureTask的使用

FutureTask简介

FutureTask是java 5引入的一个类,从名字可以看出来FutureTask既是一个Future,又是一个Task。

我们看下FutureTask的定义:

1
2
3
java复制代码public class FutureTask<V> implements RunnableFuture<V> {
...
}
1
2
3
4
5
6
7
java复制代码public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask实现了RunnableFuture接口,RunnableFuture接口是Runnable和Future的综合体。

作为一个Future,FutureTask可以执行异步计算,可以查看异步程序是否执行完毕,并且可以开始和取消程序,并取得程序最终的执行结果。

除此之外,FutureTask还提供了一个runAndReset()的方法, 该方法可以运行task并且重置Future的状态。

Callable和Runnable的转换

我们知道Callable是有返回值的,而Runnable是没有返回值的。
Executors提供了很多有用的方法,将Runnable转换为Callable:

1
2
3
4
5
java复制代码    public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

FutureTask内部包含一个Callable,并且可以接受Callable和Runnable作为构造函数:

1
2
3
4
5
6
java复制代码    public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
1
2
3
4
java复制代码    public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

它的内部就是调用了Executors.callable(runnable, result);方法进行转换的。

以Runnable运行

既然是一个Runnable,那么FutureTask就可以以线程的方式执行,我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test
public void convertRunnableToCallable() throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("inside callable future task ...");
return 0;
}
});

Thread thread= new Thread(futureTask);
thread.start();
log.info(futureTask.get().toString());
}

上面例子是以单个线程来执行的,同样我们也可以将FutureTask提交给线程池来执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    @Test
public void workWithExecutorService() throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("inside futureTask");
return 1;
}
});
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(futureTask);
executor.shutdown();
log.info(futureTask.get().toString());
}

第二十二章 java中CompletableFuture的使用

之前的文章中,我们讲解了Future, 本文我们将会继续讲解java 8中引入的CompletableFuture的用法。

CompletableFuture首先是一个Future,它拥有Future所有的功能,包括获取异步执行结果,取消正在执行的任务等。

除此之外,CompletableFuture还是一个CompletionStage。

我们看下CompletableFuture的定义:

1
java复制代码public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

什么是CompletionStage呢?

在异步程序中,如果将每次的异步执行都看成是一个stage的话,我们通常很难控制异步程序的执行顺序,在javascript中,我们需要在回调中执行回调。这就会形成传说中的回调地狱。

好在在ES6中引入了promise的概念,可以将回调中的回调转写为链式调用,从而大大的提升了程序的可读性和可写性。

同样的在java中,我们使用CompletionStage来实现异步调用的链式操作。

CompletionStage定义了一系列的then*** 操作来实现这一功能。

CompletableFuture作为Future使用

调用CompletableFuture.complete方法可以立马返回结果,我们看下怎么使用这个方法来构建一个基本的Future:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});

return completableFuture;
}

上面我们通过调动ExecutorService来提交一个任务从而得到一个Future。如果你知道执行的结果,那么可以使用CompletableFuture的completedFuture方法来直接返回一个Future。

1
2
3
4
5
java复制代码    public Future<String> useCompletableFuture(){
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
return completableFuture;
}

CompletableFuture还提供了一个cancel方法来立马取消任务的执行:

1
2
3
4
5
6
7
8
9
10
java复制代码    public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}

如果这个时候调用Future的get方法,将会报CancellationException异常。

1
2
java复制代码Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

异步执行code

CompletableFuture提供了runAsync和supplyAsync的方法,可以以异步的方式执行代码。

我们看一个runAsync的基本应用,接收一个Runnable参数:

1
2
3
4
5
java复制代码    public  void runAsync(){
CompletableFuture<Void> runAsync= CompletableFuture.runAsync(()->{
log.info("runAsync");
});
}

而supplyAsync接受一个Supplier:

1
2
3
4
5
java复制代码    public void supplyAsync(){
CompletableFuture<String> supplyAsync=CompletableFuture.supplyAsync(()->{
return "supplyAsync";
});
}

他们两个的区别是一个没有返回值,一个有返回值。

组合Futures

上面讲到CompletableFuture的一个重大作用就是将回调改为链式调用,从而将Futures组合起来。

而链式调用的返回值还是CompletableFuture,我们看一个thenCompose的例子:

1
2
3
java复制代码CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

thenCompose将前一个Future的返回结果作为后一个操作的输入。

如果我们想合并两个CompletableFuture的结果,则可以使用thenCombine:

1
2
3
4
5
6
java复制代码    public void thenCombine(){
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
}

如果你不想返回结果,则可以使用thenAcceptBoth:

1
2
3
4
5
java复制代码    public void thenAcceptBoth(){
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
}

thenApply() 和 thenCompose()的区别

thenApply()和thenCompose()两个方法都可以将CompletableFuture连接起来,但是两个有点不一样。

thenApply()接收的是前一个调用返回的结果,然后对该结果进行处理。

thenCompose()接收的是前一个调用的stage,返回flat之后的的CompletableFuture。

简单点比较,两者就像是map和flatMap的区别。

并行执行任务

当我们需要并行执行任务时,通常我们需要等待所有的任务都执行完毕再去处理其他的任务,那么我们可以用到CompletableFuture.allOf方法:

1
2
3
4
5
6
7
8
9
10
11
java复制代码    public void allOf(){
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
}

allOf只保证task全都执行,而并没有返回值,如果希望带有返回值,我们可以使用join:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    public void join(){
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
}

上面的程序将会返回:“Hello Beautiful World”。

异常处理

如果在链式调用的时候抛出异常,则可以在最后使用handle来接收:

1
2
3
4
5
6
7
8
9
10
11
java复制代码    public void handleError(){
String name = null;

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
}

这和Promise中的catch方法使用类似。

第二十三章 java中使用Semaphore构建阻塞对象池

Semaphore是java 5中引入的概念,叫做计数信号量。主要用来控制同时访问某个特定资源的访问数量或者执行某个操作的数量。

Semaphore中定义了一组虚拟的permits,通过获取和释放这些permits,Semaphore可以控制资源的个数。

Semaphore的这个特性可以用来构造资源池,比如数据库连接池等。

Semaphore有两个构造函数:

1
2
3
java复制代码    public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
1
2
3
java复制代码    public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

permits定义了许可资源的个数,而fair则表示是否支持FIFO的顺序。

两个比较常用的方法就是acquire和release了。

1
2
3
java复制代码    public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
1
2
3
java复制代码    public void release() {
sync.releaseShared(1);
}

其中acquire用来获取资源,release用来释放资源。

有了这两个特性, 我们看一下怎么使用Semaphore来定义一个一个有界容器。

我们可以将Semaphore初始化为容器池大小,并且在容器池获取资源时调用acquire,将资源返回给容器池之后再调用release。

我们看下面的一个实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class SemaphoreUsage<T> {

private final Set<T> set;
private final Semaphore sem;

public SemaphoreUsage(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem= new Semaphore(bound);
}

public boolean add (T o) throws InterruptedException{
sem.acquire();
boolean wasAdded = false;
try{
wasAdded=set.add(o);
return wasAdded;
}finally {
if(!wasAdded){
sem.release();
}
}
}

public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved){
sem.release();
}
return wasRemoved;
}

}

上面的例子我们定义了一个有界的synchronizedSet。 要注意一点是在add方法中,只有add成功之后才会调用release方法。

第二十四章 在java中构建高效的结果缓存

缓存是现代应用服务器中非常常用的组件。除了第三方缓存以外,我们通常也需要在java中构建内部使用的缓存。那么怎么才能构建一个高效的缓存呢? 本文将会一步步的进行揭秘。

使用HashMap

缓存通常的用法就是构建一个内存中使用的Map,在做一个长时间的操作比如计算之前,先在Map中查询一下计算的结果是否存在,如果不存在的话再执行计算操作。

我们定义了一个代表计算的接口:

1
2
3
4
java复制代码public interface Calculator<A, V> {

V calculate(A arg) throws InterruptedException;
}

该接口定义了一个calculate方法,接收一个参数,并且返回计算的结果。

我们要定义的缓存就是这个Calculator具体实现的一个封装。

我们看下用HashMap怎么实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class MemoizedCalculator1<A, V> implements Calculator<A, V> {

private final Map<A, V> cache= new HashMap<A, V>();
private final Calculator<A, V> calculator;

public MemoizedCalculator1(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public synchronized V calculate(A arg) throws InterruptedException {
V result= cache.get(arg);
if( result ==null ){
result= calculator.calculate(arg);
cache.put(arg, result);
}
return result;
}
}

MemoizedCalculator1封装了Calculator,在调用calculate方法中,实际上调用了封装的Calculator的calculate方法。

因为HashMap不是线程安全的,所以这里我们使用了synchronized关键字,从而保证一次只有一个线程能够访问calculate方法。

虽然这样的设计能够保证程序的正确执行,但是每次只允许一个线程执行calculate操作,其他调用calculate方法的线程将会被阻塞,在多线程的执行环境中这会严重影响速度。从而导致使用缓存可能比不使用缓存需要的时间更长。

使用ConcurrentHashMap

因为HashMap不是线程安全的,那么我们可以尝试使用线程安全的ConcurrentHashMap来替代HashMap。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class MemoizedCalculator2<A, V> implements Calculator<A, V> {

private final Map<A, V> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;

public MemoizedCalculator2(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
V result= cache.get(arg);
if( result ==null ){
result= calculator.calculate(arg);
cache.put(arg, result);
}
return result;
}
}

上面的例子中虽然解决了之前的线程等待的问题,但是当有两个线程同时在进行同一个计算的时候,仍然不能保证缓存重用,这时候两个线程都会分别调用计算方法,从而导致重复计算。

我们希望的是如果一个线程正在做计算,其他的线程只需要等待这个线程的执行结果即可。很自然的,我们想到了之前讲到的FutureTask。FutureTask表示一个计算过程,我们可以通过调用FutureTask的get方法来获取执行的结果,如果该执行正在进行中,则会等待。

下面我们使用FutureTask来进行改写。

FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@Slf4j
public class MemoizedCalculator3<A, V> implements Calculator<A, V> {

private final Map<A, Future<V>> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;

public MemoizedCalculator3(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
Future<V> future= cache.get(arg);
V result=null;
if( future ==null ){
Callable<V> callable= new Callable<V>() {
@Override
public V call() throws Exception {
return calculator.calculate(arg);
}
};
FutureTask<V> futureTask= new FutureTask<>(callable);
future= futureTask;
cache.put(arg, futureTask);
futureTask.run();
}
try {
result= future.get();
} catch (ExecutionException e) {
log.error(e.getMessage(),e);
}
return result;
}
}

上面的例子,我们用FutureTask来封装计算,并且将FutureTask作为Map的value。

上面的例子已经体现了很好的并发性能。但是因为if语句是非原子性的,所以对这一种先检查后执行的操作,仍然可能存在同一时间调用的情况。

这个时候,我们可以借助于ConcurrentHashMap的原子性操作putIfAbsent来重写上面的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码@Slf4j
public class MemoizedCalculator4<A, V> implements Calculator<A, V> {

private final Map<A, Future<V>> cache= new ConcurrentHashMap<>();
private final Calculator<A, V> calculator;

public MemoizedCalculator4(Calculator<A, V> calculator){
this.calculator=calculator;
}
@Override
public V calculate(A arg) throws InterruptedException {
while (true) {
Future<V> future = cache.get(arg);
V result = null;
if (future == null) {
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return calculator.calculate(arg);
}
};
FutureTask<V> futureTask = new FutureTask<>(callable);
future = cache.putIfAbsent(arg, futureTask);
if (future == null) {
future = futureTask;
futureTask.run();
}

try {
result = future.get();
} catch (CancellationException e) {
log.error(e.getMessage(), e);
cache.remove(arg, future);
} catch (ExecutionException e) {
log.error(e.getMessage(), e);
}
return result;
}
}
}
}

上面使用了一个while循环,来判断从cache中获取的值是否存在,如果不存在则调用计算方法。

上面我们还要考虑一个缓存污染的问题,因为我们修改了缓存的结果,如果在计算的时候,计算被取消或者失败,我们需要从缓存中将FutureTask移除。

第二十五章 java中CompletionService的使用

之前的文章中我们讲到了ExecutorService,通过ExecutorService我们可以提交一个个的task,并且返回Future,然后通过调用Future.get方法来返回任务的执行结果。

这种方式虽然有效,但是需要保存每个返回的Future值,还是比较麻烦的,幸好ExecutorService提供了一个invokeAll的方法,来保存所有的Future值,我们看一个具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码   public void useExecutorService() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);

Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

List<Future<String>> futures = executor.invokeAll(callableTasks);

executor.shutdown();

}

上面的例子中,我们定义了3个task,通过调用executor.invokeAll(callableTasks)返回了一个 List,这样我们就可以得到所有的返回值了。

除了上面的invokeAll方法外,我们今天要介绍一个CompletionService接口。

CompletionService实际上是ExecutorService和BlockingQueue的结合体,ExecutorService用来提交任务,而BlockingQueue用来保存封装成Future的执行结果。通过调用take和poll的方法来获取到Future值。

CompletionService是一个接口,我们看下它的一个具体实现ExecutorCompletionService:

1
2
3
4
5
6
7
8
java复制代码    public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

ExecutorCompletionService接收一个Executor作为参数。

我们看下上面的例子如果用ExecutorCompletionService重写是怎么样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码   public void useCompletionService() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<String> completionService=new ExecutorCompletionService<String>(executor);
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
for(int i=0; i< 5; i ++){
completionService.submit(callableTask);
}

for(int i=0; i<5; i++){
Future<String> result=completionService.take();
System.out.println(result.get());
}
}

上面的例子通过completionService.submit来提交任务,通过completionService.take()来获取结果值。

其实CompletionService还有一个poll的方法,poll和take的区别在于:take如果获取不到值则会等待,而poll则会返回null。

第二十六章 使用ExecutorService来停止线程服务

之前的文章中我们提到了ExecutorService可以使用shutdown和shutdownNow来关闭。

这两种关闭的区别在于各自的安全性和响应性。shutdownNow强行关闭速度更快,但是风险也更大,因为任务可能正在执行的过程中被结束了。而shutdown正常关闭虽然速度比较慢,但是却更安全,因为它一直等到队列中的所有任务都执行完毕之后才关闭。

使用shutdown

我们先看一个使用shutdown的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码    public void useShutdown() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);

Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

executor.submit(runnableTask);
executor.shutdown();
executor.awaitTermination(800, TimeUnit.MILLISECONDS);
}

awaitTermination将会阻塞直到所有正在执行的任务完成,或者达到指定的timeout时间。

使用shutdownNow

当通过shutdownNow来强行关闭ExecutorService是, 它会尝试取消正在执行的任务,并返回所有已经提交但是还没有开始的任务。从而可以将这些任务保存起来,以便以后进行处理。

但是这样我们只知道了还没有开始执行的任务,对于那些已经开始执行但是没有执行完毕却被取消的任务我们无法获取。

我们看下如何获得开始执行但是还没有执行完毕的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService executorService;
private final Set<Runnable> taskCancelledAtShutdown= Collections.synchronizedSet(new HashSet<Runnable>());

public TrackingExecutor(ExecutorService executorService){
this.executorService=executorService;
}
@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

@Override
public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout,unit);
}

@Override
public void execute(Runnable command) {
executorService.execute(() -> {
try {
command.run();
}finally {
if(isShutdown() && Thread.currentThread().isInterrupted()){
taskCancelledAtShutdown.add(command);
}
}
});
}

public List<Runnable> getCancelledTask(){
if(! executorService.isTerminated()){
throw new IllegalStateException("executorService is not terminated");
}
return new ArrayList<>(taskCancelledAtShutdown);
}
}

上面的例子中我们构建了一个新的ExecutorService,他传入一个ExecutorService,并对其进行封装。

我们重写了execute方法,在执行完毕判断该任务是否被中断,如果被中断则将其添加到CancelledTask列表中。

并提供一个getCancelledTask方法来返回未执行完毕的任务。

我们看下怎么使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    public void useShutdownNow() throws InterruptedException {
TrackingExecutor trackingExecutor=new TrackingExecutor(Executors.newCachedThreadPool());

Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

trackingExecutor.submit(runnableTask);
List<Runnable> notrunList=trackingExecutor.shutdownNow();
if(trackingExecutor.awaitTermination(800, TimeUnit.SECONDS)){
List<Runnable> runButCancelledList= trackingExecutor.getCancelledTask();
}
}

trackingExecutor.shutdownNow()返回的是未执行的任务。而trackingExecutor.getCancelledTask()返回的是被取消的任务。

上面的任务其实还有一个缺点,因为我们在存储被取消的任务列表的额时候taskCancelledAtShutdown.add(command),因为之前的判断不是原子操作,则可能会产生误报。

第二十七章 我们的线程被饿死了

我们在构建线程池的时候可以构建单个线程的线程池和多个线程的线程池。

那么线程池使用不当可不可能产生死锁呢?我们知道死锁是循环争夺资源而产生的。线程池中的线程也是资源的一种,那么如果对线程池中的线程进行争夺的话也是可能产生死锁的。

在单个线程的线程池中,如果一个正在执行的线程中,使用该线程池再去提交第二个任务,因为线程池中的线程只有一个,那么第二个任务将会等待第一个任务的执行完成来释放线程,而第一个任务又在等待第二任务的执行来完成任务。从而产生了线程饥饿死锁(Thread Starvation Deadlock).

线程饥饿死锁并不一定在单个线程的线程池中产生,只要有这种循环使用线程池的情况都可能产生这种问题。

我们看下例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class ThreadPoolDeadlock {

ExecutorService executorService= Executors.newSingleThreadExecutor();

public class RenderPageTask implements Callable<String> {
public String call() throws Exception{
Future<String> header, footer;
header= executorService.submit(()->{
return "header";
});
footer= executorService.submit(()->{
return "footer";
});
return header.get()+ footer.get();
}
}

public void submitTask(){
executorService.submit(new RenderPageTask());
}
}

我们在executorService中提交了一个RenderPageTask,而RenderPageTask又提交了两个task。因为ExecutorService线程池只有一个线程,则会产生死锁。

我们的线程被饿死了!

第二十八章 java中有界队列的饱和策略(reject policy)

我们在使用ExecutorService的时候知道,在ExecutorService中有个一个Queue来保存提交的任务,通过不同的构造函数,我们可以创建无界的队列(ExecutorService.newCachedThreadPool)和有界的队列(ExecutorService newFixedThreadPool(int nThreads))。

无界队列很好理解,我们可以无限制的向ExecutorService提交任务。那么对于有界队列来说,如果队列满了该怎么处理呢?

今天我们要介绍一下java中ExecutorService的饱和策略(reject policy)。

以ExecutorService的具体实现ThreadPoolExecutor来说,它定义了4种饱和策略。分别是AbortPolicy,DiscardPolicy,DiscardOldestPolicy和CallerRunsPolicy。

如果要在ThreadPoolExecutor中设定饱和策略可以调用setRejectedExecutionHandler方法,如下所示:

1
2
3
4
java复制代码        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(20));
threadPoolExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy()
);

上面的例子中我们定义了一个初始5个,最大10个工作线程的Thread Pool,并且定义其中的Queue的容量是20。如果提交的任务超出了容量,则会使用AbortPolicy策略。

AbortPolicy

AbortPolicy意思是如果队列满了,最新的提交任务将会被拒绝,并抛出RejectedExecutionException异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码   public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

上面的代码中,rejectedExecution方法中我们直接抛出了RejectedExecutionException异常。

DiscardPolicy

DiscardPolicy将会悄悄的丢弃提交的任务,而不报任何异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardOldestPolicy

DiscardOldestPolicy将会丢弃最老的任务,保存最新插入的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

我们看到在rejectedExecution方法中,poll了最老的一个任务,然后使用ThreadPoolExecutor提交了一个最新的任务。

CallerRunsPolicy

CallerRunsPolicy和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务,从而降低调用者的调用速度。我们看下是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

在rejectedExecution方法中,直接调用了 r.run()方法,这会导致该方法直接在调用者的主线程中执行,而不是在线程池中执行。从而导致主线程在该任务执行结束之前不能提交任何任务。从而有效的阻止了任务的提交。

使用Semaphore

如果我们并没有定义饱和策略,那么有没有什么方法来控制任务的提交速度呢?考虑下之前我们讲到的Semaphore,我们可以指定一定的资源信号量来控制任务的提交,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class SemaphoreUsage {

private final Executor executor;
private final Semaphore semaphore;

public SemaphoreUsage(Executor executor, int count) {
this.executor = executor;
this.semaphore = new Semaphore(count);
}

public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
}
);
} catch (RejectedExecutionException e) {
semaphore.release();
}
}

}

第二十九章 由于不当的执行顺序导致的死锁

为了保证线程的安全,我们引入了加锁机制,但是如果不加限制的使用加锁,就有可能会导致顺序死锁(Lock-Ordering Deadlock)。上篇文章我们也提到了在线程词中因为资源的不足而导致的资源死锁(Resource Deadlock)。

本文将会讨论一下顺序死锁的问题。

我们来讨论一个经常存在的账户转账的问题。账户A要转账给账户B。为了保证在转账的过程中A和B不被其他的线程意外的操作,我们需要给A和B加锁,然后再进行转账操作, 我们看下转账的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码    public void transferMoneyDeadLock(Account from,Account to, int amount) throws InsufficientAmountException {
synchronized (from){
synchronized (to){
transfer(from,to,amount);
}
}
}

private void transfer(Account from,Account to, int amount) throws InsufficientAmountException {
if(from.getBalance() < amount){
throw new InsufficientAmountException();
}else{
from.debit(amount);
to.credit(amount);
}
}

看起来上面的程序好像没有问题,因为我们给from和to都加了锁,程序应该可以很完美的按照我们的要求来执行。

那如果我们考虑下面的一个场景:

1
2
java复制代码A:transferMoneyDeadLock(accountA, accountB, 20)
B:transferMoneyDeadLock(accountB, accountA, 10)

如果A和B同时执行,则可能会产生A获得了accountA的锁,而B获得了accountB的锁。从而后面的代码无法继续执行,从而导致了死锁。

对于这样的情况,我们有没有什么好办法来处理呢?

加入不管参数怎么传递,我们都先lock accountA再lock accountB是不是就不会出现死锁的问题了呢?

我们看下代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码    private void transfer(Account from,Account to, int amount) throws InsufficientAmountException {
if(from.getBalance() < amount){
throw new InsufficientAmountException();
}else{
from.debit(amount);
to.credit(amount);
}
}

public void transferMoney(Account from,Account to, int amount) throws InsufficientAmountException {

int fromHash= System.identityHashCode(from);
int toHash = System.identityHashCode(to);

if(fromHash < toHash){
synchronized (from){
synchronized (to){
transfer(from,to, amount);
}
}
}else if(fromHash < toHash){
synchronized (to){
synchronized (from){
transfer(from,to, amount);
}
}
}else{
synchronized (lock){
synchronized (from) {
synchronized (to) {
transfer(from, to, amount);
}
}
}
}
}

上面的例子中,我们使用了System.identityHashCode来获得两个账号的hash值,通过比较hash值的大小来选定lock的顺序。

如果两个账号的hash值恰好相等的情况下,我们引入了一个新的外部lock,从而保证同一时间只有一个线程能够运行内部的方法,从而保证了任务的执行而不产生死锁。

第三十章 非阻塞同步机制和CAS

我们知道在java 5之前同步是通过Synchronized关键字来实现的,在java 5之后,java.util.concurrent包里面添加了很多性能更加强大的同步类。这些强大的类中很多都实现了非阻塞的同步机制从而帮助其提升性能。

什么是非阻塞同步

非阻塞同步的意思是多个线程在竞争相同的数据时候不会发生阻塞,从而能够在更加细粒度的维度上进行协调,从而极大的减少线程调度的开销,从而提升效率。非阻塞算法不存在锁的机制也就不存在死锁的问题。

在基于锁的算法中,如果一个线程持有了锁,那么其他的线程将无法进行下去。使用锁虽然可以保证对资源的一致性访问,但是在挂起和恢复线程的执行过程中存在非常大的开销,如果锁上面存在着大量的竞争,那么有可能调度开销比实际工作开销还要高。

悲观锁和乐观锁

我们知道独占锁是一个悲观锁,悲观锁的意思就是假设最坏的情况,如果你不锁定该资源,那么就有其他的线程会修改该资源。悲观锁虽然可以保证任务的顺利执行,但是效率不高。

乐观锁就是假设其他的线程不会更改要处理的资源,但是我们在更新资源的时候需要判断该资源是否被别的线程所更改。如果被更改那么更新失败,我们可以重试,如果没有被更改,那么更新成功。

使用乐观锁的前提是假设大多数时间系统对资源的更新是不会产生冲突的。

乐观锁的原子性比较和更新操作,一般都是由底层的硬件支持的。

CAS

大多数的处理器都实现了一个CAS指令(compare and swap),通常来说一个CAS接收三个参数,数据的现值V,进行比较的值A,准备写入的值B。只有当V和A相等的时候,才会写入B。无论是否写入成功,都会返回V。翻译过来就是“我认为V现在的值是A,如果是那么将V的值更新为B,否则不修改V的值,并告诉我现在V的值是多少。”

这就是CAS的含义,JDK中的并发类是通过使用Unsafe类来使用CAS的,我们可以自己构建一个并发类,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class CasCounter {

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;

static {
try {
valueOffset = unsafe.objectFieldOffset
(CasCounter.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

public CasCounter(int initialValue) {
value = initialValue;
}

public CasCounter() {
}

public final int get() {
return value;
}

public final void set(int newValue) {
value = newValue;
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

}

上面的例子中,我们定义了一个原子操作compareAndSet, 它内部调用了unsafe的compareAndSwapInt方法。

看起来上面的CAS使用比直接使用锁复杂,但实际上在JVM中实现锁定时需要遍历JVM中一条非常复杂的代码路径,并可能导致操作系统级的锁定,线程挂机和上下文切换等操作。在最好的情况下,锁定需要执行一次CAS命令。

CAS的主要缺点就是需要调用者自己来处理竞争问题(重试,回退,放弃),而在锁中可以自动处理这些问题。

前面的文章我们也讲到了原子变量,原子变量的底层就是使用CAS。

第三十一章 非阻塞算法(Lock-Free)的实现

上篇文章我们讲到了使用锁会带来的各种缺点,本文将会讲解如何使用非阻塞算法。非阻塞算法一般会使用CAS来协调线程的操作。

虽然非阻塞算法有诸多优点,但是在实现上要比基于锁的算法更加繁琐和负责。

本文将会介绍两个是用非阻塞算法实现的数据结构。

非阻塞的栈

我们先使用CAS来构建几个非阻塞的栈。栈是最简单的链式结构,其本质是一个链表,而链表的根节点就是栈顶。

我们先构建Node数据结构:

1
2
3
4
5
6
7
8
java复制代码public class Node<E> {
public final E item;
public Node<E> next;

public Node(E item){
this.item=item;
}
}

这个Node保存了内存item和它的下一个节点next。

然后我们构建非阻塞的栈,在该栈中我们需要实现pop和push方法,我们使用一个Atomic类来保存top节点的引用,在pop和push之前调用compareAndSet命令来保证命令的原子性。同时,我们需要不断的循环,以保证在线程冲突的时候能够重试更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class ConcurrentStack<E> {

AtomicReference<Node<E>> top= new AtomicReference<>();

public void push(E item){
Node<E> newNode= new Node<>(item);
Node<E> oldNode;
do{
oldNode=top.get();
newNode.next= oldNode;
}while(!top.compareAndSet(oldNode, newNode));
}

public E pop(){
Node<E> oldNode;
Node<E> newNode;
do {
oldNode = top.get();
if(oldNode == null){
return null;
}
newNode=oldNode.next;
}while(!top.compareAndSet(oldNode, newNode));
return oldNode.item;
}

}

非阻塞的链表

构建链表要比构建栈复杂。因为我们要维持头尾两个指针。以put方法来说,我们需要执行两步操作:1. 在尾部插入新的节点。2.将尾部指针指向最新的节点。

我们使用CAS最多只能保证其中的一步是原子执行。那么对于1和2的组合步骤该怎么处理呢?

我们再仔细考虑考虑,其实1和2并不一定要在同一个线程中执行,其他线程在检测到有线程插入了节点,但是没有将tail指向最后的节点时,完全帮忙完成这个操作。

我们看下具体的代码实现:

1
2
3
4
5
6
7
8
9
java复制代码public class LinkedNode<E> {
public final E item;
public final AtomicReference<LinkedNode<E>> next;

public LinkedNode(E item, LinkedNode<E> next){
this.item=item;
this.next=new AtomicReference<>(next);
}
}

先构建一个LinkedNode类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class LinkedQueue<E> {
private final LinkedNode<E> nullNode= new LinkedNode<>(null, null);
private final AtomicReference<LinkedNode<E>> head= new AtomicReference<>(nullNode);
private final AtomicReference<LinkedNode<E>> tail= new AtomicReference<>(nullNode);

public boolean put(E item){
LinkedNode<E> newNode = new LinkedNode<>(item, null);
while (true){
LinkedNode<E> currentTail= tail.get();
LinkedNode<E> tailNext= currentTail.next.get();
if(currentTail == tail.get()){
if (tailNext != null) {
//有其他的线程已经插入了一个节点,但是还没有将tail指向最新的节点
tail.compareAndSet(currentTail, tailNext);
}else{
//没有其他的线程插入节点,那么做两件事情:1. 插入新节点,2.将tail指向最新的节点
if(currentTail.next.compareAndSet(null, newNode)){
tail.compareAndSet(currentTail, newNode);
}
}
}
}
}
}

第三十二章 java内存模型(JMM)和happens-before

我们知道java程序是运行在JVM中的,而JVM就是构建在内存上的虚拟机,那么内存模型JMM是做什么用的呢?

我们考虑一个简单的赋值问题:

1
java复制代码int a=100;

JMM考虑的就是什么情况下读取变量a的线程可以看到值为100。看起来这是一个很简单的问题,赋值之后不就可以读到值了吗?

但是上面的只是我们源码的编写顺序,当把源码编译之后,在编译器中生成的指令的顺序跟源码的顺序并不是完全一致的。处理器可能采用乱序或者并行的方式来执行指令(在JVM中只要程序的最终执行结果和在严格串行环境中执行结果一致,这种重排序是允许的)。并且处理器还有本地缓存,当将结果存储在本地缓存中,其他线程是无法看到结果的。除此之外缓存提交到主内存的顺序也肯能会变化。

上面提到的种种可能都会导致在多线程环境中产生不同的结果。在多线程环境中,大部分时间多线程都是在执行各自的任务,只有在多个线程需要共享数据的时候,才需要协调线程之间的操作。

而JMM就是JVM中必须遵守的一组最小保证,它规定了对于变量的写入操作在什么时候对其他线程是可见的。

重排序

上面讲了JVM中的重排序,这里我们举个例子,以便大家对重排序有一个更深入的理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码@Slf4j
public class Reorder {

int x=0, y=0;
int a=0, b=0;

private void reorderMethod() throws InterruptedException {

Thread one = new Thread(()->{
a=1;
x=b;
});

Thread two = new Thread(()->{
b=1;
y=a;
});
one.start();
two.start();
one.join();
two.join();
log.info("{},{}", x, y);
}

public static void main(String[] args) throws InterruptedException {

for (int i=0; i< 100; i++){
new Reorder().reorderMethod();
}
}
}

上面的例子是一个很简单的并发程序。由于我们没有使用同步限制,所以线程one和two的执行顺序是不定的。有可能one在two之前执行,也有可能在two之后执行,也可能两者同时执行。不同的执行顺序可能会导致不同的输出结果。

同时虽然我们在代码中指定了先执行a=1, 再执行x=b,但是这两条语句实际上是没有关系的,在JVM中完全可能将两条语句重排序成x=b在前,a=1在后,从而导致输出更多意想不到的结果。

Happens-Before

为了保证java内存模型中的操作顺序,JMM为程序中的所有操作定义了一个顺序关系,这个顺序叫做Happens-Before。要想保证操作B看到操作A的结果,不管A和B是在同一线程还是不同线程,那么A和B必须满足Happens-Before的关系。如果两个操作不满足happens-before的关系,那么JVM可以对他们任意重排序。

我们看一下happens-before的规则:

  1. 程序顺序规则: 如果在程序中操作A在操作B之前,那么在同一个线程中操作A将会在操作B之前执行。

注意,这里的操作A在操作B之前执行是指在单线程环境中,虽然虚拟机会对相应的指令进行重排序,但是最终的执行结果跟按照代码顺序执行是一样的。虚拟机只会对不存在依赖的代码进行重排序。

  1. 监视器锁规则: 监视器上的解锁操作必须在同一个监视器上面的加锁操作之前执行。

锁我们大家都很清楚了,这里的顺序必须指的是同一个锁,如果是在不同的锁上面,那么其执行顺序也不能得到保证。

  1. volatile变量规则: 对volatile变量的写入操作必须在对该变量的读操作之前执行。

原子变量和volatile变量在读写操作上面有着相同的语义。

  1. 线程启动规则: 线程上对Thread.start的操作必须要在该线程中执行任何操作之前执行。
  2. 线程结束规则: 线程中的任何操作都必须在其他线程检测到该线程结束之前执行。
  3. 中断规则: 当一个线程再另一个线程上调用interrupt时,必须在被中断线程检测到interrupt调用之前执行。
  4. 终结器规则: 对象的构造函数必须在启动该对象的终结器之前执行完毕。
  5. 传递性: 如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。

上面的规则2很好理解,在加锁的过程中,不允许其他的线程获得该锁,也意味着其他的线程必须等待锁释放之后才能加锁和执行其业务逻辑。

4,5,6,7规则也很好理解,只有开始,才能结束。这符合我们对程序的一般认识。

8的传递性相信学过数学的人应该也不难理解。

接下来我们重点讨论一下规则3和规则1的结合。讨论之前我们再总结一下happens-before到底是做什么的。

因为JVM会对接收到的指令进行重排序,为了保证指令的执行顺序,我们才有了happens-before规则。上面讲到的2,3,4,5,6,7规则可以看做是重排序的节点,这些节点是不允许重排序的,只有在这些节点之间的指令才允许重排序。

结合规则1程序顺序规则,我们得到其真正的含义:代码中写在重排序节点之前的指令,一定会在重排序节点执行之前执行。

重排序节点就是一个分界点,它的位置是不能够移动的。看一下下面的直观例子:

线程1中有两个指令:set i=1, set volatile a=2。
线程2中也有两个指令:get volatile a, get i。

按照上面的理论,set和get volatile是两个重排序节点,set必须排在get之前。而依据规则1,代码中set i=1 在set volatile a=2之前,因为set volatile是重排序节点,所以需要遵守程序顺序执行规则,从而set i=1要在set volatile a=2之前执行。同样的道理get volatile a在get i之前执行。最后导致i=1在get i之前执行。

这个操作叫做借助同步。

安全发布

我们经常会用到单例模式来创建一个单的对象,我们看下下面的方法有什么不妥:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class Book {

private static Book book;

public static Book getBook(){
if(book==null){
book = new Book();
}
return book;
}
}

上面的类中定义了一个getBook方法来返回一个新的book对象,返回对象之前,我们先判断了book是否为空,如果不为空的话就new一个book对象。

初看起来,好像没什么问题,但是如果仔细考虑JMM的重排规则,就会发现问题所在。
book=new Book()其实一个复杂的命令,并不是原子性操作。它大概可以分解为1.分配内存,2.实例化对象,3.将对象和内存地址建立关联。

其中2和3有可能会被重排序,然后就有可能出现book返回了,但是还没有初始化完毕的情况。从而出现不可以预见的错误。

根据上面我们讲到的happens-before规则, 最简单的办法就是给方法前面加上synchronized关键字:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class Book {

private static Book book;

public synchronized static Book getBook(){
if(book==null){
book = new Book();
}
return book;
}
}

我们再看下面一种静态域的实现:

1
2
3
4
5
6
7
java复制代码public class BookStatic {
private static BookStatic bookStatic= new BookStatic();

public static BookStatic getBookStatic(){
return bookStatic;
}
}

JVM在类被加载之后和被线程使用之前,会进行静态初始化,而在这个初始化阶段将会获得一个锁,从而保证在静态初始化阶段内存写入操作将对所有的线程可见。

上面的例子定义了static变量,在静态初始化阶段将会被实例化。这种方式叫做提前初始化。

下面我们再看一个延迟初始化占位类的模式:

1
2
3
4
5
6
7
8
9
10
11
java复制代码
public class BookStaticLazy {

private static class BookStaticHolder{
private static BookStaticLazy bookStatic= new BookStaticLazy();
}

public static BookStaticLazy getBookStatic(){
return BookStaticHolder.bookStatic;
}
}

上面的类中,只有在调用getBookStatic方法的时候才会去初始化类。

接下来我们再介绍一下双重检查加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class BookDLC {
private volatile static BookDLC bookDLC;

public static BookDLC getBookDLC(){
if(bookDLC == null ){
synchronized (BookDLC.class){
if(bookDLC ==null){
bookDLC=new BookDLC();
}
}
}
return bookDLC;
}
}

上面的类中检测了两次bookDLC的值,只有bookDLC为空的时候才进行加锁操作。看起来一切都很完美,但是我们要注意一点,这里bookDLC一定要是volatile。

因为bookDLC的赋值操作和返回操作并没有happens-before,所以可能会出现获取到一个仅部分构造的实例。这也是为什么我们要加上volatile关键词。

初始化安全性

本文的最后,我们将讨论一下在构造函数中含有final域的对象初始化。

对于正确构造的对象,初始化对象保证了所有的线程都能够正确的看到由构造函数为对象给各个final域设置的正确值,包括final域可以到达的任何变量(比如final数组中的元素,final的hashMap等)。

1
2
3
4
5
6
7
8
java复制代码public class FinalSafe {
private final HashMap<String,String> hashMap;

public FinalSafe(){
hashMap= new HashMap<>();
hashMap.put("key1","value1");
}
}

上面的例子中,我们定义了一个final对象,并且在构造函数中初始化了这个对象。那么这个final对象是将不会跟构造函数之后的其他操作重排序。

第三十三章 java多线程之Phaser

前面的文章中我们讲到了CyclicBarrier、CountDownLatch的使用,这里再回顾一下CountDownLatch主要用在一个线程等待多个线程执行完毕的情况,而CyclicBarrier用在多个线程互相等待执行完毕的情况。

Phaser是java 7 引入的新的并发API。他引入了新的Phaser的概念,我们可以将其看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段。所以Phaser特别适合使用在重复执行或者重用的情况。

基本使用

在CyclicBarrier、CountDownLatch中,我们使用计数器来控制程序的顺序执行,同样的在Phaser中也是通过计数器来控制。在Phaser中计数器叫做parties, 我们可以通过Phaser的构造函数或者register()方法来注册。

通过调用register()方法,我们可以动态的控制phaser的个数。如果我们需要取消注册,则可以调用arriveAndDeregister()方法。

我们看下arrive:

1
2
3
java复制代码    public int arrive() {
return doArrive(ONE_ARRIVAL);
}

Phaser中arrive实际上调用了doArrive方法,doArrive接收一个adjust参数,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister。

Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞,但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。

下面看一个基本的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码    void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}

// allow threads to start and deregister self
phaser.arriveAndDeregister();
}

上面的例子中,我们在执行每个Runnable之前调用register()来注册, 然后调用arriveAndAwaitAdvance()来等待这一个Phaser周期结束。最后我们调用 phaser.arriveAndDeregister();来取消注册主线程。

下面来详细的分析一下运行步骤:

  1. final Phaser phaser = new Phaser(1);

这一步我们初始化了一个Phaser,并且指定其现在party的个数为1。

  1. phaser.register();

这一步注册Runnable task到phaser,同时将party+1。

  1. phaser.arriveAndAwaitAdvance()

这一步将会等待直到所有的party都arrive。这里只会将步骤2中注册的party标记为arrive,而步骤1中初始化的party一直都没有被arrive。

  1. phaser.arriveAndDeregister();

在主线程中,arrive了步骤1中的party,并且将party的个数减一。

  1. 步骤3中的phaser.arriveAndAwaitAdvance()将会继续执行,因为最后一个phaser在步骤4中arrive了。

多个Phaser周期

Phaser的值是从0到Integer.MAX_VALUE,每个周期过后该值就会加一,如果到达Integer.MAX_VALUE则会继续从0开始。

如果我们执行多个Phaser周期,则可以重写onAdvance方法:

1
2
3
java复制代码    protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}

onAdvance将会在最后一个arrive()调用的时候被调用,如果这个时候registeredParties为0的话,该Phaser将会调用isTerminated方法结束该Phaser。

如果要实现多周期的情况,我们可以重写这个方法:

1
2
3
java复制代码protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}

上面的例子中,如果phase次数超过了指定的iterations次数则就会自动终止。

我们看下实际的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码   void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}

上面的例子将会执行iterations次。

第三十四章 java中Locks的使用

之前文章中我们讲到,java中实现同步的方式是使用synchronized block。在java 5中,Locks被引入了,来提供更加灵活的同步控制。

本文将会深入的讲解Lock的使用。

Lock和Synchronized Block的区别

我们在之前的Synchronized Block的文章中讲到了使用Synchronized来实现java的同步。既然Synchronized Block那么好用,为什么会引入新的Lock呢?

主要有下面几点区别:

  1. synchronized block只能写在一个方法里面,而Lock的lock()和unlock()可以分别在不同的方法里面。
  2. synchronized block 不支持公平锁,一旦锁被释放,任何线程都有机会获取被释放的锁。而使用 Lock APIs则可以支持公平锁。从而让等待时间最长的线程有限执行。
  3. 使用synchronized block,如果线程拿不到锁,将会被Blocked。 Lock API 提供了一个tryLock() 的方法,可以判断是否可以获得lock,这样可以减少线程被阻塞的时间。
  4. 当线程在等待synchronized block锁的时候,是不能被中断的。如果使用Lock API,则可以使用 lockInterruptibly()来中断线程。

Lock interface

我们来看下Lock interface的定义, Lock interface定义了下面几个主要使用的方法:

  • void lock() - 尝试获取锁,如果获取不到锁,则会进入阻塞状态。
  • void lockInterruptibly() - 和lock()很类似,但是它可以将正在阻塞的线程中断,并抛出java.lang.InterruptedException。
  • boolean tryLock() – 这是lock()的非阻塞版本,它回尝试获取锁,并立刻返回是否获取成功。
  • boolean tryLock(long timeout, TimeUnit timeUnit) – 和tryLock()很像,只是多了一个尝试获取锁的时间。
  • void unlock() – unlock实例。
  • Condition newCondition() - 生成一个和当前Lock实例绑定的Condition。

在使用Lock的时候,一定要unlocked,以避免死锁。所以,通常我们我们要在try catch中使用:

1
2
3
4
5
6
7
java复制代码Lock lock = ...; 
lock.lock();
try {
// access to the shared resource
} finally {
lock.unlock();
}

除了Lock接口,还有一个ReadWriteLock接口,在其中定义了两个方法,实现了读锁和写锁分离:

  • Lock readLock() – 返回读锁
  • Lock writeLock() – 返回写锁

其中读锁可以同时被很多线程获得,只要不进行写操作。写锁同时只能被一个线程获取。

接下来,我们几个Lock的常用是实现类。

ReentrantLock

ReentrantLock是Lock的一个实现,什么是ReentrantLock(可重入锁)呢?

简单点说可重入锁就是当前线程已经获得了该锁,如果该线程的其他方法在调用的时候也需要获取该锁,那么该锁的lock数量+1,并且允许进入该方法。

不可重入锁:只判断这个锁有没有被锁上,只要被锁上申请锁的线程都会被要求等待。实现简单

可重入锁:不仅判断锁有没有被锁上,还会判断锁是谁锁上的,当就是自己锁上的时候,那么他依旧可以再次访问临界资源,并把加锁次数加一。

我们看下怎么使用ReentrantLock:

1
2
3
4
5
6
7
8
9
java复制代码    public void perform() {

lock.lock();
try {
counter++;
} finally {
lock.unlock();
}
}

下面是使用tryLock()的例子:

1
2
3
4
5
6
7
8
9
10
11
java复制代码    public void performTryLock() throws InterruptedException {
boolean isLockAcquired = lock.tryLock(1, TimeUnit.SECONDS);

if(isLockAcquired) {
try {
counter++;
} finally {
lock.unlock();
}
}
}

ReentrantReadWriteLock

ReentrantReadWriteLock是ReadWriteLock的一个实现。上面也讲到了ReadWriteLock主要有两个方法:

  • Read Lock - 如果没有线程获得写锁,那么可以多个线程获得读锁。
  • Write Lock - 如果没有其他的线程获得读锁和写锁,那么只有一个线程能够获得写锁。

我们看下怎么使用writeLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码    Map<String,String> syncHashMap = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

Lock writeLock = lock.writeLock();

public void put(String key, String value) {
try {
writeLock.lock();
syncHashMap.put(key, value);
} finally {
writeLock.unlock();
}
}

public String remove(String key){
try {
writeLock.lock();
return syncHashMap.remove(key);
} finally {
writeLock.unlock();
}
}

再看下怎么使用readLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码    Lock readLock = lock.readLock();
public String get(String key){
try {
readLock.lock();
return syncHashMap.get(key);
} finally {
readLock.unlock();
}
}

public boolean containsKey(String key) {
try {
readLock.lock();
return syncHashMap.containsKey(key);
} finally {
readLock.unlock();
}
}

StampedLock

StampedLock也支持读写锁,获取锁的是会返回一个stamp,通过该stamp来进行释放锁操作。

上我们讲到了如果写锁存在的话,读锁是无法被获取的。但有时候我们读操作并不想进行加锁操作,这个时候我们就需要使用乐观读锁。

StampedLock中的stamped类似乐观锁中的版本的概念,当我们在
StampedLock中调用lock方法的时候,就会返回一个stamp,代表锁当时的状态,在乐观读锁的使用过程中,在读取数据之后,我们回去判断该stamp状态是否变化,如果变化了就说明该stamp被另外的write线程修改了,这说明我们之前的读是无效的,这个时候我们就需要将乐观读锁升级为读锁,来重新获取数据。

我们举个例子,先看下write排它锁的情况:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    private double x, y;
private final StampedLock sl = new StampedLock();

void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

再看下乐观读锁的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

上面使用tryOptimisticRead()来尝试获取乐观读锁,然后通过sl.validate(stamp)来判断该stamp是否被改变,如果改变了,说明之前的read是无效的,那么需要重新来读取。

最后,StampedLock还提供了一个将read锁和乐观读锁升级为write锁的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码   void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}

上面的例子是通过使用tryConvertToWriteLock(stamp)来实现升级的。

Conditions

上面讲Lock接口的时候有提到其中的一个方法:

1
java复制代码Condition newCondition();

Condition提供了await和signal方法,类似于Object中的wait和notify。

不同的是Condition提供了更加细粒度的等待集划分。我们举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public class ConditionUsage {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

上面的例子实现了一个ArrayBlockingQueue,我们可以看到在同一个Lock实例中,创建了两个Condition,分别代表队列未满,队列未空。通过这种细粒度的划分,我们可以更好的控制业务逻辑。

第三十五章 ABA问题的本质及其解决办法

简介

CAS的全称是compare and swap,它是java同步类的基础,java.util.concurrent中的同步类基本上都是使用CAS来实现其原子性的。

CAS的原理其实很简单,为了保证在多线程环境下我们的更新是符合预期的,或者说一个线程在更新某个对象的时候,没有其他的线程对该对象进行修改。在线程更新某个对象(或值)之前,先保存更新前的值,然后在实际更新的时候传入之前保存的值,进行比较,如果一致的话就进行更新,否则失败。

注意,CAS在java中是用native方法来实现的,利用了系统本身提供的原子性操作。

那么CAS在使用中会有什么问题呢?一般来说CAS如果设计的不够完美的话,可能会产生ABA问题,而ABA问题又可以分为两类,我们先看来看一类问题。

更多内容请访问<www.flydean.com>

第一类问题

我们考虑下面一种ABA的情况:

  1. 在多线程的环境中,线程a从共享的地址X中读取到了对象A。
  2. 在线程a准备对地址X进行更新之前,线程b将地址X中的值修改为了B。
  3. 接着线程b将地址X中的值又修改回了A。
  4. 最新线程a对地址X执行CAS,发现X中存储的还是对象A,对象匹配,CAS成功。

上面的例子中CAS成功了,但是实际上这个CAS并不是原子操作,如果我们想要依赖CAS来实现原子操作的话可能就会出现隐藏的bug。

第一类问题的关键就在2和3两步。这两步我们可以看到线程b直接替换了内存地址X中的内容。

在拥有自动GC环境的编程语言,比如说java中,2,3的情况是不可能出现的,因为在java中,只要两个对象的地址一致,就表示这两个对象是相等的。

2,3两步可能出现的情况就在像C++这种,不存在自动GC环境的编程语言中。因为可以自己控制对象的生命周期,如果我们从一个list中删除掉了一个对象,然后又重新分配了一个对象,并将其add back到list中去,那么根据 MRU memory allocation算法,这个新的对象很有可能和之前删除对象的内存地址是一样的。这样就会导致ABA的问题。

第二类问题

如果我们在拥有自动GC的编程语言中,那么是否仍然存在CAS问题呢?

考虑下面的情况,有一个链表里面的数据是A->B->C,我们希望执行一个CAS操作,将A替换成D,生成链表D->B->C。考虑下面的步骤:

  1. 线程a读取链表头部节点A。
  2. 线程b将链表中的B节点删掉,链表变成了A->C
  3. 线程a执行CAS操作,将A替换成D。

最后我们的到的链表是D->C,而不是D->B->C。

问题出在哪呢?CAS比较的节点A和最新的头部节点是不是同一个节点,它并没有关心节点A在步骤1和3之间是否内容发生变化。

我们举个例子:

1
2
3
4
5
6
7
8
9
10
java复制代码public void useABAReference(){
CustUser a= new CustUser();
CustUser b= new CustUser();
CustUser c= new CustUser();
AtomicReference<CustUser> atomicReference= new AtomicReference<>(a);
log.info("{}",atomicReference.compareAndSet(a,b));
log.info("{}",atomicReference.compareAndSet(b,a));
a.setName("change for new name");
log.info("{}",atomicReference.compareAndSet(a,c));
}

上面的例子中,我们使用了AtomicReference的CAS方法来判断对象是否发生变化。在CAS b和a之后,我们将a的name进行了修改,我们看下最后的输出结果:

1
2
3
java复制代码[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true

三个CAS的结果都是true。说明CAS确实比较的两者是否为统一对象,对其中内容的变化并不关心。

第二类问题可能会导致某些集合类的操作并不是原子性的,因为你并不能保证在CAS的过程中,有没有其他的节点发送变化。

第一类问题的解决

第一类问题在存在自动GC的编程语言中是不存在的,我们主要看下怎么在C++之类的语言中解决这个问题。

根据官方的说法,第一类问题大概有四种解法:

  1. 使用中间节点 - 使用一些不代表任何数据的中间节点来表示某些节点是标记被删除的。
  2. 使用自动GC。
  3. 使用hazard pointers - hazard pointers 保存了当前线程正在访问的节点的地址,在这些hazard pointers中的节点不能够被修改和删除。
  4. 使用read-copy update (RCU) - 在每次更新的之前,都做一份拷贝,每次更新的是拷贝出来的新结构。

第二类问题的解决

第二类问题其实算是整体集合对象的CAS问题了。一个简单的解决办法就是每次做CAS更新的时候再添加一个版本号。如果版本号不是预期的版本,就说明有其他的线程更新了集合中的某些节点,这次CAS是失败的。

我们举个AtomicStampedReference的例子:

1
2
3
4
5
6
7
8
9
java复制代码public void useABAStampReference(){
Object a= new Object();
Object b= new Object();
Object c= new Object();
AtomicStampedReference<Object> atomicStampedReference= new AtomicStampedReference(a,0);
log.info("{}",atomicStampedReference.compareAndSet(a,b,0,1));
log.info("{}",atomicStampedReference.compareAndSet(b,a,1,2));
log.info("{}",atomicStampedReference.compareAndSet(a,c,0,1));
}

AtomicStampedReference的compareAndSet方法,多出了两个参数,分别是expectedStamp和newStamp,两个参数都是int型的,需要我们手动传入。

总结

ABA问题其实是由两类问题组成的,需要我们分开来对待和解决。

第三十六章 并发和Read-copy update(RCU)

简介

在上一篇文章中的并发和ABA问题的介绍中,我们提到了要解决ABA中的memory reclamation问题,有一个办法就是使用RCU。

详见ABA问题的本质及其解决办法,今天本文将会深入的探讨一下RCU是什么,RCU和COW(Copy-On-Write)之间的关系。

RCU(Read-copy update)是一种同步机制,并在2002年被加入了Linux内核中。它的优点就是可以在更新的过程中,运行多个reader进行读操作。

熟悉锁的朋友应该知道,对于排它锁,同一时间只允许一个操作进行,不管这个操作是读还是写。

对于读写锁,可以允许同时读,但是不能允许同时写,并且这个写锁是排他的,也就是说写的同时是不允许进行读操作的。

RCU可以支持一个写操作和多个读操作同时进行。

更多内容请访问<www.flydean.com>

Copy on Write和RCU

什么是Copy on Write? 它和read copy update有什么关系呢?

我们把Copy on Write简写为COW,COW是并发中经常会用到的一种算法,java里面就有java.util.concurrent.CopyOnWriteArrayList和java.util.concurrent.CopyOnWriteArraySet。

COW的本质就是,在并发的环境中,如果想要更新某个对象,首先将它拷贝一份,在这个拷贝的对象中进行修改,最后把指向原对象的指针指回更新好的对象。

CopyOnWriteArrayList和CopyOnWriteArraySet中的COW使用在遍历的时候。

我们知道使用Iterator来遍历集合的时候,是不允许在Iterator外部修改集合的数据的,只能在Iterator内部遍历的时候修改,否则会抛出ConcurrentModificationException。

而对于CopyOnWriteArrayList和CopyOnWriteArraySet来说,在创建Iterator的时候,就对原List进行了拷贝,Iterator的遍历是在拷贝过后的List中进行的,这时候如果其他的线程修改了原List对象,程序正常执行,不会抛出ConcurrentModificationException。

同时CopyOnWriteArrayList和CopyOnWriteArraySet中的Iterator是不支持remove,set,add方法的,因为这是拷贝过来的对象,在遍历过后是要被丢弃的。在它上面的修改是没有任何意义的。

在并发情况下,COW其实还有一个问题没有处理,那就是对于拷贝出来的对象什么时候回收的问题,是不是可以马上将对象回收?有没有其他的线程在访问这个对象? 处理这个问题就需要用到对象生命周期的跟踪技术,也就是RCU中的RCU-sync。

所以RCU和COW的关系就是:RCU是由RCU-sync和COW两部分组成的。

因为java中有自动垃圾回收功能,我们并不需要考虑拷贝对象的生命周期问题,所以在java中我们一般只看到COW,看不到RCU。

RCU的流程和API

我们将RCU和排它锁和读写锁进行比较。

对于排它锁来说,需要这两个API:

1
2
java复制代码lock()
unlock()

对于读写锁来说,需要这四个API:

1
2
3
4
java复制代码read_lock()
read_unlock()
write_lock()
write_unlock()

而RCU需要下面三个API:

1
2
3
java复制代码rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()

rcu_read_lock和rcu_read_unlock必须是成对出现的,并且synchronize_rcu不能出现在rcu_read_lock和rcu_read_unlock之间。

虽然RCU并不提供任何排他锁,但是RCU必须要满足下面的两个条件:

  1. 如果Thread1(T1)中synchronize_rcu方法在Thread2(T2)的rcu_read_lock方法之前返回,则happens before synchronize_rcu的操作一定在T2的rcu_read_lock方法之后可见。
  2. 如果T2的rcu_read_lock方法调用在T1的synchronize_rcu方法调用之前,则happens after synchronize_rcu的操作一定在T2的rcu_read_unlock方法之前不可见。

听起来很拗口,没关系,我们画个图来理解一下:

记住RCU比较的是synchronize_rcu和rcu_read_lock的顺序。

Thread2和Thread3中rcu_read_lock在synchronize_rcu之前执行,则b=2在T2,T3中一定不可见。

Thread4中rcu_read_lock虽然在synchronize_rcu启动之后才开始执行的,但是rcu_read_unlock是在synchronize_rcu返回之后才执行的,所以可以等同于看做Thread5的情况。

Thread5中,rcu_read_lock在synchronize_rcu返回之后才执行的,所以a=1一定可见。

RCU要注意的事项

RCU虽然没有提供锁的机制,但允许同时多个线程进行读操作。注意,RCU同时只允许一个synchronize_rcu操作,所以需要我们自己来实现synchronize_rcu的排它锁操作。

所以对于RCU来说,它是一个写多个读的同步机制,而不是多个写多个读的同步机制。

RCU的java实现

最后放上一段大神的RCU的java实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class RCU {
final static long NOT_READING = Long.MAX_VALUE;
final static int MAX_THREADS = 128;
final AtomicLong reclaimerVersion = new AtomicLong(0);
final AtomicLongArray readersVersion = new AtomicLongArray(MAX_THREADS);

public RCU() {
for (int i=0; i < MAX_THREADS; i++) readersVersion.set(i, NOT_READING);
}

public static int getTID() {
return (int)(Thread.currentThread().getId() % MAX_THREADS);
}

public void read_lock(final int tid) { // rcu_read_lock()
final long rv = reclaimerVersion.get();
readersVersion.set(tid, rv);
final long nrv = reclaimerVersion.get();
if (rv != nrv) readersVersion.lazySet(tid, nrv);
}

public void read_unlock(final int tid) { // rcu_read_unlock()
readersVersion.set(tid, NOT_READING);
}

public void synchronize_rcu() {
final long waitForVersion = reclaimerVersion.incrementAndGet();
for (int i=0; i < MAX_THREADS; i++) {
while (readersVersion.get(i) < waitForVersion) { } // spin
}
}
}

简单讲解一下这个RCU的实现:

readersVersion是一个长度为128的Long数组,里面存放着每个reader的读数。默认情况下reader存储的值是NOT_READING,表示未存储任何数据。

在RCU初始化的时候,将会初始化这些reader。

read_unlock方法会将reader的值重置为NOT_READING。

reclaimerVersion存储的是修改的数据,它的值将会在synchronize_rcu方法中进行更新。

同时synchronize_rcu将会遍历所有的reader,只有当所有的reader都读取完毕才继续执行。

最后,read_lock方法将会读取reclaimerVersion的值。这里会读取两次,如果两次的结果不同,则会调用readersVersion.lazySet方法,延迟设置reader的值。

为什么要读取两次呢?因为虽然reclaimerVersion和readersVersion都是原子性操作,但是在多线程环境中,并不能保证reclaimerVersion一定就在readersVersion之前执行,所以我们需要添加一个内存屏障:memory barrier来实现这个功能。

总结

本文介绍了RCU算法和应用。希望大家能够喜欢。

第三十七章 同步类的基础AbstractQueuedSynchronizer(AQS)

我们之前介绍了很多同步类,比如ReentrantLock,Semaphore, CountDownLatch, ReentrantReadWriteLock,FutureTask等。

AQS封装了实现同步器时设计的大量细节问题。他提供了FIFO的wait queues并且提供了一个int型的state表示当前的状态。

根据JDK的说明,并不推荐我们直接使用AQS,我们通常需要构建一个内部类来继承AQS并按照需要重写下面几个方法:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

在这些方法中,我们需要调用getState, setState 或者 compareAndSetState这三种方法来改变state值。

上面的方法提到了两种操作,独占操作(如:ReentrantLock)和共享操作(如:Semaphore,CountdownLatch)。

两种的区别在于同一时刻能否有多个线程同时获取到同步状态。

比如我们运行同时多个线程去读,但是同时只允许一个线程去写,那么这里的读锁就是共享操作,而写锁就是独占操作。

在基于QAS构建的同步类中,最基本的操作就是获取操作和释放操作。而这个state就表示的是这些获取和释放操作所依赖的值。

State是一个int值,你可以使用它来表示任何状态,比如ReentrantLock用它来表示所有者线程重复获取该锁的次数。Semaphore用它来表示剩余的许可量,而FutureTask用它来表示任务的状态(开始,运行,完成或者取消)。当然你还可以自定义额外的状态变量来表示其他的信息。

下的伪代码表示的是AQS中获取和释放操作的形式:

1
2
3
4
5
6
7
8
9
java复制代码   Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}

Release:
if (tryRelease(arg))
unblock the first queued thread;

获取操作,首先判断当前状态是否允许获取操作,如果如果不允许,则将当前的线程入Queue,并且有可能阻塞当前线程。

释放操作,则先判断是否运行释放操作,如果允许,则解除queue中的thread,并运行。

我们看一个具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class AQSUsage {

private final Sync sync= new Sync();

private class Sync extends AbstractQueuedSynchronizer{
protected int tryAcquireShared(int ignored){
return (getState() ==1 )? 1: -1;
}
protected boolean tryReleaseShared(int ignored){
setState(1);
return true;
}
}

public void release() {
sync.releaseShared(0);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
}

上面的例子中,我们定义了一个内部类Sync,在这个类中我们实现了tryAcquireShared和tryReleaseShared两个方法,在这两个方法中我们判断并设置了state的值。

sync.releaseShared和sync.acquireSharedInterruptibly会分别调用tryAcquireShared和tryReleaseShared方法。

前面我们也提到了很多同步类都是使用AQS来实现的,我们可以再看看其他标准同步类中tryAcquire的实现。

首先看下ReentrantLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码   final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

ReentrantLock只支持独占锁。所以它需要实现tryAcquire方法。除此之外它还维护了一个owner变量来保存当前所有者线程的标志符,从而来实现可重入锁。

我们再看下Semaphore和CountDownLatch的实现,因为他们是共享操作,所以需要实现tryAcqureShared方法:

1
2
3
4
5
6
7
8
9
java复制代码        final int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

第三十八章 java并发Exchanger的使用

简介

Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。

两个线程都调用exchange方法之后,传入的参数就会交换。

类定义

1
java复制代码public class Exchanger<V>

其中V表示需要交换的对象类型。

类继承

1
2
java复制代码java.lang.Object
↳ java.util.concurrent.Exchanger<V>

Exchanger直接继承自Object。

构造函数

1
java复制代码Exchanger()

Exchanger提供一个无参构造函数。

两个主要方法

  1. public V exchange(V x) throws InterruptedException

当这个方法被调用的时候,当前线程将会等待直到其他的线程调用同样的方法。当其他的线程调用exchange之后,当前线程将会继续执行。

在等待过程中,如果有其他的线程interrupt当前线程,则会抛出InterruptedException。

  1. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

和第一个方法类似,区别是多了一个timeout时间。如果在timeout时间之内没有其他线程调用exchange方法,则会抛出TimeoutException。

具体的例子

我们先定义一个带交换的类:

1
2
3
4
5
java复制代码@Data
public class CustBook {

private String name;
}

然后定义两个Runnable,在run方法中调用exchange方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Slf4j
public class ExchangerOne implements Runnable{

Exchanger<CustBook> ex;

ExchangerOne(Exchanger<CustBook> ex){
this.ex=ex;
}

@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book one");

try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Slf4j
public class ExchangerTwo implements Runnable{

Exchanger<CustBook> ex;

ExchangerTwo(Exchanger<CustBook> ex){
this.ex=ex;
}

@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book two");

try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

最后在主方法中调用:

1
2
3
4
5
6
7
8
9
java复制代码public class ExchangerUsage {

public static void main(String[] args) {
Exchanger<CustBook> exchanger = new Exchanger<>();
// Starting two threads
new Thread(new ExchangerOne(exchanger)).start();
new Thread(new ExchangerTwo(exchanger)).start();
}
}

我们看下结果:

1
2
java复制代码22:14:09.069 [Thread-1] INFO com.flydean.ExchangerTwo - book one
22:14:09.073 [Thread-0] INFO com.flydean.ExchangerOne - book two

可以看到对象已经被交换了。

结语

Exchanger在两个线程需要交换对象的时候非常好用。大家可以在实际工作生活中使用。

本文的例子github.com/ddean2009/l…

本文PDF下载链接concurrent-all-in-one.pdf

欢迎关注我的公众号:程序那些事,更多精彩等着您!
更多内容请访问 <www.flydean.com>

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Async注解其实也就这么回事。

发表于 2021-09-06

你好呀,我是why。

我之前写过一些关于线程池的文章,然后有同学去翻了一圈,发现我没有写过一篇关于 @Async 注解的文章,于是他来问我:

是的,我摊牌了。

我不喜欢这个注解的原因,是因为我压根就没用过。

我习惯用自定义线程池的方式去做一些异步的逻辑,且这么多年一直都是这样用的。

所以如果是我主导的项目,你在项目里面肯定是看不到 @Async 注解的。

那我之前见过 @Async 注解吗?

肯定是见过啊,有的朋友就喜欢用这个注解。

一个注解就搞定异步开发,多爽啊。

我不知道用这个注解的人知不知道其原理,反正我是不知道的。

最近开发的时候引入了一个组件,发现调用的方法里面,有的地方用到了这个注解。

既然这次用到了,那就研究一下吧。

首先需要说明的是,本文并不会写线程池相关的知识点。

仅描述我是通过什么方式,去了解这个我之前一无所知的注解的。

搞个 Demo

不知道大家如果碰到这种情况会去怎么下手啊。

但是我认为不论是从什么角度去下手的,最后一定是会落到源码里面的。

所以,我一般是先搞个 Demo。

Demo 非常简单啊,就三个类。

首先是启动类,这没啥说的:

然后搞个 service:

这个 service 里面的 syncSay 方法被打上了 @Async 注解。

最后,搞个 Controller 来调用它,完事:

Demo 就搭建好了,你也动手去搞一个,耗时超过 5 分钟,算我输。

然后,把项目启动起来,调用接口,查看日志:

我去,从线程名称来看,这也没异步呀?

怎么还是 tomcat 的线程呢?

于是,我就遇到了研究路上的第一个问题:@Async 注解没有生效。

为啥不生效?

为什么不生效呢?

我也是懵逼的,我说了之前对这个注解一无所知,那我怎么知道呢?

那遇到这个问题的时候会怎么办?

当然是面向浏览器编程啦!

这个地方,如果我自己从源码里面去分析为啥没生效,一定也能查出原因。

但是,如果我面向浏览器编程,只需要 30 秒,我就能查到这两个信息:

失效原因:

  • 1.@SpringBootApplication 启动类当中没有添加 @EnableAsync 注解。
  • 2.没有走 Spring 的代理类。因为 @Transactional 和 @Async 注解的实现都是基于 Spring 的 AOP,而 AOP 的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过 Spring 容器管理。

很显然,我这个情况符合第一种情况,没有添加 @EnableAsync 注解。

另外一个原因,我也很感兴趣,但是现在我的首要任务是把 Demo 搭建好,所以不能被其他信息给诱惑了。

很多同学带着问题去查询的时候,本来查的问题是@Async 注解为什么没有生效,结果慢慢的就走偏了,十五分钟后问题就逐渐演变为了 SpringBoot 的启动流程。

再过半小时,网页上就显示的是一些面试必背八股文之类的东西…

我说这个意思就是,查问题就好好查问题。查问题的过程中肯定会由这个问题引发的自己更加感兴趣的问题。但是,记录下来,先不要让问题发散。

这个道理,就和带着问题去看源码一样,看着看着,可能连自己的问题是什么都不知道了。

好了,说回来。

我在启动类上加上该注解:

再次发起调用:

可以看到线程名字变了,说明真的就好了。

现在我的 Demo 已经搭好了,可以开始找角度去卷了。

从上面的日志我也能知道,在默认情况下有一个线程前缀为 task- 的线程池在帮我执行任务。

说到线程池,我就得知道这个线程池的相关配置才放心。

那么我怎么才能知道呢?

先压一压

其实正常人的思路这个时候就应该是去翻源码,找对应的注入线程池的地方。

而我,就有点不正常了,我懒得去源码里面找,我想让它自己暴露到我的面前。

怎么让它暴露出来呢?

仗着我对线程池的了解,我的第一个思路是先压一压这个线程池。

压爆它,压的它处理不过来任务,让它走到拒绝逻辑里面去,正常来说是会抛出异常的吧?

于是,我把程序稍微改造了一下:

想的是直接来一波大力出奇迹:

结果…

它竟然…

照单全收了,没有异常?

日志一秒打几行,打的很欢乐:

虽然没有出现我预想的拒绝异常,但是我从日志里面还是看出了一点点端倪。

比如我就发现这个 taks 最多就到 8:

朋友们,你说这是啥意思?

是不是就是说这个我正在寻找的线程池的核心线程数的配置是 8 ?

什么,你问我为什么不能是最大线程数?

有可能吗?

当然有可能。但是我 10000 个任务发过来,没有触发线程池拒绝策略,刚好把最大线程池给用完了?

也就是说这个线程池的配置是队列长度 9992,最大线程数 8 ?

这也太巧合了且不合理了吧?

所以我觉得核心线程数配置是 8 ,队列长度应该是 Integer.MAX_VALUE。

为了证实我的猜想,我把请求改成了这样:

num=一千万。

通过 jconsole 观察堆内存使用情况:

那叫一个飙升啊,点击【执行GC】按钮也没有任何缓解。

也从侧面证明了:任务有可能都进队列里面排队了,导致内存飙升。

虽然,我现在还不知道它的配置是什么,但是经过刚刚的黑盒测试,我有正当的理由怀疑:

默认的线程池有导致内存溢出的风险。

但是,同时也意味着我想从让它抛出异常,从而自己暴露在我面前的骚想法落空。

怼源码

前面的思路走不通,老老实实的开始怼源码吧。

我是从这个注解开始怼的:

点进这个注解之后,几段英文,不长,我从里面获取到了一个关键信息:

主要关注我画线的地方。

In terms of target method signatures, any parameter types are supported.

在目标方法的签名中,入参是任何类型都支持的。

多说一句:这里说到目标方法,说到 target,大家脑海里面应该是要立刻出现一个代理对象的概念的。

上面这句话好理解,甚至感觉是一句废话。

但是,它紧跟了一个 However:

However, the return type is constrained to either void or Future.

constrained,受限制,被约束的意思。

这句话是说:返回类型被限制为 void 或者 Future。

啥意思呢?

那我偏要返回一个 String 呢?

WTF,打印出来的居然是 null !?

那这里如果我返回一个对象,岂不是很容易爆出空指针异常?

看完注解上的注释之后,我发现了第二个隐藏的坑:

如果被 @Async 注解修饰的方法,返回值只能是 void 或者 Future。

void 就不说了,说说这个 Future。

看我划线的另外一句:

it will have to return a temporary {@code Future} handle that just passes a value through: e.g. Spring’s {@link AsyncResult}

上有一个 temporary,是四级词汇啊,应该认识的,就是短暂的、暂时的意思。

temporary worker,临时工,明白吧。

所以意思就是如果你要返回值,你就用 AsyncResult 对象来包一下,这个 AsyncResult 就是 temporary worker。

就像这样:

接着我们把目光放到注解的 value 属性上:

这个注解,看注释上面的意思,就是说这个应该填一个线程池的 bean 名称,相当于指定线程池的意思。

也不知道理解的对不对,等会写个方法验证一下就知道了。

好了,到现在,我把信息整理汇总一下。

  • 我之前完全不懂这个注解,现在我有一个 Demo 了,搭建 Demo 的时候我发现除了 @Async 注解之外,还需要加上 @EnableAsync 注解,比如加在启动类上。
  • 然后把这个默认的线程池当做黑盒测试了一把,我怀疑它的核心线程数默认是 8,队列长度无线长。有内存溢出的风险。
  • 通过阅读 @Async 上的注解,我发现返回值只能是 void 或者 Future 类型,否则即使返回了其他值,不会报错,但是返回的值是 null,有空指针风险。
  • @Async 注解中有一个 value 属性,看注释应该是可以指定自定义线程池的。

接下来我把要去探索的问题排个序,只聚焦到 @Async 的相关问题上:

  • 1.默认线程池的具体配置是什么?
  • 2.源码是怎么做到只支持 void 和 Future 的?
  • 3.value 属性是干什么用的?

具体配置是啥?

我找到具体配置其实是一个很快的过程。

因为这个类的 value 参数简直太友好了:

五处调用的地方,其中四处都是注释。

有效的调用就这一个地方,直接先打上断点再说:

org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier

发起调用之后,果然跑到了断点这个地方:

顺着断点往下调试,就会来到这个地方:

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

这个代码结构非常的清晰。

编号为 ① 的地方,是获取对应方法上的 @Async 注解的 value 值。这个值其实就是 bean 名称,如果不为空则从 Spring 容器中获取对应的 bean。

如果 value 是没有值的,也就是我们 Demo 的这种情况,会走到编号为 ② 的地方。

这个地方就是我要找的默认的线程池。

最后,不论是默认的线程池还是 Spring 容器中我们自定义的线程池。

都会以方法为维度,在 map 中维护方法和线程池的映射关系。

也就是编号为 ③ 的这一步,代码中的 executors 就是一个 map:

所以,我要找的东西,就是编号为 ② 的这个地方的逻辑。

这里面主要是一个 defaultExecutor 对象:

这个玩意是一个函数式编程,所以如果你不知道这个玩意是干什么的,调试起来可能有点懵逼:

我建议你去恶补一下, 10 分钟就能入门。

最终你会调试到这个地方来:

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor

这个代码就有点意思了,就是从 BeanFactory 里面获取一个默认的线程池相关的 Bean 出来。流程很简单,日志也打印的很清楚,就不赘述了。

但是我想说的有意思的点是,我不知道你看到这份代码,有没有看出一丝丝双亲委派内味。

都是利用异常,在异常里面处理逻辑。

就上面这“垃圾”代码,直接就触犯了阿里开发规范中的两大条:

在源码里面这就是好代码。

在业务流程里面,这就是违反了规范。

所以,说一句题外话。

就是阿里开发规范我个人感觉,其实是针对我们写业务代码的同事一个最佳实践。

但是当把这个尺度拉到中间件、基础组件、框架源码的范围时,就会出现一点水土不服的症状,这个东西见仁见智,我是觉得阿里开发规范的 idea 插件,对于我这样写增删查改的程序员来说,是真的香。

不说远了,我们还是回来看看获取到的这个线程池:

这不就找到我想要的东西了吗,这个线程池的相关参数都可以看到了。

也证实了我之前猜想:

我觉得核心线程数配置是 8 ,队列长度应该是 Integer.MAX_VALUE。

但是,现在我是直接从 BeanFactory 获取到了这个线程池的 Bean,那么这个 Bean 是什么时候注入的呢?

朋友们,这还不简单吗?

我都已经拿到这个 Bean 的 beanName 了,就是 applicationTaskExecutor,但凡你把 Spring 获取 bean 的流程的八股文背的熟练一点,你都知道在这个地方打上断点,加上调试条件,慢慢去 Debug 就知道了:

org.springframework.beans.factory.support.AbstractBeanFactory#getBean(java.lang.String)

假设你就是不知道在上面这个地方打断点去调试呢?

再说一个简单粗暴的方法,你都拿到 beanName 了,在代码里面一搜不就出来了嘛。

简单粗暴效果好:

org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

都找到这个类了,随便打个断点,就可以开始调试了。

再说一个骚一点的操作。

假设我现在连 beaName 都不知道,但是我知道它肯定是一个被 Spring 管理的线程池。

那么我就获取项目里面所有被 Spring 管理的线程池,总有一个得是我要找的吧?

你看下面截图,当前这个 bean 不就是我要找的 applicationTaskExecutor 吗?

这都是一些野路子,骚操作,知道就好,有时候多个排查思路。

返回类型的支持

前面我们卷完了第一个关于配置的问题。

接下来,我们看另外一个前面提出的问题:

源码是怎么做到只支持 void 和 Future 的?

答案就藏在这个方法里面:

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

标号为 ① 的地方,其实就是我们前面分析的从 map 里面拿 method 对应的线程池的方法。

拿到线程池之后来到标号为 ② 的地方,就是封装一个 Callable 对象。

那么是把什么封装到 Callable 对象里面呢?

这个问题先按下不表,我们先牢牢的围绕我们的问题往下走,不然问题会越来越多。

标号为 ③ 的地方,doSubmit,见名知意,这个地方就是执行任务的地方了。

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit

其实这里就是我要找的答案。

你看这个方法的入参 returnType 是 String,其实就是被 @Async 注解修饰的 asyncSay 方法。

你要不信,我可以带你看看前一个调用栈,这里可以看到具体的方法:

怎么样,没有骗你吧。

所以,现在你再看 doSubmit 方法拿着这个方法的返回类型干啥了。

一共四个分支,前面三个都是判断是否是 Future 类型的。

其中的 ListenableFuture 和 CompletableFuture 都是继承自 Future 的。

这个两个类在 @Async 注解的方法注释里面也提到了:

而我们的程序走到了最后的一个 else,含义就是返回值不是 Future 类型的。

那么你看它干了啥事儿?

直接把任务 submit 到线程池之后,就返回了一个 null。

这可不得爆出空指针异常吗?

到这个地方,我们也解决了这个问题:

源码是怎么做到只支持 void 和 Future 的?

其实道理很简单,我们正常的使用线程池提交不也就这两个返回类型吗?

用 submit 的方式提交,返回一个 Future,把结果封装到 Future 里面:

用 execute 的方式提交,没有返回值:

而框架通过一个简单的注解帮我们实现异步化,它玩的再花里胡哨 ,就算是玩出花来了,它也得遵守线程池提交的底层原理啊。

所以,源码为什么只支持 void 和 Future 的返回类型?

因为底层的线程池只支持这两种类型的返回。

只是它的做法稍微有点坑,直接把其他的返回类型的返回值都处理为 null 了。

你还别不服,谁叫你不读注释上的说明呀。

另外,我发现这个地方还有个小的优化点:

当它走到这个方法的时候,返回值已经明确是 null 了。

为什么还用 executor.submit(task) 提交任务呢?

用 execute 就行了啊。

区别,你问我区别?

不是刚刚才说了吗, submit 方法是有返回值的。

虽然你不用,但是它还是会去构建一个返回的 Future 对象呀。

然而构建出来了,也没用上呀。

所以直接用 execute 提交就行了。

少生成一个 Future 对象,算不算优化?

有一说一,不算什么有价值的优化,但是说出去可是优化过 Spring 的源码的,装逼够用了。

接着,再说一下我们前面按下不表的部分,这里编号为 ② 的地方封装的到底是什么?

其实这个问题用脚指头应该也猜到了:

只是我单独拧出来说的原因是我要给你证明,这里返回的 result 就是我们方法返回的真实的值。

只是判断了一下类型不是 Future 的话就不做处理,比如我这里其实是返回了 hi:1 字符串的,只是不符合条件,就被扔掉了:

另外,idea 还是很智能的,它会提示你这个地方的返回值是有问题的:

甚至修改方法都给你标出来了,你只需要一点,它就给你重新改好了。

对于为什么要这么改,现在我们已经拿捏的非常清楚了。

知其然,也知其所以然。

@Async 注解的 value

接下来我们看看 @Async 注解的 value 属性是干什么的。

其实在前面我已经悄悄的提到了,只是一句话就带过了,就是这个地方:

前面说编号为 ① 的地方,是获取对应方法上的 @Async 注解的 value 值。这个值其实就是 bean 名称,如果不为空则从 Spring 容器中获取对应的 bean。

然后我就直接分析到标号为 ② 的地方了。

现在我们重新看看标号为 ① 的地方。

我也重新安排一个测试用例去验证我的想法。

反正 value 值应该是 Spring 的 bean 名称,而且这个 bean 一定是一个线程池对象,这个没啥说的。

所以,我把 Demo 程序修改为这样:

再次跑起来,跑到这个断点的地方,就和我们默认的情况不一样了,这个时候 qualifier 有值了:

接下来就是去 beanFactory 里面拿名字为 whyThreadPool 的 bean 了。

最后,拿出来的线程池就是我自定义的这个线程池:

这个其实是一个很简单的探索过程,但是这背后蕴涵了一个道理。

就是之前有同学问我的这个问题:

其实这个问题挺有代表性的,很多同学都认为线程池不能滥用,一个项目共用一个就好了。

线程池确实不能滥用,但是一个项目里面确实是可以有多个自定义线程池的。

根据你的业务场景来划分。

比如举个简单的例子,业务主流程上可以用一个线程池,但是当主流程中的某个环节出问题了,假设需要发送预警短信。

发送预警短信的这个操作,就可以用另外一个线程池来做。

它们可以共用一个线程池吗?

可以,能用。

但是会出现什么问题呢?

假设项目中某个业务出问题了,在不断的,疯狂的发送预警短信,甚至把线程池都占满了。

这个时候如果主流程的业务和发送短信用的是同一个线程池,会出现什么美丽的场景?

是不是一提交任务,就直接走到拒绝策略里面去了?

预警短信发送这个附属功能,导致了业务不可以,本末倒置的了吧?

所以,建议使用两个不同的线程池,各司其职。

这其实就是听起来很高大上的线程池隔离技术。

那么落到 @Async 注解上是怎么回事呢?

其实就是这样的:

然后,还记得我们前面提到的那个维护方法和线程池的映射关系的 map 吗?

就是它:

现在,我把程序跑起来调用一下上面的三个方法,目的是为了把值给放进去这个 map:

看明白了吗?

再次复述一次这句话:

以方法维度维护方法和线程池之间的关系。

现在,我对于 @Async 这个注解算是有了一点点的了解,我觉得它也还是很可爱的。后面也许我会考虑在项目里面把它给用起来。毕竟它更加符合 SpringBoot 的基于注解开发的编程理念。

最后说一句

好了,看到了这里了,点赞、关注随便安排一个吧,要是你都安排上我也不介意。写文章很累的,需要一点正反馈。

给各位读者朋友们磕一个了:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springboot 日志、配置文件、接口数据如何脱敏?老鸟

发表于 2021-09-06

一、前言

核心隐私数据无论对于企业还是用户来说尤其重要,因此要想办法杜绝各种隐私数据的泄漏。下面陈某带大家从以下三个方面讲解一下隐私数据如何脱敏,也是日常开发中需要注意的:

  1. 配置文件数据脱敏
  2. 接口返回数据脱敏
  3. 日志文件数据脱敏

文章目录如下:

二、配置文件如何脱敏?

经常会遇到这样一种情况:项目的配置文件中总有一些敏感信息,比如数据源的url、用户名、密码….这些信息一旦被暴露那么整个数据库都将会被泄漏,那么如何将这些配置隐藏呢?

以前都是手动将加密之后的配置写入到配置文件中,提取的时候再手动解密,当然这是一种思路,也能解决问题,但是每次都要手动加密、解密不觉得麻烦吗?

公众号:码猿技术专栏

今天介绍一种方案,让你在无感知的情况下实现配置文件的加密、解密。利用一款开源插件:jasypt-spring-boot。项目地址如下:

1
java复制代码https://github.com/ulisesbocchio/jasypt-spring-boot

使用方法很简单,整合Spring Boot 只需要添加一个starter。

1. 添加依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>

2. 配置秘钥

在配置文件中添加一个加密的秘钥(任意),如下:

1
2
3
yaml复制代码jasypt:
encryptor:
password: Y6M9fAJQdU7jNp5MW

当然将秘钥直接放在配置文件中也是不安全的,我们可以在项目启动的时候配置秘钥,命令如下:

1
shell复制代码java -jar xxx.jar  -Djasypt.encryptor.password=Y6M9fAJQdU7jNp5MW

3. 生成加密后的数据

这一步骤是将配置明文进行加密,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootJasyptApplicationTests {

/**
* 注入加密方法
*/
@Autowired
private StringEncryptor encryptor;

/**
* 手动生成密文,此处演示了url,user,password
*/
@Test
public void encrypt() {
String url = encryptor.encrypt("jdbc\\:mysql\\://127.0.0.1\\:3306/test?useUnicode\\=true&characterEncoding\\=UTF-8&zeroDateTimeBehavior\\=convertToNull&useSSL\\=false&allowMultiQueries\\=true&serverTimezone=Asia/Shanghai");
String name = encryptor.encrypt("root");
String password = encryptor.encrypt("123456");
System.out.println("database url: " + url);
System.out.println("database name: " + name);
System.out.println("database password: " + password);
Assert.assertTrue(url.length() > 0);
Assert.assertTrue(name.length() > 0);
Assert.assertTrue(password.length() > 0);
}
}

上述代码对数据源的url、user、password进行了明文加密,输出的结果如下:

1
2
3
4
5
properties复制代码database url: szkFDG56WcAOzG2utv0m2aoAvNFH5g3DXz0o6joZjT26Y5WNA+1Z+pQFpyhFBokqOp2jsFtB+P9b3gB601rfas3dSfvS8Bgo3MyP1nojJgVp6gCVi+B/XUs0keXPn+pbX/19HrlUN1LeEweHS/LCRZslhWJCsIXTwZo1PlpXRv3Vyhf2OEzzKLm3mIAYj51CrEaN3w5cMiCESlwvKUhpAJVz/uXQJ1spLUAMuXCKKrXM/6dSRnWyTtdFRost5cChEU9uRjw5M+8HU3BLemtcK0vM8iYDjEi5zDbZtwxD3hA=

database name: L8I2RqYPptEtQNL4x8VhRVakSUdlsTGzEND/3TOnVTYPWe0ZnWsW0/5JdUsw9ulm

database password: EJYCSbBL8Pmf2HubIH7dHhpfDZcLyJCEGMR9jAV3apJtvFtx9TVdhUPsAxjQ2pnJ

4. 将加密后的密文写入配置

jasypt默认使用ENC()包裹,此时的数据源配置如下:

1
2
3
4
5
6
7
8
yaml复制代码spring:
datasource:
# 数据源基本配置
username: ENC(L8I2RqYPptEtQNL4x8VhRVakSUdlsTGzEND/3TOnVTYPWe0ZnWsW0/5JdUsw9ulm)
password: ENC(EJYCSbBL8Pmf2HubIH7dHhpfDZcLyJCEGMR9jAV3apJtvFtx9TVdhUPsAxjQ2pnJ)
driver-class-name: com.mysql.jdbc.Driver
url: ENC(szkFDG56WcAOzG2utv0m2aoAvNFH5g3DXz0o6joZjT26Y5WNA+1Z+pQFpyhFBokqOp2jsFtB+P9b3gB601rfas3dSfvS8Bgo3MyP1nojJgVp6gCVi+B/XUs0keXPn+pbX/19HrlUN1LeEweHS/LCRZslhWJCsIXTwZo1PlpXRv3Vyhf2OEzzKLm3mIAYj51CrEaN3w5cMiCESlwvKUhpAJVz/uXQJ1spLUAMuXCKKrXM/6dSRnWyTtdFRost5cChEU9uRjw5M+8HU3BLemtcK0vM8iYDjEi5zDbZtwxD3hA=)
type: com.alibaba.druid.pool.DruidDataSource

上述配置是使用默认的prefix=ENC(、suffix=),当然我们可以根据自己的要求更改,只需要在配置文件中更改即可,如下:

1
2
3
4
5
6
yaml复制代码jasypt:
encryptor:
## 指定前缀、后缀
property:
prefix: 'PASS('
suffix: ')'

那么此时的配置就必须使用PASS()包裹才会被解密,如下:

1
2
3
4
5
6
7
8
yaml复制代码spring:
datasource:
# 数据源基本配置
username: PASS(L8I2RqYPptEtQNL4x8VhRVakSUdlsTGzEND/3TOnVTYPWe0ZnWsW0/5JdUsw9ulm)
password: PASS(EJYCSbBL8Pmf2HubIH7dHhpfDZcLyJCEGMR9jAV3apJtvFtx9TVdhUPsAxjQ2pnJ)
driver-class-name: com.mysql.jdbc.Driver
url: PASS(szkFDG56WcAOzG2utv0m2aoAvNFH5g3DXz0o6joZjT26Y5WNA+1Z+pQFpyhFBokqOp2jsFtB+P9b3gB601rfas3dSfvS8Bgo3MyP1nojJgVp6gCVi+B/XUs0keXPn+pbX/19HrlUN1LeEweHS/LCRZslhWJCsIXTwZo1PlpXRv3Vyhf2OEzzKLm3mIAYj51CrEaN3w5cMiCESlwvKUhpAJVz/uXQJ1spLUAMuXCKKrXM/6dSRnWyTtdFRost5cChEU9uRjw5M+8HU3BLemtcK0vM8iYDjEi5zDbZtwxD3hA=)
type: com.alibaba.druid.pool.DruidDataSource

5. 总结

jasypt还有许多高级用法,比如可以自己配置加密算法,具体的操作可以参考Github上的文档。

三、接口返回数据如何脱敏?

通常接口返回值中的一些敏感数据也是要脱敏的,比如身份证号、手机号码、地址…..通常的手段就是用*隐藏一部分数据,当然也可以根据自己需求定制。

言归正传,如何优雅的实现呢?有两种实现方案,如下:

  • 整合Mybatis插件,在查询的时候针对特定的字段进行脱敏
  • 整合Jackson,在序列化阶段对特定字段进行脱敏
  • 基于Sharding Sphere实现数据脱敏,查看之前的文章:基于Sharding Sphere实现数据“一键脱敏”

第一种方案网上很多实现方式,下面演示第二种,整合Jackson。

1. 自定义一个Jackson注解

需要自定义一个脱敏注解,一旦有属性被标注,则进行对应得脱敏,如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码/**
* 自定义jackson注解,标注在属性上
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@JacksonAnnotationsInside
@JsonSerialize(using = SensitiveJsonSerializer.class)
public @interface Sensitive {
//脱敏策略
SensitiveStrategy strategy();
}

2. 定制脱敏策略

针对项目需求,定制不同字段的脱敏规则,比如手机号中间几位用*替代,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码/**
* 脱敏策略,枚举类,针对不同的数据定制特定的策略
*/
public enum SensitiveStrategy {
/**
* 用户名
*/
USERNAME(s -> s.replaceAll("(\\S)\\S(\\S*)", "$1*$2")),
/**
* 身份证
*/
ID_CARD(s -> s.replaceAll("(\\d{4})\\d{10}(\\w{4})", "$1****$2")),
/**
* 手机号
*/
PHONE(s -> s.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2")),
/**
* 地址
*/
ADDRESS(s -> s.replaceAll("(\\S{3})\\S{2}(\\S*)\\S{2}", "$1****$2****"));


private final Function<String, String> desensitizer;

SensitiveStrategy(Function<String, String> desensitizer) {
this.desensitizer = desensitizer;
}

public Function<String, String> desensitizer() {
return desensitizer;
}
}

以上只是提供了部分,具体根据自己项目要求进行配置。

3. 定制JSON序列化实现

下面将是重要实现,对标注注解@Sensitive的字段进行脱敏,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 序列化注解自定义实现
* JsonSerializer<String>:指定String 类型,serialize()方法用于将修改后的数据载入
*/
public class SensitiveJsonSerializer extends JsonSerializer<String> implements ContextualSerializer {
private SensitiveStrategy strategy;

@Override
public void serialize(String value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeString(strategy.desensitizer().apply(value));
}

/**
* 获取属性上的注解属性
*/
@Override
public JsonSerializer<?> createContextual(SerializerProvider prov, BeanProperty property) throws JsonMappingException {

Sensitive annotation = property.getAnnotation(Sensitive.class);
if (Objects.nonNull(annotation)&&Objects.equals(String.class, property.getType().getRawClass())) {
this.strategy = annotation.strategy();
return this;
}
return prov.findValueSerializer(property.getType(), property);

}
}

4. 定义Person类,对其数据脱敏

使用注解@Sensitive注解进行数据脱敏,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Data
public class Person {
/**
* 真实姓名
*/
@Sensitive(strategy = SensitiveStrategy.USERNAME)
private String realName;
/**
* 地址
*/
@Sensitive(strategy = SensitiveStrategy.ADDRESS)
private String address;
/**
* 电话号码
*/
@Sensitive(strategy = SensitiveStrategy.PHONE)
private String phoneNumber;
/**
* 身份证号码
*/
@Sensitive(strategy = SensitiveStrategy.ID_CARD)
private String idCard;
}

5. 模拟接口测试

以上4个步骤完成了数据脱敏的Jackson注解,下面写个controller进行测试,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@RestController
public class TestController {
@GetMapping("/test")
public Person test(){
Person user = new Person();
user.setRealName("不才陈某");
user.setPhoneNumber("19796328206");
user.setAddress("浙江省杭州市温州市....");
user.setIdCard("4333333333334334333");
return user;
}
}

调用接口查看数据有没有正常脱敏,结果如下:

1
2
3
4
5
6
json复制代码{
"realName": "不*陈某",
"address": "浙江省****市温州市..****",
"phoneNumber": "197****8206",
"idCard": "4333****34333"
}

6. 总结

数据脱敏有很多种实现方式,关键是哪种更加适合,哪种更加优雅…..

四、日志文件如何数据脱敏?

上面讲了配置文件、接口返回值的数据脱敏,现在总该轮到日志脱敏了。项目中总避免不了打印日志,肯定会涉及到一些敏感数据被明文打印出来,那么此时就需要过滤掉这些敏感数据(身份证、号码、用户名…..)。

关于Spring Boot 日志方面的问题有不理解的可以看我之前的文章:Spring Boot第三弹,一文带你搞懂日志如何配置?、Spring Boot第二弹,配置文件怎么造?。

下面以log4j2这款日志为例讲解一下日志如何脱敏,其他日志框架大致思路一样。

1. 添加log4j2日志依赖

Spring Boot 默认日志框架是logback,但是我们可以切换到log4j2,依赖如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- 去掉springboot默认配置 -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--使用log4j2替换 LogBack-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

2. 在/resource目录下新建log4j2.xml配置

log4j2的日志配置很简单,只需要在/resource文件夹下新建一个log4j2.xml配置文件,内容如下图:

关于每个节点如何配置,含义是什么,在我上面的两篇文章中有详细的介绍。

上图的配置并没有实现数据脱敏,这是普通的配置,使用的是PatternLayout

3. 自定义PatternLayout实现数据脱敏

步骤2中的配置使用的是PatternLayout实现日志的格式,那么我们也可以自定义一个PatternLayout来实现日志的过滤脱敏。

PatternLayout的类图继承关系如下:

从上图中可以清楚的看出来,PatternLayout继承了一个抽象类AbstractStringLayout,因此想要自定义只需要继承这个抽象类即可。

1、创建CustomPatternLayout,继承抽象类AbstractStringLayout

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
java复制代码/**
* log4j2 脱敏插件
* 继承AbstractStringLayout
**/
@Plugin(name = "CustomPatternLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class CustomPatternLayout extends AbstractStringLayout {


public final static Logger logger = LoggerFactory.getLogger(CustomPatternLayout.class);
private PatternLayout patternLayout;


protected CustomPatternLayout(Charset charset, String pattern) {
super(charset);
patternLayout = PatternLayout.newBuilder().withPattern(pattern).build();
initRule();
}

/**
* 要匹配的正则表达式map
*/
private static Map<String, Pattern> REG_PATTERN_MAP = new HashMap<>();
private static Map<String, String> KEY_REG_MAP = new HashMap<>();


private void initRule() {
try {
if (MapUtils.isEmpty(Log4j2Rule.regularMap)) {
return;
}
Log4j2Rule.regularMap.forEach((a, b) -> {
if (StringUtils.isNotBlank(a)) {
Map<String, String> collect = Arrays.stream(a.split(",")).collect(Collectors.toMap(c -> c, w -> b, (key1, key2) -> key1));
KEY_REG_MAP.putAll(collect);
}
Pattern compile = Pattern.compile(b);
REG_PATTERN_MAP.put(b, compile);
});

} catch (Exception e) {
logger.info(">>>>>> 初始化日志脱敏规则失败 ERROR:{}", e);
}

}

/**
* 处理日志信息,进行脱敏
* 1.判断配置文件中是否已经配置需要脱敏字段
* 2.判断内容是否有需要脱敏的敏感信息
* 2.1 没有需要脱敏信息直接返回
* 2.2 处理: 身份证 ,姓名,手机号敏感信息
*/
public String hideMarkLog(String logStr) {
try {
//1.判断配置文件中是否已经配置需要脱敏字段
if (StringUtils.isBlank(logStr) || MapUtils.isEmpty(KEY_REG_MAP) || MapUtils.isEmpty(REG_PATTERN_MAP)) {
return logStr;
}
//2.判断内容是否有需要脱敏的敏感信息
Set<String> charKeys = KEY_REG_MAP.keySet();
for (String key : charKeys) {
if (logStr.contains(key)) {
String regExp = KEY_REG_MAP.get(key);
logStr = matchingAndEncrypt(logStr, regExp, key);
}
}
return logStr;
} catch (Exception e) {
logger.info(">>>>>>>>> 脱敏处理异常 ERROR:{}", e);
//如果抛出异常为了不影响流程,直接返回原信息
return logStr;
}
}

/**
* 正则匹配对应的对象。
*
* @param msg
* @param regExp
* @return
*/
private static String matchingAndEncrypt(String msg, String regExp, String key) {
Pattern pattern = REG_PATTERN_MAP.get(regExp);
if (pattern == null) {
logger.info(">>> logger 没有匹配到对应的正则表达式 ");
return msg;
}
Matcher matcher = pattern.matcher(msg);
int length = key.length() + 5;
boolean contains = Log4j2Rule.USER_NAME_STR.contains(key);
String hiddenStr = "";
while (matcher.find()) {
String originStr = matcher.group();
if (contains) {
// 计算关键词和需要脱敏词的距离小于5。
int i = msg.indexOf(originStr);
if (i < 0) {
continue;
}
int span = i - length;
int startIndex = span >= 0 ? span : 0;
String substring = msg.substring(startIndex, i);
if (StringUtils.isBlank(substring) || !substring.contains(key)) {
continue;
}
hiddenStr = hideMarkStr(originStr);
msg = msg.replace(originStr, hiddenStr);
} else {
hiddenStr = hideMarkStr(originStr);
msg = msg.replace(originStr, hiddenStr);
}

}
return msg;
}

/**
* 标记敏感文字规则
*
* @param needHideMark
* @return
*/
private static String hideMarkStr(String needHideMark) {
if (StringUtils.isBlank(needHideMark)) {
return "";
}
int startSize = 0, endSize = 0, mark = 0, length = needHideMark.length();

StringBuffer hideRegBuffer = new StringBuffer("(\\S{");
StringBuffer replaceSb = new StringBuffer("$1");

if (length > 4) {
int i = length / 3;
startSize = i;
endSize = i;
} else {
startSize = 1;
endSize = 0;
}

mark = length - startSize - endSize;
for (int i = 0; i < mark; i++) {
replaceSb.append("*");
}
hideRegBuffer.append(startSize).append("})\\S*(\\S{").append(endSize).append("})");
replaceSb.append("$2");
needHideMark = needHideMark.replaceAll(hideRegBuffer.toString(), replaceSb.toString());
return needHideMark;
}


/**
* 创建插件
*/
@PluginFactory
public static Layout createLayout(@PluginAttribute(value = "pattern") final String pattern,
@PluginAttribute(value = "charset") final Charset charset) {
return new CustomPatternLayout(charset, pattern);
}


@Override
public String toSerializable(LogEvent event) {
return hideMarkLog(patternLayout.toSerializable(event));
}

}

关于其中的一些细节,比如@Plugin、@PluginFactory这两个注解什么意思?log4j2如何实现自定义一个插件,这里不再详细介绍,不是本文重点,有兴趣的可以查看log4j2的官方文档。

2、自定义自己的脱敏规则

上述代码中的Log4j2Rule则是脱敏规则静态类,我这里是直接放在了静态类中配置,实际项目中可以设置到配置文件中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码/**
* 现在拦截加密的日志有三类:
* 1,身份证
* 2,姓名
* 3,身份证号
* 加密的规则后续可以优化在配置文件中
**/
public class Log4j2Rule {

/**
* 正则匹配 关键词 类别
*/
public static Map<String, String> regularMap = new HashMap<>();
/**
* TODO 可配置
* 此项可以后期放在配置项中
*/
public static final String USER_NAME_STR = "Name,name,联系人,姓名";
public static final String USER_IDCARD_STR = "empCard,idCard,身份证,证件号";
public static final String USER_PHONE_STR = "mobile,Phone,phone,电话,手机";

/**
* 正则匹配,自己根据业务要求自定义
*/
private static String IDCARD_REGEXP = "(\\d{17}[0-9Xx]|\\d{14}[0-9Xx])";
private static String USERNAME_REGEXP = "[\\u4e00-\\u9fa5]{2,4}";
private static String PHONE_REGEXP = "(?<!\\d)(?:(?:1[3456789]\\d{9})|(?:861[356789]\\d{9}))(?!\\d)";

static {
regularMap.put(USER_NAME_STR, USERNAME_REGEXP);
regularMap.put(USER_IDCARD_STR, IDCARD_REGEXP);
regularMap.put(USER_PHONE_STR, PHONE_REGEXP);
}

}

经过上述两个步骤,自定义的PatternLayout已经完成,下面将是改写log4j2.xml这个配置文件了。

4. 修改log4j2.xml配置文件

其实这里修改很简单,原配置文件是直接使用PatternLayout进行日志格式化的,那么只需要将默认的<PatternLayout/>这个节点替换成<CustomPatternLayout/>,如下图:

直接全局替换掉即可,至此,这个配置文件就修改完成了。

5. 演示效果

在步骤3这边自定义了脱敏规则静态类Log4j2Rule,其中定义了姓名、身份证、号码这三个脱敏规则,如下:

下面就来演示这三个规则能否正确脱敏,直接使用日志打印,代码如下:

1
2
3
4
java复制代码@Test
public void test3(){
log.debug("身份证:{},姓名:{},电话:{}","320829112334566767","不才陈某","19896327106");
}

控制台打印的日志如下:

1
java复制代码身份证:320829******566767,姓名:不***,电话:198*****106

哦豁,成功了,so easy!!!

6. 总结

日志脱敏的方案很多,陈某也只是介绍一种常用的,有兴趣的可以研究一下。

五、总结

本篇文章从三个维度介绍了隐私数据的脱敏实现方案,码字不易,赶紧点赞收藏吧!!!

源码已经上传GitHub,需要的公众号码猿技术专栏,回复关键词数据脱敏获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 中如何实现线程间通信 1 如何让两个线程依次执行

发表于 2021-09-05

世界以痛吻我,要我报之以歌 —— 泰戈尔《飞鸟集》

虽然通常每个子线程只需要完成自己的任务,但是有时我们希望多个线程一起工作来完成一个任务,这就涉及到线程间通信。

关于线程间通信本文涉及到的方法和类包括:thread.join()、object.wait()、object.notify()、CountdownLatch、CyclicBarrier、FutureTask、Callable。

接下来将用几个例子来介绍如何在Java中实现线程间通信:

  1. 如何让两个线程依次执行,即一个线程等待另一个线程执行完成后再执行?
  2. 如何让两个线程以指定的方式有序相交执行?
  3. 有四个线程:A、B、C、D,如何实现 D 在 A、B、C 都同步执行完毕后执行?
  4. 三个运动员分开准备,然后在每个人准备好后同时开始跑步。
  5. 子线程完成任务后,将结果返回给主线程。
  1. 如何让两个线程依次执行?

假设有两个线程:A 和 B,这两个线程都可以按照顺序打印数字,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class Test01 {

public static void main(String[] args) throws InterruptedException {
demo1();
}

public static void demo1() {
Thread a = new Thread(() -> {
printNumber("A");
});

Thread b = new Thread(() -> {
printNumber("B");
});

a.start();
b.start();
}

public static void printNumber(String threadName) {
int i = 0;
while (i++ < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " print: " + i);
}
}

}

得到的结果如下:

1
2
3
4
5
6
java复制代码A print: 1
B print: 1
B print: 2
A print: 2
A print: 3
B print: 3

可以看到 A 和 B 同时打印数字,如果我们希望 B 在 A 执行完成之后开始执行,那么可以使用 thread.join() 方法实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static void demo2() {
Thread a = new Thread(() -> {
printNumber("A");
});

Thread b = new Thread(() -> {
System.out.println("B 等待 A 执行");
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
printNumber("B");
});

a.start();
b.start();
}

得到的结果如下:

1
2
3
4
5
6
7
java复制代码B 等待 A 执行
A print: 1
A print: 2
A print: 3
B print: 1
B print: 2
B print: 3

我们可以看到该 a.join() 方法会让 B 等待 A 完成打印。

thread.join() 方法的作用就是阻塞当前线程,等待调用 join() 方法的线程执行完毕后再执行后面的代码。

查看 join() 方法的源码,内部是调用了 join(0) ,如下:

1
2
3
java复制代码public final void join() throws InterruptedException {
join(0);
}

查看 join(0) 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// 注意这里使用了 sychronized 加锁,锁对象是线程的实例对象
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 调用 join(0) 执行下面的代码
if (millis == 0) {
// 这里使用 while 循环的目的是为了避免虚假唤醒
// 如果当前线程存活则调用 wait(0), 0 表示永久等待,直到调用 notifyAll() 或者 notify() 方法
// 当线程结束的时候会调用 notifyAll() 方法
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

从源码中可以看出 join(long millis) 方法是通过 wait(long timeout) (Object 提供的方法)方法实现的,调用 wait 方法之前,当前线程必须获得对象的锁,所以此 join 方法使用了 synchronized 加锁,锁对象是线程的实例对象。其中 wait(0)方法会让当前线程阻塞等待,直到另一个线程调用此对象的 notify() 或者 notifyAll() 方法才会继续执行。当调用 join 方法的线程结束的时候会调用 notifyAll() 方法,所以 join() 方法可以实现一个线程等待另一个调用 join() 的线程结束后再执行。

虚假唤醒:一个线程在没有被通知、中断、超时的情况下被唤醒;

虚假唤醒可能导致条件不成立的情况下执行代码,破坏被锁保护的约束关系;

为什么使用 while 循环来避免虚假唤醒:

在 if 块中使用 wait 方法,是非常危险的,因为一旦线程被唤醒,并得到锁,就不会再判断 if 条件而执行 if 语句块外的代码,所以建议凡是先要做条件判断,再 wait 的地方,都使用 while 循环来做,循环会在等待之前和之后对条件进行测试。

  1. 如何让两个线程按照指定的方式有序相交?

如果现在我们希望 B线程在 A 线程打印 1 后立即打印 1,2,3,然后 A 线程继续打印 2,3,那么我们需要更细粒度的锁来控制执行顺序。

在这里,我们可以利用 object.wait() 和 object.notify() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public static void demo3() {
Object lock = new Object();
Thread A = new Thread(() -> {
synchronized (lock) {
System.out.println("A 1");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2");
System.out.println("A 3");
}
});

Thread B = new Thread(() -> {
synchronized (lock) {
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
lock.notify();
}
});

A.start();
B.start();
}

得到的结果如下:

1
2
3
4
5
6
java复制代码A 1
B 1
B 2
B 3
A 2
A 3

上述代码的执行流程如下:

  1. 首先我们创建一个由 A 和 B 共享的对象锁: lock = new Object();
  2. 当A拿到锁时,先打印1,然后调用lock.wait()方法进入等待状态,然后交出锁的控制权;
  3. B 不会被执行,直到 A 调用该lock.wait()方法释放控制权并且 B 获得锁;
  4. B拿到锁后打印1,2,3,然后调用lock.notify()方法唤醒正在等待的A;
  5. A 唤醒后继续打印剩余的 2,3。

为了便于理解,我将上面的代码添加了日志,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public static void demo3() {
Object lock = new Object();
Thread A = new Thread(() -> {
System.out.println("INFO:A 等待获取锁");
synchronized (lock) {
System.out.println("INFO:A 获取到锁");
System.out.println("A 1");
try {
System.out.println("INFO:A 进入 waiting 状态,放弃锁的控制权");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("INFO:A 被 B 唤醒继续执行");
System.out.println("A 2");
System.out.println("A 3");
}
});

Thread B = new Thread(() -> {
System.out.println("INFO:B 等待获取锁");
synchronized (lock) {
System.out.println("INFO:B 获取到锁");
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
System.out.println("INFO:B 执行结束,调用 notify 方法唤醒 A");
lock.notify();
}
});

A.start();
B.start();
}

得到的结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码INFO:A 等待获取锁
INFO:A 获取到锁
A 1
INFO:A 进入 waiting 状态,放弃锁的控制权
INFO:B 等待获取锁
INFO:B 获取到锁
B 1
B 2
B 3
INFO:B 执行结束,调用 notify 方法唤醒 A
INFO:A 被 B 唤醒继续执行
A 2
A 3
  1. 线程 D 在A、B、C都同步执行完毕后执行

thread.join() 前面介绍的方法允许一个线程在等待另一个线程完成运行后继续执行。但是如果我们将A、B、C依次加入到D线程中,就会让A、B、C依次执行,而我们希望它们三个同步运行。

我们要实现的目标是:A、B、C三个线程可以同时开始运行,各自独立运行完成后通知D;D 不会开始运行,直到 A、B 和 C 都运行完毕。所以我们 CountdownLatch 用来实现这种类型的通信。它的基本用法是:

  1. 创建一个计数器,并设置一个初始值, CountdownLatch countDownLatch = new CountDownLatch(3);
  2. 调用countDownLatch.await()进入等待状态,直到计数值变为0;
  3. 在其他线程调用countDownLatch.countDown(),该方法会将计数值减一;
  4. 当计数器的值变为 0 时,countDownLatch.await()等待线程中的方法会继续执行下面的代码。

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public static void runDAfterABC() {
int count = 3;
CountDownLatch countDownLatch = new CountDownLatch(count);
new Thread(() -> {
System.out.println("INFO: D 等待 A B C 运行完成");
try {
countDownLatch.await();
System.out.println("INFO: A B C 运行完成,D 开始运行");
System.out.println("D is working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
final String name = String.valueOf(threadName);
new Thread(() -> {
System.out.println(name + " is working");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " finished");
countDownLatch.countDown();
}).start();
}
}

得到的结果如下:

1
2
3
4
5
6
7
8
9
java复制代码INFO: D 等待 A B C 运行完成
A is working
B is working
C is working
C finished
B finished
A finished
INFO: A B C 运行完成,D 开始运行
D is working

其实CountDownLatch它本身就是一个倒数计数器,我们把初始的count值设置为3。D运行的时候,首先调用该countDownLatch.await()方法检查计数器的值是否为0,如果不是0则保持等待状态. A、B、C 运行完毕后,分别使用countDownLatch.countDown()方法将倒数计数器减1。计数器将减为 0,然后通知await()方法结束等待,D开始继续执行。

因此,CountDownLatch适用于一个线程需要等待多个线程的情况。

  1. 三个运动员分开准备同时开跑

这一次,A、B、C这三个线程都需要分别准备,等三个线程都准备好后开始同时运行,我们应该如何做到这一点?

CountDownLatch可以用来计数,但完成计数的时候,只有一个线程的一个await()方法会得到响应,所以多线程不能在同一时间被触发。为了达到线程相互等待的效果,我们可以使用该CyclicBarrier,其基本用法为:

  1. 首先创建一个公共对象CyclicBarrier,并设置同时等待的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 这些线程同时开始准备,准备好后,需要等待别人准备好,所以调用cyclicBarrier.await()方法等待别人;
  3. 当指定的需要同时等待的线程都调用了该cyclicBarrier.await()方法时,意味着这些线程准备好了,那么这些线程就会开始同时继续执行。

想象一下有三个跑步者需要同时开始跑步,所以他们需要等待其他人都准备好,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public static void runABCWhenAllReady() {
int count = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
Random random = new Random();
for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
final String name = String.valueOf(threadName);
new Thread(() -> {
int prepareTime = random.nextInt(10000);
System.out.println(name + " 准备时间:" + prepareTime);
try {
Thread.sleep(prepareTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " 准备好了,等待其他人");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " 开始跑步");
}).start();
}
}

得到结果如下:

1
2
3
4
5
6
7
8
9
java复制代码A 准备时间:1085
B 准备时间:7729
C 准备时间:8444
A 准备好了,等待其他人
B 准备好了,等待其他人
C 准备好了,等待其他人
C 开始跑步
A 开始跑步
B 开始跑步

CyclicBarrier 的作用就是等待多个线程同时执行。

  1. 子线程将结果返回给主线程

在实际开发中,往往我们需要创建子线程来做一些耗时的任务,然后将执行结果传回主线程。那么如何在 Java 中实现呢?

一般在创建线程的时候,我们会把 Runnable 对象传递给 Thread 执行,Runable 的源码如下:

1
2
3
4
java复制代码@FunctionalInterface
public interface Runnable {
public abstract void run();
}

可以看到 Runable 是一个函数式接口,该接口中的 run 方法没有返回值,那么如果要返回结果,可以使用另一个类似的接口 Callable。

函数式接口:只有一个方法的接口

Callable 接口的源码如下:

1
2
3
4
5
6
7
8
9
10
java复制代码@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

可以看出,最大的区别Callable在于它返回的是泛型。

那么接下来的问题是,如何将子线程的结果传回去呢?Java 有一个类,FutureTask,它可以与 一起工作Callable,但请注意,get用于获取结果的方法会阻塞主线程。FutureTask 本质上还是一个 Runnable,所以可以直接传到 Thread 中。

image-20210905183800030.png

比如我们想让子线程计算1到100的总和,并将结果返回给主线程,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public static void getResultInWorker() {
Callable<Integer> callable = () -> {
System.out.println("子任务开始执行");
Thread.sleep(1000);
int result = 0;
for (int i = 0; i <= 100; i++) {
result += i;
}
System.out.println("子任务执行完成并返回结果");
return result;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();

try {
System.out.println("开始执行 futureTask.get()");
Integer result = futureTask.get();
System.out.println("执行的结果:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

得到的结果如下:

1
2
3
4
java复制代码开始执行 futureTask.get()
子任务开始执行
子任务执行完成并返回结果
执行的结果:5050

可以看出在主线程调用futureTask.get()方法时阻塞了主线程;然后Callable开始在内部执行并返回操作的结果;然后futureTask.get()得到结果,主线程恢复运行。

在这里我们可以了解到,FutureTask和Callable可以直接在主线程中获取子线程的结果,但是它们会阻塞主线程。当然,如果你不希望阻塞主线程,可以考虑使用ExecutorService把FutureTask到线程池来管理执行。

参考文章:

www.tutorialdocs.com/article/jav…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

leetcode-字符串相乘

发表于 2021-09-05

8月每日在leetcode至少做一题,9月份开始有点懈怠了,希望可以继续保持吧。每天学习和进步一点,才能保持感觉,对算法的应用场景上,嗅觉也会更灵敏一点点,有合适的场景就能用起来,毕竟解决了实际的问题,算法才提现价值。

今天做的是leetcode的第43题,题目比较水,也没什么算法的思想,就当练练编程基础吧。

题目

给定两个以字符串形式表示的非负整数 num1 和 num2,返回 num1 和 num2 的乘积,它们的乘积也表示为字符串形式。

示例 1:

输入: num1 = “2”, num2 = “3”

输出: “6”

示例 2:

输入: num1 = “123”, num2 = “456”

输出: “56088”

说明:

num1 和 num2 的长度小于110。

num1 和 num2 只包含数字 0-9。

num1 和 num2 均不以零开头,除非是数字 0 本身。

不能使用任何标准库的大数类型(比如 BigInteger)或直接将输入转换为整数来处理。

思路

看到说明里面的最后一条,我差点笑出声,其实很多年之前在学校刷题的时候也做过类似的,我们有个同学就是靠java的BigDecimal过的。当然这样过就失去意义了,我们还是探寻一下一般的解法。

回到小学的多位数相乘,其实就是把其中一个数字逐位拆开,与另外一个数去相乘,然后再把和相加。这样就从多位数乘法简化成了一位数乘以多位数;然后进一步简化成一位数乘以一位数再相加。

我们现在用程序来解,就是模拟了这个过程,如下图所示
43.png

当然,这道题目做的过程中也有一些注意点:

1、注意补零,逐位相乘的时候,高位数去做乘法时,后面是要补零的,如上图的竖式所示

2、数字可能很长,中间的结果也要用数组表示,用整数的话可能会溢出

3、虽然原字符串遍历完了,但是可能还有从低位进位上来的数组,别丢了

所以,本题还需要一个隐含的函数,就是2个字符串类型的数字相加,这其实就是leetcode第415题,字符串相加

Java版本代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
ini复制代码class Solution {
public String multiply(String num1, String num2) {
String ans = "0";
if ("0".equals(num1) || "0".equals(num2)) {
return ans;
}
int len1 = num1.length();
int len2 = num2.length();
for (int i = len2 - 1; i >= 0; i--) {
int adddition = 0;
int n2 = num2.charAt(i) - '0';
StringBuilder temp = new StringBuilder();
// 因为temp是倒过来的,所以需要先补零
int need = len2 - 1 - i;
while (need-- > 0) {
temp.append("0");
}
for (int j = len1 - 1; j >= 0; j--) {
int n1 = num1.charAt(j) - '0';
int num = n2 * n1 + adddition;
temp.append(num % 10);
adddition = num / 10;
}
if (adddition > 0) {
temp.append(adddition);
}
ans = addStrings(ans, temp.reverse().toString());
}
return ans;
}

public static String addStrings(String num1, String num2) {
int i = num1.length() - 1;
int j = num2.length() - 1;
// 进位
int adddition = 0;
StringBuilder ans = new StringBuilder();
while (i >=0 || j >=0 || adddition > 0) {
int temp = 0;
if (i >= 0) {
temp += num1.charAt(i) - '0';
i--;
}
if (j >= 0) {
temp += num2.charAt(j) - '0';
j--;
}
if (adddition > 0) {
temp += adddition;
}
ans.append(temp % 10);
adddition = temp / 10;
}
return ans.reverse().toString();
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

解析 JDBC 的批处理参数 rewriteBatchedS

发表于 2021-09-05

提升SQL性能的一大原则就是降低访问数据库的次数,能合并下发的SQL是很好的手段。也就是将insert into t values();insert into t values();转换成insert into t values(),();。

在JDBC中,这种转换的大概流程是这样的:

1
java复制代码PreparedStatement preparedStatement = conn.prepareStatement("insert into t (c1,c2) values(?,?)");

判断执行的SQL是否可以做batch重写。这里需要url使用rewriteBatchedStatements=true,会变成insert into t (id) values(1),(2)。这里其实有一些本身应该符合预期的情况却出现rewriteBatched失效的情况。


我们看下JDBC源码:

直接来到 ParseInfo.java,这里是对SQL进行解析进而进行条件判断,JDBC解析SQL的方式比较粗暴,通过字符判断,通过I判断是否是insert语句、通过L判断是否是load data语句、通过on, duplicate, key, update判断是否是on duplicate key update语句,通过?获得preparestatment占位符的位置,通过;判断这个请求包含几条SQL等。

比如占位符的判断,这里会记录下每一个?的位置,用staticSql进行分组,例如insert into t (c1) values (?)会解析成长度为3的数组:

  1. INSERT INTO t (c1,c2) VALUES (
  2. ,
  3. )
1
2
3
4
5
6
7
java复制代码for(i = this.statementStartPos; i < this.statementLength; ++i){
char c = sql.charAt(i);
if ((c == '?')) {
endpointList.add(new int[] { lastParmEnd, i });
lastParmEnd = i + 1;
}
}

之后判断是否可以进行SQL重写,判断条件有3个,最基本的2个条件:

  1. this.numberOfQueries == 1,必须是一条SQL,即不能是sql;sql的形式。
  2. !this.parametersInDuplicateKeyClause,这指的是on duplicate key update 后不能有占位符。举个例子,insert into t (c1) values (?) on duplicate key update c1 = ?是不能重写的,insert into t (c1) values (?) on duplicate key update c1 = 123这种可以。(感觉这里JDBC实现的偷懒了)
    除了这两个条件必须为true,还有canRewrite()函数是判断条件,我们去看看里面的逻辑:
  3. 如果是insert语句,如果SQL中存在select,则不能被重写。
  4. on duplicate key update 语句不能使用 LAST_INSERT_ID。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码if (StringUtils.startsWithIgnoreCaseAndWs(sql, "INSERT", statementStartPos)) {
if (StringUtils.indexOfIgnoreCase(statementStartPos, sql, "SELECT", "\"'`", "\"'`", StringUtils.SEARCH_MODE__MRK_COM_WS) != -1) {
return false;
}
if (isOnDuplicateKeyUpdate) {
int updateClausePos = StringUtils.indexOfIgnoreCase(locationOfOnDuplicateKeyUpdate, sql, " UPDATE ");
if (updateClausePos != -1) {
return StringUtils.indexOfIgnoreCase(updateClausePos, sql, "LAST_INSERT_ID", "\"'`", "\"'`", StringUtils.SEARCH_MODE__MRK_COM_WS) == -1;
}
}
return true;
}

所以这里有一个比较特殊的case,如果SQL中包含select,则rewriteBatchedStatements失效。比如下面这个例子,表结构如下:

1
2
3
4
5
6
7
8
9
sql复制代码mysql> desc t1;
+------------+----------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+----------+------+------+---------+-------+
| id | int(11) | YES | | NULL | |
| selectNum | int(11) | YES | | NULL | |
| updateTime | datetime | YES | | NULL | |
+------------+----------+------+------+---------+-------+
3 rows in set (0.01 sec)

使用上面的方式进行批量插入,理想情况到数据库应该是
insert into t (id, selectNum, updateTime) values(1,11,'2021-01-01 11:11:11'),(1,11,'2021-01-01 11:11:11');插入两条数据。

我们debug一下看看数据库中收到的是什么

截屏2021-08-28 下午3.36.04.png

可见,数据库收到的还是单SQL,没有整合成一条语句。所以考虑到batch insert的情况,表结构尽量不要带有select字符。

而正常情况,应该会开始构建batch value结构,是在buildRewriteBatchedParams。

1
2
3
java复制代码if (this.canRewriteAsMultiValueInsert && session.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue()) {
buildRewriteBatchedParams(sql, session, encoding);
}

这里会解析value后的内容,比如insert into t (c1,c2) values (?,?)就是(?,?)。这样SQL和value的parseInfo都有了。

excuteBatch

我们直接看如果符合canRewriteAsMultiValueInsertAtSqlLevel是如何处理的,在executeBatchedInserts。
获取这个insert batch有几个需要做合并的,这里的batchedArgs是程序执行addBatch()时候赋值的。

1
java复制代码int numBatchedArgs = this.query.getBatchedArgs().size();

在prepareBatchedInsertSQL中的((PreparedQuery<?>) this.query).getParseInfo().getParseInfoForBatch(numBatches)对value进行拼接。

JDBC中实现了一个visitor,会将SQL和value的组成merge到一起:

截屏2021-09-05 下午5.31.55.png

  1. INSERT INTO t (c1,c2) VALUES (
  2. ,
  3. ),(
  4. ,
  5. )

结果就是拼出来的是insert into t (c1,c2) values(,),(,)

这里还需要填写占位符,所以程序会来到这里getSqlForBatch,这里用到的sqlStrings就是上面提到的visitor数组,可以看到,循环这个数组,在每个的中间补上?就变成insert into t (c1,c2) values(?,?),(?,?)了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码final byte[][] sqlStrings = this.staticSql;
final int sqlStringsLength = sqlStrings.length;

for (int i = 0; i < sqlStringsLength; i++) {
size += sqlStrings[i].length;
size++; // for the '?'
}

StringBuilder buf = new StringBuilder(size);

for (int i = 0; i < sqlStringsLength - 1; i++) {
buf.append(StringUtils.toString(sqlStrings[i], this.charEncoding));
buf.append("?");
}

buf.append(StringUtils.toString(sqlStrings[sqlStringsLength - 1]));

return buf.toString();

这样整个拼接流程就结束了。可以看到,有的时候虽然我们设置了rewriteBatchedStatements=true,但在某些特殊的语句格式下,拼接依然不能生效,在实现上,感觉JDBC实现的比较偷懒,比如不能有select,这个可能处于考虑到insert into ... select这种情况,但简单粗暴的禁止,也会给很多人带来困惑。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

7张图了解kafka基本概念

发表于 2021-09-05

kafka是apache基金会管理的开源流处理平台(官网kafka.apache.org/),但国内大多数人对其认知基本都是消息队列,所以我们先来了解下什么是消息队列。

消息队列

在这里插入图片描述

消息队列顾名思义就是存储消息的一个队列,消息生产者(producer) 往消息队列中投放消息, **消费者(consumer)**读取消息队列中的内容。在消息队列中的每条消息都会有个位置,就好比数组中的下标(index),在kafka中我们称之为offset。对于生产者而言,有个特殊的offset——LEO(log end offset) ,指向的是消息队列中下一个将被存放消息的位置。

这里重点说下消费者(consumer),一个消息队列当然可以被多个消费者(consumer)读取,每个消费者(consumer)都有唯一一个group-id将其区分开来。kafka也会记录下来每个消费者(consumer)已经读到哪个位置了(offset)。

问:为什么消费者消费的offset是由kafka记录,而不是由消费者自己记录?

主题(Topic)

在这里插入图片描述

上文我们以一份数据为例,讲了什么是消息队列。如果有多份数据(多个队列)该怎么办? 也很简单,kafka中我们可以使用不同的**主题(Topic)**将不同的数据区分开。不同的生产者(producer)可以往不同的Topic中存放数据,不同的消费者(consumer)也可以从不同的Topic中读取数据。

分区(Partation)

在这里插入图片描述

当一份数据非常大的情况下怎么办? 当然是考虑拆分了。在kafka中,你可以设置将一个主题(Topic)拆分成多个不同的分区(Partation),然后以分区(Partation)的维度来管理、生产和消费数据。 拆分带来最明显的好处就是提升吞吐性能,多个分区(Partation)之间并行,互不干扰。
至于怎么拆分,kafka有提供几个默认分区策略 轮询、随机、hash,当然你可以自己实现自己的分区策略,这里就不过多展开了。

消费者组(Consumer-group)

在这里插入图片描述
主题(Topic)做完分区以后,消费者如何消费? 这里就不得不提到消费者组(Consumer-group)的概念,在kafka中,为了保证数据的一致性,同一个分区(Partation)同时只能被一个消费者(consumer)实例消费,为了提升消费者(consumer)的吞吐量,一般都会设置多个消费者(consumer)实例来消费不同的分区(Partation),这些实例共同组成一个消费者组(Consumer-group) ,他们共用一个Group-id。

注意:

  1. 由于同一个分区(Partation)同时只能被一个消费者(consumer)实例消费,所以超过分区(Partation)数量的消费者(consumer)实例个数没有任何意义,多余的消费者(consumer)实例也会被闲置。
  2. 如果消费者组(Consumer-group) 中有实例发生变化(上下线),或者分区(Partation)数量发生变化,都会触发消费者组rebalence。

副本(Replication)

在这里插入图片描述

kafka如何解决数据高可用的问题?在分布式环境下,要想保证数据尽可能不丢失,唯一的方法就是多复制几份放到不同的机器上,复制出来的数据就叫做副本(Replication)。

这里有几个关键词。
HW: high-water,一个特殊的offset,只有在这个offset以下的消息才能被消费者(consumer)读到,高水位的具体值取决于主从副本数据同步的状态,这里不再展开。
ISR: in-sync-replica,处于同步状态的副本集合,是指副本数据和主副本数据相差在一定返回(时间范围或数量范围)之内的副本,当然主副本肯定是一直在ISR中的。 当主副本挂了之后,新的主副本将从ISR中被选出来接替它的工作。

OSR: 和IRS相对应 out-sync-replica,其实就是指那些不在ISR中的副本。

副本主从同步

在这里插入图片描述
当一份数据比复制出多份副本后,肯定得涉及到主从副本的同步在,从副本会定期从主副本拉去最新的数据。 另外需要注意kafka中,只有主副本才会对外提供读和写(高版本kafka从副本提供了有限的读功能),从副本的唯一作用就是给主副本当备胎。

说到主从同步,顺带提一下kafka的ack设置。
kafka中生产者(producer),可以通过request.required.acks参数来设置数据可靠性的级别:

  • 0: 生产者(producer) 不等待来自主副本的确认,发出去即认为发送成功,这种情况效率最高但可能有丢失数据的风险。
  • 1: (默认)生产者(producer) 发出数据后会等待主副本确认收到后,才认为消息发送成功,这种情况下主副本宕机时可能会丢失消息。
  • -1: (或者是all):生产者(producer) 等待ISR中的所有副本都确认接收到数据后才任务消息发送成功,可靠性最高,但因为需要等从副本拉去和确认,效率最低。

Broker

在这里插入图片描述

Kafka是以副本(Replica)维度管理数据的,管理这些数据肯定是需要管理者的,这个管理者就是Broker。Broker会将同一份数据的不同副本(Replication) 调度到不同的机器上,并且在副本(Replication) 数不足时生成新的副本,从而保证在部分Broker宕机后也能保证数据不丢失。

所有的Broker之间也会做一些元数据的相互同步,比如某份主数据在谁哪,从副本要从谁哪去拉取数据……

结语

第一次尝试手绘风讲解kafka入门知识,讲的很粗浅,确实很多细节都没有展开,见谅。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot跨域问题

发表于 2021-09-05

一、同源策略

同源策略是由Netscape提出的一个著名的安全策略,它是浏览器最核心也最基本的安全功能,现在所有支持JavaScript的浏览器都会使用这个策略。所谓同源是指协议、域名以及端口要相同。同源策略是基于安全方面的考虑提出来的,这个策略本身没问题,但是我们在实际开发中,由于各种原因又经常有跨域的需求,传统的跨域方案是JSONP,JSONP虽然能解决跨域但是有一个很大的局限性,那就是只支持GET请求,不支持其他类型的请求,而今天我们说的CORS(跨域源资源共享)(CORS,Cross-origin resource sharing)是一个W3C标准,它是一份浏览器技术的规范,提供了Web服务从不同网域传来沙盒脚本的方法,以避开浏览器的同源策略,这是JSONP模式的现代版。

在Spring框架中,对于CORS也提供了相应的解决方案。

非同源限制

  • 无法读取非同源网页的 Cookie、LocalStorage 和 IndexedDB
  • 无法接触非同源网页的 DOM
  • 无法向非同源地址发送 AJAX 请求

二、java后端实现CORS跨域请求的方式

  1. 返回新的CorsFilter
  2. 重写 WebMvcConfigurer
  3. 使用注解 @CrossOrigin
  4. 手动设置响应头 (HttpServletResponse)
  5. 自定web filter 实现跨域

注意:

  • CorFilter / WebMvConfigurer / @CrossOrigin 需要 SpringMVC 4.2以上版本才支持,对应springBoot 1.3版本以上
  • 上面前两种方式属于全局 CORS 配置,后两种属于局部 CORS配置。如果使用了局部跨域是会覆盖全局跨域的规则,所以可以通过 @CrossOrigin 注解来进行细粒度更高的跨域资源控制。
  • 其实无论哪种方案,最终目的都是修改响应头,向响应头中添加浏览器所要求的数据,进而实现跨域

1.返回新的 CorsFilter(全局跨域)

在任意配置类,返回一个 新的 CorsFIlter Bean ,并添加映射路径和具体的CORS配置路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Configuration
public class GlobalCorsConfig {
@Bean
public CorsFilter corsFilter() {
//1. 添加 CORS配置信息
CorsConfiguration config = new CorsConfiguration();
//放行哪些原始域
config.addAllowedOrigin("*");
//是否发送 Cookie
config.setAllowCredentials(true);
//放行哪些请求方式
config.addAllowedMethod("*");
//放行哪些原始请求头部信息
config.addAllowedHeader("*");
//暴露哪些头部信息
config.addExposedHeader("*");
//2. 添加映射路径
UrlBasedCorsConfigurationSource corsConfigurationSource = new UrlBasedCorsConfigurationSource();
corsConfigurationSource.registerCorsConfiguration("/**",config);
//3. 返回新的CorsFilter
return new CorsFilter(corsConfigurationSource);
}
}

2. 重写 WebMvcConfigurer(全局跨域)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
//是否发送Cookie
.allowCredentials(true)
//放行哪些原始域
.allowedOrigins("*")
.allowedMethods(new String[]{"GET", "POST", "PUT", "DELETE"})
.allowedHeaders("*")
.exposedHeaders("*");
}
}

3. 使用注解 (局部跨域)

在控制器(类上)上使用注解 @CrossOrigin:,表示该类的所有方法允许跨域。

1
2
3
4
5
6
7
8
java复制代码@RestController
@CrossOrigin(origins = "*")
public class HelloController {
@RequestMapping("/hello")
public String hello() {
return "hello world";
}
}

在方法上使用注解 @CrossOrigin:

1
2
3
4
5
6
java复制代码@RequestMapping("/hello")
@CrossOrigin(origins = "*")
//@CrossOrigin(value = "http://localhost:8081") //指定具体ip允许跨域
public String hello() {
return "hello world";
}

4. 手动设置响应头(局部跨域)

使用 HttpServletResponse 对象添加响应头(Access-Control-Allow-Origin)来授权原始域,这里 Origin的值也可以设置为 “*”,表示全部放行。

1
2
3
4
5
java复制代码@RequestMapping("/index")
public String index(HttpServletResponse response) {
response.addHeader("Access-Allow-Control-Origin","*");
return "index";
}

5. 使用自定义filter实现跨域

首先编写一个过滤器,可以起名字为MyCorsFilter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Component
public class MyCorsFilter implements Filter {
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException {
HttpServletResponse response = (HttpServletResponse) res;
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS, DELETE");
response.setHeader("Access-Control-Max-Age", "3600");
response.setHeader("Access-Control-Allow-Headers", "x-requested-with,content-type");
chain.doFilter(req, res);
}
public void init(FilterConfig filterConfig) {}
public void destroy() {}
}

在web.xml中配置这个过滤器,使其生效

1
2
3
4
5
6
7
8
9
10
xml复制代码<!-- 跨域访问 START-->
<filter>
<filter-name>CorsFilter</filter-name>
<filter-class>com.mesnac.aop.MyCorsFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>CorsFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- 跨域访问 END -->

三、例子

首先创建两个普通的SpringBoot项目,第一个命名为provider提供服务,第二个命名为consumer消费服务,第一个配置端口为8080,第二个配置配置为8081,然后在provider上提供两个hello接口,一个get,一个post,如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@RestController
public class Provider {
@GetMapping("/hello")
public String hello() {
return "hello";
}

@PostMapping("/hello")
public String hello2() {
return "post hello";
}
}

在consumer的resources/templates目录下创建一个html文件,发送一个简单的ajax请求,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
html复制代码<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>CORS</title>
</head>
<body>
<div id="app"></div>
<input type="button" onclick="btnClick()" value="get_button">
<input type="button" onclick="btnClick2()" value="post_button">
<script>
function btnClick() {
$.get('http://localhost:8080/hello', function (msg) {
$("#app").html(msg);
});
}

function btnClick2() {
$.post('http://localhost:8080/hello', function (msg) {
$("#app").html(msg);
});
}
</script>
</body>
</html>

然后分别启动两个项目,发送请求按钮,观察浏览器控制台如下:

Access to XMLHttpRequest at ‘http://localhost:8080/hello‘ from origin ‘http://localhost:8081‘ has been blocked by CORS policy: No ‘Access-Control-Allow-Origin’ header is present on the requested resource

可以看到,由于同源策略的限制,请求无法发送成功。

使用CORS可以在前端代码不做任何修改的情况下,实现跨域,那么接下来看看在provider中如何配置。首先可以通过@CrossOrigin注解配置某一个方法接受某一个域的请求,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@RestController
public class Provider {
@CrossOrigin(value = "http://localhost:8081")
@GetMapping("/hello")
public String hello() {
return "hello";
}

@CrossOrigin(value = "http://localhost:8081")
@PostMapping("/hello")
public String hello2() {
return "post hello";
}
}

这个注解表示这两个接口接受来自http://localhost:8081地址的请求,配置完成后,重启provider,再次发送请求,浏览器控制台就不会报错了,consumer也能拿到数据了。

provider上,每一个方法上都去加注解未免太麻烦了,在Spring Boot中,还可以通过全局配置一次性解决这个问题,全局配置只需要在配置类中重写addCorsMappings方法即可,如下:

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("http://localhost:8081")
.allowedMethods("*")
.allowedHeaders("*");
}
}

/**表示本应用的所有方法都会去处理跨域请求,allowedMethods表示允许通过的请求数,allowedHeaders则表示允许的请求头。经过这样的配置之后,就不必在每个方法上单独配置跨域了。

四、存在的问题

了解了整个CORS的工作过程之后,我们通过Ajax发送跨域请求,虽然用户体验提高了,但是也有潜在的威胁存在,常见的就是CSRF(Cross-site request forgery)跨站请求伪造。跨站请求伪造也被称为one-click attack 或者 session riding,通常缩写为CSRF或者XSRF,是一种挟制用户在当前已登录的Web应用程序上执行非本意的操作的攻击方法,举个例子:

假如一家银行用以运行转账操作的URL地址如下:http://icbc.com/aa?bb=cc,那么,一个恶意攻击者可以在另一个网站上放置如下代码:<img src="http://icbc.com/aa?bb=cc">,如果用户访问了恶意站点,而她之前刚访问过银行不久,登录信息尚未过期,那么她就会遭受损失。

基于此,浏览器在实际操作中,会对请求进行分类,分为简单请求,预先请求,带凭证的请求等,预先请求会首先发送一个options探测请求,和浏览器进行协商是否接受请求。默认情况下跨域请求是不需要凭证的,但是服务端可以配置要求客户端提供凭证,这样就可以有效避免csrf攻击。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…538539540…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%