开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

【Scala系列】Scala类型系统很强大?Scala都有啥

发表于 2021-11-22

Type System

Two Types of Variables

Scala 中有两种变量类型(非数据类型)

  • val 用来创建不可变类型的变量,类似于Java中final修饰符的作用
  • var 用来创建可变类型的变量

如下为Scala中变量声明的示例代码:

1
2
3
4
5
6
7
8
9
scala复制代码// immutable
val s = "hello"
s = "hello, world" // compile error: reassignment to val

val dog = new Dog("Tom")

// mutable
var i = 123
i = 456 // compile pass

The Type is Optional

上一个Section的示例表明Scala编译器足够智能,在编译期间可以通过 = 右侧的变量值推断出变量的数据类型.当然你也可以显式地声明变量的数据类型,示例代码如下:

1
2
scala复制代码val s: String = "hello"
var i: Int = 123

一般来说,对于Scala编译器在编译期间对变量数据类型的推断,是不需要参考变量显式声明的数据类型的,不显式声明数据类型是首选,因为显式的方式将让代码变得冗余. 但如果你觉得显式声明变量的数据类型会增加代码的可读性,你可以这样做.

Tip:

实际上,当我们使用第三方库中的方法时,显式声明变量的数据类型会给代码的可读性带来显著提升,尤其是非常见库或者其方法名称没有明确数据类型的场景. 当然你也可以使用注释来阐明数据类型, 但是代码的见名知意是更加推荐的做法,毫不夸张地说:好代码不需要注释

A few build-in Data Types

Scala 内置了很多基础数据类型供开发者使用,所有的这些数据类型都被封装成成熟的object,并非原始数据类型(int, float等)

如下为定义这些基础数据类型的示例:

1
2
3
4
scala复制代码val b: Byte = 1
val i: Int = 1
val l: Long = 1
val s: Short = 1

上述 Byte, Int, Long, Short四个数据类型均为 整数型 ,整数型默认的数据类型为 Int, 如果你想将整数型变量设置为其他整数型数据类型,你将需要使用类似如上的 显式声明数据类型,也即在变量名后边跟上 “: {数据类型}“.

小数型变量默认的数据类型为 Double, 同样地如果你想将小数型变量设置为其他小数型数据类型,也需要 显式声明数据类型, 如下为内置小数型数据类型示例:

1
2
3
scala复制代码val d = 2.0          // default "Double" type

val f: Float = 3.0 // set to `Float` type

当然了 Scala 也内置了 Boolean, String, Char 三种基本数据类型, 示例如下:

1
2
3
4
5
scala复制代码val b = true         // default "Boolean" type

val s = "hello" // default "String" type

val c = 'a' // default "Char" type

Tip

如下附上 Scala 官方对 9 种基本数据类型 值范围 的说明:

Data Type Possible Values
Boolean true or false
Byte 8-bit signed two’s complement integer (-2^7 to 2^7-1, inclusive)
Short 16-bit signed two’s complement integer (-2^15 to 2^15-1, inclusive)
Int 32-bit two’s complement integer (-2^31 to 2^31-1, inclusive)
Long 64-bit two’s complement integer (-2^63 to 2^63-1, inclusive)
Float 32-bit IEEE 754 single-precision float (1.40129846432481707e-45 to 3.40282346638528860e+38)
Double 64-bit IEEE 754 double-precision float (4.94065645841246544e-324d to 1.79769313486231570e+308d)
Char 16-bit unsigned Unicode character (0 to 2^16-1, inclusive)
String a sequence of Char

String Topic

Scala的字符串有很多不错的特性,接下来我们主要讨论几种常见的字符串拼接方法:

首先声明两个字符串变量

1
2
scala复制代码val firstName = "Shuai"
val lastName = "Li"

使用 + 操作符拼接多个字符串

1
scala复制代码val name = firstName + " " + lastName  // Shuai Li

使用 s 对字符串插值处理

1
scala复制代码val name = s"$firstName $lastName"  // Shuai Li

s 字符串插值也支持你将变量用 花括号 括起来

1
scala复制代码val name = s"${firstName} ${lastName}"  // Shuai Li

难道 花括号 就这?不止,花括号内支持是支持表达式的

1
scala复制代码val calculation_description = s"1 + 1 = ${1 + 1}"  // 1 + 1 = 2

对于一些特殊字符,使用 s 插值器时需要注意对这些字符进行 转义

1
scala复制代码println(s"The book is $$30.54")  // The book is $30.54

双引号也是需要进行 转义 的,如下包含正确和错误两个示例:

1
2
3
scala复制代码println(s"The focus of this book is on \"clean code\".")  // compile error

println("""The focus of this book is on "clean code".""") // The focus of this book is on "clean code".

也就是说,双引号 的转义需要通过 三重双引号 完成

使用 f 对字符串插值处理

f 字符串插值允许创建简单的格式化字符串,类似于其他语言的 printf。使用 f 插值时,所有变量引用都应该在变量尾部追加一个格式化符号(如 %d),请看如下示例:

1
2
scala复制代码val money = 7.856
println(f"$name%s has $money%2.2f yuan left") // Shuai Li has 7.86 yuan left

多行字符串处理

在 Scala 中我们可以使用三个双引号包裹字符串来达到创建多行字符串的目的, 示例代码如下:

1
2
3
4
scala复制代码val introduction =
"""My name
is Shuai Li..."""
println(introduction)

我们可以得到如下的输出结果:

1
2
scala复制代码My name
is Shuai Li...

但是如大家所见,这种方法有一个明显的缺陷就是: 除了第一行外的所有行都是带缩进效果的,为了解决这个问题我们可以将 | 符号加到首行外所有行的行首,并在字符串后调用 stripMargin(去除边缘空白) 方法, 代码示例如下:

1
2
3
4
scala复制代码val introduction =
"""My name
|is Shuai Li...""".stripMargin
println(introduction)

我们可以得到如下的输出结果:

1
2
scala复制代码My name
is Shuai Li...

BigInt and BigDecimal Topic

针对大数处理场景,Scala 提供了两种数据类型:

  • BigInt 大整数
  • BigDecimal 大十进制数(小数)

不同于其在Java中对应的类,Scala 中这两种大数数据类型支持所有你习惯使用的数字类型的操作符,一个字: 妙, 让我们看几个REPL[1]环境下的示例:

1
2
3
4
5
6
7
8
9
10
11
shell复制代码scala> val bi = BigInt(100000001)
val bi: scala.math.BigInt = 100000001

scala> bi + bi
val res2: scala.math.BigInt = 200000002

scala> bi - bi
val res3: scala.math.BigInt = 0

scala> bi * bi
val res4: scala.math.BigInt = 10000000200000001
1
2
3
4
5
6
7
8
shell复制代码scala> val bd = BigDecimal(10000.123)
val bd: scala.math.BigDecimal = 10000.123

scala> bd + bd
val res0: scala.math.BigDecimal = 20000.246

scala> bd * bd
val res1: scala.math.BigDecimal = 100002460.01512

Referrences

  • [1] Scala REPL, docs.scala-lang.org/overviews/s…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java垃圾回收,首先要知道怎么判断对象是否存活

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战

  1. 引言

Java 的垃圾回收主要是针对 JVM 中的栈区、堆区、常量区进行的一系列的操作,回收(格式化)掉已经不会被使用到的对象占用的内存,合理化的利用内存资源。

  1. 对象已死?

想要进行垃圾回收,首先要做的,就是如何判断对象是否已经不再被引用。目前有两种常见的算法。

  • 引用计数算法
  • 可达性分析算法

引用计数算法比较简单。其实就是给对象中添加一个引用计数器,每当有一个地方引用它时,计数器值就加1;当引用失效时,计数器值就减1。如果在某个时间段计数值为 0 ,那说明该对象已经不再被引用,可以被回收。

但是引用计数算法有一个非常致命的问题,就是它很难解决双向引用(循环引用)的问题。比如,

1
2
3
4
5
6
7
8
9
ini复制代码
ObjA objA = new ObjA();
ObjB objB = new ObjB();

objA.next = objB;
objB.next = objA;

objA = null;
objB = null;

这种情况下,objA 和 objB 对象的引用计数都不是 0,但是已经没办法使用 objA 和 objB 的对象了。因为出现循环引用,导致引用计数不为 0 ,无法被 GC。

由于引用计数算法存在这样的问题,目前主流的 JVM 均没有采用这种方式,而是采用的是可达性分析算法。

可达性分析算法的核心思想是,通过一系列称之为“GC Roots”的对象作为起始节点,通过这些节点搜索所有可达的节点。当发现某些对象不可达,即说明此对象不可用。可以被回收。

在 Java 中可以作为 GC Roots 的对象包括:

  • 虚拟机栈中引用的对象
  • 方法区中类静态属性引用的对象
  • 方法区中常量引用的对象
  1. finalize() 方法

finalize() 方法是在对象被回收之前调用的方法。但是 finalize() 方法的调用具有不确定性。

当进行完可达性分析之后,某个对象被标记为不可达时,会判断当前对象是否重写了 finalize() 方法,是否已经执行过对象的 finalize() 方法,如果没有覆盖,或者已经执行过,则不再执行,对象将被回收。

如果有必要执行 finalize() 方法,则会将这个对象放到一个 F-Queue 的低优先级队列里等待执行。之后 GC 将对 F-Queue 中的对象再次进行可达性分析。

所以,如果对象可以在 finalize() 方法中再次复活,即将自己的 this 指针,重新赋值到某个可达的对象的引用上。

下面的代码演示了如何在 finalize() 方法中复活对象。(PS:虽然可以这样做,但是一直没有遇到过这样的场景)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class MyGC {
private static GCTest gcTest = null;

private static class GCTest{
public String name;

public GCTest(String name) {
this.name = name;
}

@Override
protected void finalize() throws Throwable {
super.finalize();
System.out.println("finalize 被执行");
//将自身的引用赋值到其他可达 GC Roots 上
gcTest = this;
}
}
public static void main(String[] args) {
gcTest = new GCTest("myGc");
// GCTest 对象不可达
gcTest = null;
System.gc();

//等待 5s
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//对象可达
System.out.println(gcTest.name);
}
}

我们定义了一个 MyGC 类,这个类有一个静态属性,类型是一个内部类(GCTest),GCTest 重写了 finalize() 方法,在 finalize() 方法中进行了自救,将自身的引用赋值到了 外部的属性上。所以在 main 方法中,即便是我们显示的将 gcTest 对象置为 null 但是还是可以引用得到。

关于对象的引用,在 Java 1.2 之后,对象的引用分为四大类:

  • 强引用(Strong Reference)

类似”Object obj=new Object()”这类的引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象。

  • 软引用(Soft Reference)

对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围之中进行第二次回收

  • 弱引用(Weak Reference)

被弱引用关联的对象只能生存到下一次垃圾收集发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉只被弱引用关联的对象

  • 虚引用(Phantom Reference)

它是最弱的一种关系。一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

从JDK中学习设计模式——迭代器模式

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战

概述

迭代器模式(Iterator Pattern)提供一种方法来遍历容器对象中的各个元素,而不用暴露该对象的内部细节,是一种对象行为型模式。

结构

迭代器模式UML.png

  • Aggregate(抽象聚合类):用于存储和管理元素对象,一般是抽象类或接口。
  • ConcreteAggregate(具体聚合类):实现创建迭代器的方法。
  • Iterator(抽象迭代器):定义了访问和遍历元素的接口,一般是抽象类或接口。
  • ConcreteIterator(具体迭代器):它实现了抽象迭代器接口,完成对聚合对象的遍历,同时在具体迭代器中通过游标来记录在聚合对象中所处的当前位置。

优点

  1. 提供了统一的方法来遍历聚合对象,客户端遍历集合时,只需取到迭代器,而不必知道聚合的具体组成,符合迪米特法则。
  2. 迭代器面向接口编程,可以以不同的方式遍历聚合对象,客户不需要考虑聚合的类型,符合开闭原则。
  3. 把对象的管理与遍历分开,集合改变只影响聚合对象,遍历方式改变只影响迭代器,符合单一职责原则。

缺点

  1. 每个聚合对象都需要一个迭代器,会造成迭代器过多,不便于管理和维护。
  2. 由于迭代器模式将存储数据和遍历数据的职责分离,增加新的聚合类需要对应增加新的迭代器类,类的个数成对增加,这在一定程度上增加了系统的复杂性。
  3. 抽象迭代器的设计难度较大,需要充分考虑到系统将来的扩展,不利于扩展。

应用场景

  1. 需要为一个聚合对象提供多种遍历方式。
  2. 需要为遍历不同的聚合结构提供一个统一的接口。
  3. 需要访问一个聚合对象的内容,而又不想暴露它的内部。

JDK 中的应用

在 JDK 中 java.util.Iterator 和 java.util.Enumeration 都使用了迭代器模式。

Iterator.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java实现GIF图转字符动图实例demo

发表于 2021-11-22

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」。

上一篇文章介绍了静态图转字符的实现demo;接下来也该是动态图转字符的demo了

从前面几篇文章的学习过程中,要想实现这个功能就属于信手拈来了

单张图转字符完成之后,动图无非是每一张静态图都转一遍,保存最后的结果即可

这里我们就不介绍基础的JDK写法了(感兴趣的可以到前面几篇文章中获取),我们直接进入进阶的玩法

接下来我们借助开源项目 github.com/liuyueyi/qu… 来迅速的实现输出Gif字符图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@Test
public void testCharLines() {
String file = "https://c-ssl.duitang.com/uploads/item/201707/11/20170711194634_nTiK5.thumb.1000_0.gif";
java.util.List<java.util.List<String>> list = ImgPixelWrapper.build()
.setSourceImg(file)
.setBlockSize(3)
.setRate(0.6)
.setPixelType(PixelStyleEnum.CHAR_COLOR)
.build()
.asChars();
for (List<String> s: list) {
for (String t: s) {
System.out.println(t);
}

System.out.println("------- 分割 -------");
}
}

注意上面的实现,List<String> 表示一个张字符图,一个gif图可以转换成多个

具体的输出字符太多,这里简单截取几个看一看效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
ruby复制代码$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$d?{/||_z$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$jJ$#h@$$p/$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$dc%[ `}k$*\M$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$}@t )WW_d$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$}@^ "L@nf$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$(%. )%Z($$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$${$] <&p($$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$cZ&l i8Lc$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$M+m$%/' [$}M$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$${8ouL@a)' 0af$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$dU#^ IC@< !$-$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$Qhc ` bLd$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$Qoj xM{$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$QwZ _$t$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$1@! ;$+$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$fo*, `$?$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$+$8W\` '$\d$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$dCp"YB) ,$@)d$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$Qw0 ^ 'zX$)d$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$QZO "L@}d$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$dco. \]ZB{M$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$?@( l]]d&}$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$dz%_ +]]{Wpj$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$1QB\ :]]]]/@jM$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$ju$w> '-]]]]]Xkf$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$Q{b$bc{}rm- '_]]]]]?!(M$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$f{YbW%$w! "_]]]]]]lQ$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$Mf{>$\\:~]]]]]]]i{$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$Qqh]]]]]]]]-:Q$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$_$r]]]]]-l+d$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
------- 分割 -------
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$d({/{)n__nd$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$fY$#b%$@$$Bc)$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$MuB} '[d$qvBJQ$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$n%j [*&Q$]$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$?$, ^c@$Jd$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$[$' _WW/$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$?${ ,a#\$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$QL8> \*qf$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$(z$%f' i@/M$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$zq8Q0B#t` v&)$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$[@< :X%- ,B)$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$MCp ` 0wc$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$Q0m {B)$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$r#. I$}$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$($) .B]$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$dY8~ #($$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$zk$Bx\ #nQ$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$}$!r8m W$/Q$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$]$ `" ~r$tQ$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$-$` `U$)d$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$)$_ "]L@-M$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$Qdq. :]]p8{$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$[@c. +]]}#dj$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$d{@Q" \]]]]\@rM$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$d-WW\` '_]]]]]ckf$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$M]XB&0f1xZ0 '<]]]]]?l(M$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$Q}/maM$p} ^_]]]]]]!Q$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$Q{~#z,;<?]]]]]]i{$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$dzW[]]]]]]]-;c$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$1BU]]]]]-l+d$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

00.gif

一灰灰的联系方式

尽信书则不如无书,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

  • 个人站点:blog.hhui.top
  • 微博地址: 小灰灰Blog
  • QQ: 一灰灰/3302797840
  • 微信公众号:一灰灰blog

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang 中 defer Close() 的潜在风险

发表于 2021-11-22

作为一名 Gopher,我们很容易形成一个编程惯例:每当有一个实现了 io.Closer 接口的对象 x 时,在得到对象并检查错误之后,会立即使用 defer x.Close() 以保证函数返回时 x 对象的关闭 。以下给出两个惯用写法例子。

  • HTTP 请求
1
2
3
4
5
6
go复制代码resp, err := http.Get("https://golang.google.cn/")
if err != nil {
return err
}
defer resp.Body.Close()
// The following code: handle resp
  • 访问文件
1
2
3
4
5
6
go复制代码f, err := os.Open("/home/golangshare/gopher.txt")
if err != nil {
return err
}
defer f.Close()
// The following code: handle f

存在问题

实际上,这种写法是存在潜在问题的。defer x.Close() 会忽略它的返回值,但在执行 x.Close() 时,我们并不能保证 x 一定能正常关闭,万一它返回错误应该怎么办?这种写法,会让程序有可能出现非常难以排查的错误。

那么,Close() 方法会返回什么错误呢?在 POSIX 操作系统中,例如 Linux 或者 maxOS,关闭文件的 Close() 函数最终是调用了系统方法 close(),我们可以通过 man close 手册,查看 close() 可能会返回什么错误

1
2
3
4
5
6
7
8
9
shell复制代码ERRORS
The close() system call will fail if:

[EBADF] fildes is not a valid, active file descriptor.

[EINTR] Its execution was interrupted by a signal.

[EIO] A previously-uncommitted write(2) encountered an
input/output error.

错误 EBADF 表示无效文件描述符 fd,与本文中的情况无关;EINTR 是指的 Unix 信号打断;那么本文中可能存在的错误是 EIO。

EIO 的错误是指未提交读,这是什么错误呢?

水印图片.png

EIO 错误是指文件的 write() 的读还未提交时就调用了 close() 方法。

上图是一个经典的计算机存储器层级结构,在这个层次结构中,从上至下,设备的访问速度越来越慢,容量越来越大。存储器层级结构的主要思想是上一层的存储器作为低一层存储器的高速缓存。

CPU 访问寄存器会非常之快,相比之下,访问 RAM 就会很慢,而访问磁盘或者网络,那意味着就是蹉跎光阴。如果每个 write() 调用都将数据同步地提交到磁盘,那么系统的整体性能将会极度降低,而我们的计算机是不会这样工作的。当我们调用 write() 时,数据并没有立即被写到目标载体上,计算机存储器每层载体都在缓存数据,在合适的时机下,将数据刷到下一层载体,这将写入调用的同步、缓慢、阻塞的同步转为了快速、异步的过程。

这样看来,EIO 错误的确是我们需要提防的错误。这意味着如果我们尝试将数据保存到磁盘,在 defer x.Close() 执行时,操作系统还并未将数据刷到磁盘,这时我们应该获取到该错误提示(只要数据还未落盘,那数据就没有持久化成功,它就是有可能丢失的,例如出现停电事故,这部分数据就永久消失了,且我们会毫不知情)。但是按照上文的惯例写法,我们程序得到的是 nil 错误。

解决方案

我们针对关闭文件的情况,来探讨几种可行性改造方案

  • 第一种方案,那就是不使用 defer
1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码func solution01() error {
f, err := os.Create("/home/golangshare/gopher.txt")
if err != nil {
return err
}

if _, err = io.WriteString(f, "hello gopher"); err != nil {
f.Close()
return err
}

return f.Close()
}

这种写法就需要我们在 io.WriteString 执行失败时,明确调用 f.Close() 进行关闭。但是这种方案,需要在每个发生错误的地方都要加上关闭语句 f.Close(),如果对 f 的写操作 case 较多,容易存在遗漏关闭文件的风险。

  • 第二种方案是,通过命名返回值 err 和闭包来处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码func solution02() (err error) {
f, err := os.Create("/home/golangshare/gopher.txt")
if err != nil {
return
}

defer func() {
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

_, err = io.WriteString(f, "hello gopher")
return
}

这种方案解决了方案一中忘记关闭文件的风险,如果有更多 if err !=nil 的条件分支,这种模式可以有效降低代码行数。

  • 第三种方案是,在函数最后 return 语句之前,显示调用一次 f.Close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码func solution03() error {
f, err := os.Create("/home/golangshare/gopher.txt")
if err != nil {
return err
}
defer f.Close()

if _, err := io.WriteString(f, "hello gopher"); err != nil {
return err
}

if err := f.Close(); err != nil {
return err
}
return nil
}

这种解决方案能在 io.WriteString 发生错误时,由于 defer f.Close() 的存在能得到 close 调用。也能在 io.WriteString 未发生错误,但缓存未刷新到磁盘时,得到 err := f.Close() 的错误,而且由于 defer f.Close() 并不会返回错误,所以并不担心两次 Close() 调用会将错误覆盖。

  • 最后一种方案是,函数 return 时执行 f.Sync()
1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码func solution04() error {
f, err := os.Create("/home/golangshare/gopher.txt")
if err != nil {
return err
}
defer f.Close()

if _, err = io.WriteString(f, "hello world"); err != nil {
return err
}

return f.Sync()
}

由于调用 close() 是最后一次获取操作系统返回错误的机会,但是在我们关闭文件时,缓存不一定被会刷到磁盘上。那么,我们可以调用 f.Sync() (其内部调用系统函数 fsync )强制性让内核将缓存持久到磁盘上去。

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码// Sync commits the current contents of the file to stable storage.
// Typically, this means flushing the file system's in-memory copy
// of recently written data to disk.
func (f *File) Sync() error {
if err := f.checkValid("sync"); err != nil {
return err
}
if e := f.pfd.Fsync(); e != nil {
return f.wrapErr("sync", e)
}
return nil
}

由于 fsync 的调用,这种模式能很好地避免 close 出现的 EIO。可以预见的是,由于强制性刷盘,这种方案虽然能很好地保证数据安全性,但是在执行效率上却会大打折扣。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring线程池ThreadPoolTaskExecuto

发表于 2021-11-22

1 线程池简介

1.1 为什么使用线程池

  • 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
  • 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
  • 方便线程并发数的管控,因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场)
  • 提供更强大的功能,延时定时线程池

1.2 线程池为什么需要使用队列

因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。

创建线程池的消耗较高或者线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲

1.3 线程池为什么要使用阻塞队列而不使用非阻塞队列

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。

使得在线程不至于一直占用cpu资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢

1.4 如何配置线程池

  • CPU密集型任务

尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换

  • IO密集型任务

可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间

  • 混合型任务

可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效

因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。

因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失

1.5 execute()和submit()方法

  1. execute(),执行一个任务,没有返回值
  2. submit(),提交一个线程任务,有返回值

submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用

submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。

submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null

Future.get()方法会使取结果的线程进入阻塞状态,直到线程执行完成之后,唤醒取结果的线程,然后返回结果

1.6 Spring线程池

Spring 通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor实现一个基于线程池的TaskExecutor,

还得需要使用@EnableAsync开启异步,并通过在需要的异步方法那里使用注解@Async声明是一个异步任务

Spring 已经实现的异常线程池:

  • SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
  • SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
  • ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  • SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
  • ThreadPoolTaskExecutor:最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装

1.7 @Async调用中的事务处理机制

在@Async标注的方法,同时也使用@Transactional进行标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。

那该如何给这些操作添加事务管理呢?

可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional

示例:

  • 方法A, 使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
  • 方法B, 使用了@Async来标注,B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的

2 示例

2.1 线程池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
kotlin复制代码package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync //开启异步操作
public class TaskExecutorConfig implements AsyncConfigurer {

/**
* 通过getAsyncExecutor方法配置ThreadPoolTaskExecutor,获得一个基于线程池TaskExecutor
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心线程数
pool.setMaxPoolSize(10);//最大线程数
pool.setQueueCapacity(25);//线程队列
pool.initialize();//线程初始化
return pool;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}

配置类中方法说明:

Spring 中的ThreadPoolExecutor是借助JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。其中一些值的含义如下:

  • int corePoolSize:线程池维护线程的最小数量
  • int maximumPoolSize:线程池维护线程的最大数量,线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。
  • long keepAliveTime:空闲线程的存活时间TimeUnit
  • unit:时间单位,现由纳秒,微秒,毫秒,秒
  • BlockingQueue workQueue:持有等待执行的任务队列
  • RejectedExecutionHandler handler 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。

当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。

Reject策略预定义有四种:

  1. ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
  3. ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
  4. ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如

果再次失败,则重复此过程)

2.2 异步方法

@Async注解可以用在方法上,表示该方法是个异步方法,也可以用在类上,那么表示此类的所有方法都是异步方法

异步方法会自动注入使用ThreadPoolTaskExecutor作为TaskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码package cn.jzh.thread;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncTaskService {
/**
*
* @param i
*/
@Async
public void executeAsync(Integer i) throws Exception{
System.out.println("线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:" + i);
}

@Async
public Future<String> executeAsyncPlus(Integer i) throws Exception {
System.out.println("线程ID:" + Thread.currentThread().getId() +"线程名字:" +Thread.currentThread().getName()+ "执行异步有返回的任务:" + i);
return new AsyncResult<>("success:"+i);
}

}

2.3 启动测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码package cn.jzh.thread;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.Future;

public class MainApp {
public static void main(String[] args) throws Exception{
System.out.println("主线程id:" + Thread.currentThread().getId() + "开始执行调用任务...");
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
AsyncTaskService service = context.getBean(AsyncTaskService.class);
for (int i = 0;i<10;i++){
service.executeAsync(i);
Future<String> result = service.executeAsyncPlus(i);
System.out.println("异步程序执行结束,获取子线程返回内容(会阻塞当前main线程)" + result.get());
}
context.close();

System.out.println("主线程id:" + Thread.currentThread().getId() + "程序结束!!");
}
}

注意:

  1. 是否影响主线程

如果main主线程不去获取子线程的结果(Future.get()),那么主线程完全可以不阻塞。那么,此时,主线程和子线程完全异步。此功能,可以做成类似MQ消息中间件之类的,消息异步进行发送
2. 判断是否执行完毕

当返回的数据类型为Future类型,其为一个接口。具体的结果类型为AsyncResult,这个是需要注意的地方。

调用返回结果的异步方法,判断是否执行完毕时需要使用future.isDone()来判断是否执行完毕

public void testAsyncAnnotationForMethodsWithReturnType()

throws InterruptedException, ExecutionException {

System.out.println(“Invoking an asynchronous method. “ + Thread.currentThread().getName());

Future future = asyncAnnotationExample.asyncMethodWithReturnType();

1
2
3
4
5
6
7
8
kotlin复制代码while (true) {  ///这里使用了循环判断,等待获取结果信息  
if (future.isDone()) { //判断是否执行完毕
System.out.println("Result from asynchronous process - " + future.get());
break;
}
System.out.println("Continue doing something else. ");
Thread.sleep(1000);
}

}

这些获取异步方法的结果信息,是通过不停的检查Future的状态来获取当前的异步方法是否执行完毕来实现的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python matplotlibpyplot 绘制动态图

发表于 2021-11-22

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

复习回顾

在matplotlib模块中使用最多的类是pyplot。pyplot位于matplotlib模块中脚本层为入门用户提供快速绘制折线、柱状、散点、饼图等图形方法,我们前期也学习往期的文章如下

  • matplotlib 底层结构:对matplotlib模块底三层功能说明
  • matplotlib 绘制折线图:pyplot.plot()相关属性进行汇总说明
  • matplotlib 绘制柱状图:pyplot.bar()相关属性进行汇总说明
  • matplotlib Animation类绘制动画:animation类提供快速绘制动态图的类

在matplotlib模块为用户提供快速绘制动态图,除了专门绘制的动态类animation类外,pyplot也提供绘制动态图相关方法。

本期,我们将学习使用pyplot的绘制动态图相关方法,Let’s gp~

  1. pyplot绘图显示模式

我们前面学习绘制的方法后,最后都会调用pyplot.show()方法,运行程序后才会显示图像。

在matplotlib模式中绘制图形通常有两种模式:

  • block:阻塞模式

+ 阻塞模式打开一个fig窗口后不会显示新fig窗口
+ pyplot提供的绘制图形方法都是直接出图像,需要调用pyplot.show()才能显示图像
+ 在Python脚本中,matplotlib默认是阻塞模式
+ pyplot.show()在阻塞情况,为True
  • interactive:交互模式

+ 新创建的图像会立即显示
+ 图表上的数据会根据变化而自动重新绘制
+ 当使用交互模式下,需要确保事件循环正在运行具有响应数字
+ 在python 命令行中,matplotlib 默认为交互模式
+ pyplot.show 在交互模式下为False

image.png

  • pyplot提供绘制动图的方法

方法 作用
pyplot.ion() 打开交互模式
pyplot.ioff() 关闭交互模式
pyplot.clf() 清除当前的画布figure对象
pyplot.cla() 清除当前Axes对象
pyplot.pause() 暂停功能
  1. pyplot绘制动图步骤

matplotlib模块中,我们可以使用交互模式与GUI主循环组合绘制动态图形,主要有以下步骤:

  • 导入绘制图形的matplotlib.pyplot模块
1
python复制代码import matplotlib.pyplot as plt
  • 调用pyplot.ion()方法打开交互模式
1
python复制代码plt.ion()
  • 使用numpy.linspace()、numpy.arange()等方法准备x,y轴数据
1
2
python复制代码x = np.linspace(0,np.pi*i+1,1000)
y = np.cos(x)
  • 调用pyplot.xlim(),pyplot.ylim()方法设置x,y轴坐标范围
1
2
python复制代码plt.xlim(-0.2,20.4)
plt.ylim(-1.2,1.2)
  • 调用pyplot提供绘制折线、柱状图等方法
1
ini复制代码plt.plot(x,y,color="pink")
  • 使用for循环包含上述创建的x,y数据、x,y轴坐标、绘制图形方法
  • 使用for循环开始之前调用pyplot.cla()方法清空前一次Axes对象
1
python复制代码plt.cla()
  • 在for循环结束末尾调用pyplot.pause()暂停gonn
1
python复制代码plt.pause(0.1)
  • 当for循环结束后,需要调用pyplot.ioff()关闭交互模式已经pyplot.show显示图像
1
2
3
python复制代码plt.ioff()

plt.show()

2021-11-23 10-32-41.gif

  1. 小试牛刀

我们学习以pyplot方法绘制动态图方法,哪我们来实操一下吧

  • 调用numpy.arange()准备x,y轴数据
  • 调用pyplot.scatter()绘制散点图
  • 使用for循环包含以上步骤,在for循环开始调用pyplot.ion()打开交互模式
  • for循环结束后,调用pyplot.ioff()关闭交互模式
  • 最后调用pyplot.show()展示图像画面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scss复制代码def scatter_plot():
# 打开交互模式
plt.ion()
for index in range(50):
# plt.cla()

plt.title("动态散点图")
plt.grid(True)

point_count = 5
x_index = np.random.random(point_count)
y_index = np.random.random(point_count)

color_list = np.random.random(point_count)
scale_list = np.random.random(point_count) * 100

plt.scatter(x_index, y_index, s=scale_list, c=color_list, marker="^")

plt.pause(0.2)

plt.ioff()

plt.show()

image.png

2021-11-23 10-31-36.gif

总结

本期,我们学习matplotlib模块pyplot绘制动态图形的方法,在绘制图形中我们要使用for循环来生成每一次的数据,最后使用pyplot.show()展示图形时,需要先关闭交互模式,否则图形会一闪而过,发生闪退现象。

以上是本期内容,欢迎大佬们点赞评论,下期见~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringMVC中的传值

发表于 2021-11-22

C得到数据之后,跳转到V,并向V传递数据,进而V中可以渲染数据,让用户看到含有数据的也页面

转发跳转:Request作用域

重定向跳转:Session作用域

Request和Session

形参中即可获得作用域

image.png

JSP中取值

image.png

Model类

SpringMVC提供了Model对象用于数据共享和页面跳转。Model对象可以直接通过方法参数的形式获取。

–数据共享:Model对象的方法addAttribute(attrName, attrValue)。

–页面跳转:方法的返回值。

注事事项:Model方式,默认页面跳转方式也是请求转发,但是可以设置为重定向。

image.png

ModelAndView类

ModelAndView对象 可以集中管理 跳转和数据

–数据共享:ModelAndView对象的addObject(attrName, attrValue)方法。

–页面跳转:ModelAndView对象的setViewName(uri)方法。

注意事项:ModelAndView方式,默认页面跳转方式是请求转发。
对象中既包含传递的值也包含跳转的位置

image.png

当访问/test4时就会跳转到hello.jsp中,并传递数据

image.png

/hello.jsp

image.png

image.png

@SessionAttributes

默认情况下Spring MVC将模型中的数据存储到request域中。

当一个请求结束后,数据就失效了。如果要跨页面使用。那么需要使用到session。

而@SessionAttributes注解就可以使得模型中的数据存储一份到session域中。

@SessionAttributes({“gender”,”name”}) Model中的name和gender会存入session中
SessionStatus 移除session

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

并发编程(四)

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战

Semaphore

信号量, 提供了资源数量的并发控制, 存在公平锁和非公平锁。Semaphore是共享锁,当初始资源为1时,退化成排它锁, 内部有Sync,FairSync,NonfairSync

  • acquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
  • release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

CountDownLatch

基于AQS, 没有公平和非公平的区别

  • await
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
  • countDown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

CyclicBarrier

基于ReetrantLock和Condition实现

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private int count;
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
  • await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码 public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

CyclicBarrier可以重用的,会响应中断, 如果没有达到parties, 此时收到中断信号,阻塞的线程也会被唤醒,ount被设置成初始值。

Exchanger

用于线程之间交换数据, 使用CAS和park/unpark,内部有两个内部类,Participant和Node

  • Node
1
2
3
4
5
6
7
8
9
10
java复制代码//伪共享和缓存行填充
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
  • Participant
1
2
3
java复制代码   static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}

每个对象调用exchange方法交换数据,先创建Node对象。这个对象是对线程的包装, 里面有线程要交换的数据,对方线程交换来的数据,线程本身。一个Node只能交换一个线程的数据,并行的交换数据用Node数组。

  • exchange
1
2
3
4
5
6
7
8
9
10
java复制代码 public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}

单个交换使用slotExchange, 多个交换使用arenaExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;

for (Node q;;) {
//此时说明有其他线程在交换数据
if ((q = slot) != null) {
//交换数据
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}

// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
//当前线程阻塞
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
java复制代码private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}

Phaser

可以替代CyclicBarrier和CountDownLatch

对应CountDownLatch的是awaitAdvance和arrive

对应CyclicBarrier的是awaitAdvance和arriveAndAwaitAdvance

动态调整线程个数

运行期间动态调整线程个数

1
2
3
java复制代码register()
bulkRegister(int parties)
arriveAndDeregister()

层次

多个Phaser组成树状结构

1
java复制代码private final Phaser parent;

在Phaser内部记录自己的父节点,没有记录子节点

State变量

phaser没有基于AQS

64位的State变量分为四部分: 最高位0表示未同步完成,1表示同步完成, 初始为0。接下来的31位表示轮数, 接下来的16位标识总线程数,最后的16位标识未到达线程数。

  • 初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

当parties=0时, state被赋予1,当不等于0时,把phase左移32位, 把parties左移16位, parties作为最低的16位,三个值做或操作赋值给state

阻塞使用的是Treiber stack的数据结构, Treiber Stack是一个无锁的栈,单向链表, 出栈入栈都在链表头部,只要有一个head指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;

QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}

public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}

public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
1
2
java复制代码private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

使用两个链表,减少并发冲突,当phase为奇数, 阻塞线程在oddQ,否则在evenQ里。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
  • arrive
1
2
3
java复制代码public int arrive() {
return doArrive(ONE_ARRIVAL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
//未到达线程数
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
  • awaitAdvance
1
2
3
4
5
6
7
8
9
10
java复制代码public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
  • awaitAdvanceInterruptibly
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
QNode node = new QNode(this, phase, true, false, 0L);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
}
return p;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}

if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}

这里调用了ForkJoinPool.ManagedBlock方法,目的是把node对应的线程阻塞

Atomic

AtomicInteger/AtomicLong

使用CAS达到原子的效果

1
2
3
4
5
6
7
8
9
10
11
java复制代码public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

AtomicBoolean/AtomicReference

让比较和设置值是原子操作

1
2
3
4
5
java复制代码public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

在Unsafe只提供了三种类型的CAS操作: int,long,Object

1
2
3
4
5
java复制代码public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

AtomicStampedReference/AtomicMarkableReference

ABA 问题的解决

通过版本号

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

AtomicMarkableReference和AtomicStampedReference类似,只是AtomicMarkableReference的Pair是Boolean类型, AtomicStampedReference是整型的累加

AtomicIntegerFiledUpdater

类似的有AtomicLongFiledUpdater, AtomicReferenceFiledUpdater.

应用场景: 无法修改类,实现对成员变量的原子操作

AtomicIntegerFiledUpdater是抽象类,通过newUpdater获取实例。表示类的某个成员,而不是对象的某个成员。

1
2
3
java复制代码 public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,   String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>(tclass, fieldName, Reflection.getCallerClass());
}
1
2
3
4
5
6
7
8
java复制代码public int getAndIncrement(T obj) {
int prev, next;
do {
prev = get(obj);
next = prev + 1;
} while (!compareAndSet(obj, prev, next));
return prev;
}

限制条件: 成员变量必须是volatile的int类型(不能是Integer包装类)

AtomicIntegerArray

AtomicLongArray, AtomicReferenceArray

使用和原理与AtomicIntegerFiledUpdater一样

LongAdder

JDK8提供了LongAdder, LongAccumulator, DoubleAdder和DoubleAccumulator,都继承了Striped64

原理

把一个Long型分成base和多个cell, 获取值的时候对base和多个cell求sum

1
2
3
4
5
6
7
8
9
10
11
java复制代码public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

最终一致性

类似ConcurrentHashMap, 只有最终一致性,没有强一致性。

  • increment/decrement
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public void increment() {
add(1L);
}
public void decrement() {
add(-1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

LongAccumulator可以自定义操作符。

1
2
3
4
java复制代码@FunctionalInterface
public interface LongBinaryOperator {
long applyAsLong(long left, long right);
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

CAS 并发编程思想

发表于 2021-11-22

CAS 简介

CAS 是什么,它的英文全称是 Compare-And-Swap,中文叫做“比较并交换”,它是一种思想、一种算法。

在多线程的情况下,各个代码的执行顺序是不能确定的,所以为了保证并发安全,我们可以使用互斥锁。而 CAS 的特点是避免使用互斥锁,当多个线程同时使用 CAS 更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试。

CAS 被广泛应用在并发编程领域中,以实现那些不会被打断的数据交换操作,从而就实现了无锁的线程安全。

CAS 的思路

在大多数处理器的指令中,都会实现 CAS 相关的指令,这一条指令就可以完成“比较并交换”的操作,也正是由于这是一条(而不是多条)CPU 指令,所以 CAS 相关的指令是具备原子性的,这个组合操作在执行期间不会被打断,这样就能保证并发安全。由于这个原子性是由 CPU 保证的,所以无需我们程序员来操心。

CAS 有三个操作数:内存值 V、预期值 A、要修改的值 B。CAS 最核心的思路就是,仅当预期值 A 和当前的内存值 V 相同时,才将内存值修改为 B。

我们对此展开描述一下:CAS 会提前假定当前内存值 V 应该等于值 A,而值 A 往往是之前读取到当时的内存值 V。在执行 CAS 时,如果发现当前的内存值 V 恰好是值 A 的话,那 CAS 就会把内存值 V 改成值 B,而值 B 往往是在拿到值 A 后,在值 A 的基础上经过计算而得到的。如果执行 CAS 时发现此时内存值 V 不等于值 A,则说明在刚才计算 B 的期间内,内存值已经被其他线程修改过了,那么本次 CAS 就不应该再修改了,可以避免多人同时修改导致出错。这就是 CAS 的主要思路和流程。

JDK 正是利用了这些 CAS 指令,可以实现并发的数据结构,比如 AtomicInteger 等原子类。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Unsafe 本地方法

1
java复制代码public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

利用 CAS 实现的无锁算法,就像我们谈判的时候,用一种非常乐观的方式去协商,彼此之间很友好,这次没谈成,还可以重试。CAS 的思路和之前的互斥锁是两种完全不同的思路,如果是互斥锁,不存在协商机制,大家都会尝试抢占资源,如果抢到了,在操作完成前,会把这个资源牢牢的攥在自己的手里。当然,利用 CAS 和利用互斥锁,都可以保证并发安全,它们是实现同一目标的不同手段。

CAS 的语义

我们来看一看 CAS 的语义,有了下面的等价代码之后,理解起来会更加容易,因为代码实际上是一目了然的。接下来我们把 CAS 拆开,看看它内部究竟做了哪些事情。CAS 的等价语义的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码/**

 * 描述:     模拟CAS操作,等价代码

 */

 

public class SimulatedCAS {



    private int value;



    public synchronized int compareAndSwap(int expectedValue, int newValue) {

        int oldValue = value;

        if (oldValue == expectedValue) {

            value = newValue;

        }

        return oldValue;

    }

}

在这段代码中有一个 compareAndSwap 方法,在这个方法里有两个入参,第 1 个入参期望值 expectedValue,第 2 个入参是 newValue,它就是我们计算好的新的值,我们希望把这个新的值去更新到变量上去。

compareAndSwap 方法是被 synchronized 修饰的,我们用同步方法为 CAS 的等价代码保证了原子性。

接下来我将讲解,在 compareAndSwap 方法里都做了哪些事情。需要先拿到变量的当前值,所以代码里用就会用 int oldValue = value 把变量的当前值拿到。然后就是 compare,也就是“比较”,所以此时会用 if (oldValue == expectedValue) 把当前值和期望值进行比较,如果它们是相等的话,那就意味着现在的值正好就是我们所期望的值,满足条件,说明此时可以进行 swap,也就是交换,所以就把 value 的值修改成 newValue,最后再返回 oldValue,完成了整个 CAS 过程。

CAS 最核心的思想就在上面这个流程中体现了,可以看出,compare 指的就是 if 里的比较,比较 oldValue 是否等于 expectedValue;同样,swap 实际上就是把 value 改成 newValue,并且返回 oldValue。所以这整个 compareAndSwap 方法就还原了 CAS 的语义,也象征了 CAS 指令在背后所做的工作。

这类似于我们数据库乐观锁的实现,通过version字段来控制变量是否更新。在数据库乐观锁的实现中,我们一般指定某个字段,这里为version(int)。当我们需要更改某条记录时,会通过 update tablename set version = version +1, column_1 = val where version = version;

具体是个怎样的流程?下面说一下具体的实现步骤:
使用version字段控制版本后:

  1. 两人先查询该数据 select * from tablename where id = 1

此时两人查询到的数据一样 version = 0

  1. 两人都执行update,第一人执行update(因为mysql行锁的特性,两人不可能同时修改一条数据,所以update同一条数据的时候,是有先后顺序的,只有在第一个执行完update,才能释放行锁,第二个继续进行update):

update tablename set column_1 = 1, version = version + 1 where id = 1 and version = 0

执行完成后,version字段值将变成1, 第二人执行update:

update order set price = 1, version = version + 1 where id = 1 and version = 0

此时的version的值已经被修改为1,所以第二人修改失败,实现乐观锁控制。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…232233234…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%