开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

究竟什么是可重入锁?

发表于 2017-10-23

经历

很久之前就听说了可重入锁,可重入锁究竟是什么意思,以前是囫囵吞枣的,只要记住ReentrantLock和sychronized是可重入锁就行了,爱咋用咋用,好吧,原谅我的无知,最近对基础查漏补缺,发现竟然对其一问三不知,赶紧预习一波,觉得有必要写一篇博客来讲解,就当做什么都没有发生吧,嘿嘿。。。

释义

广义上的可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。ReentrantLock和synchronized都是可重入锁,下面是一个用synchronized实现的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码public class ReentrantTest implements Runnable {

public synchronized void get() {
System.out.println(Thread.currentThread().getName());
set();
}

public synchronized void set() {
System.out.println(Thread.currentThread().getName());
}

public void run() {
get();
}

public static void main(String[] args) {
ReentrantTest rt = new ReentrantTest();
for(;;){
new Thread(rt).start();
}
}
}

整个过程没有发生死锁的情况,截取一部分输出结果如下:

1
2
3
4
5
6
7
8
复制代码Thread-8492
Thread-8492
Thread-8494
Thread-8494
Thread-8495
Thread-8495
Thread-8493
Thread-8493

set()和get()同时输出了线程名称,表明即使递归使用synchronized也没有发生死锁,证明其是可重入的。

不可重入锁

不可重入锁,与可重入锁相反,不可递归调用,递归调用就发生死锁。看到一个经典的讲解,使用自旋锁来模拟一个不可重入锁,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码import java.util.concurrent.atomic.AtomicReference;

public class UnreentrantLock {

private AtomicReference<Thread> owner = new AtomicReference<Thread>();

public void lock() {
Thread current = Thread.currentThread();
//这句是很经典的“自旋”语法,AtomicInteger中也有
for (;;) {
if (!owner.compareAndSet(null, current)) {
return;
}
}
}

public void unlock() {
Thread current = Thread.currentThread();
owner.compareAndSet(current, null);
}
}

代码也比较简单,使用原子引用来存放线程,同一线程两次调用lock()方法,如果不执行unlock()释放锁的话,第二次调用自旋的时候就会产生死锁,这个锁就不是可重入的,而实际上同一个线程不必每次都去释放锁再来获取锁,这样的调度切换是很耗资源的。稍微改一下,把它变成一个可重入锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码import java.util.concurrent.atomic.AtomicReference;

public class UnreentrantLock {

private AtomicReference<Thread> owner = new AtomicReference<Thread>();
private int state = 0;

public void lock() {
Thread current = Thread.currentThread();
if (current == owner.get()) {
state++;
return;
}
//这句是很经典的“自旋”式语法,AtomicInteger中也有
for (;;) {
if (!owner.compareAndSet(null, current)) {
return;
}
}
}

public void unlock() {
Thread current = Thread.currentThread();
if (current == owner.get()) {
if (state != 0) {
state--;
} else {
owner.compareAndSet(current, null);
}
}
}
}

在执行每次操作之前,判断当前锁持有者是否是当前对象,采用state计数,不用每次去释放锁。

ReentrantLock中可重入锁实现

这里看非公平锁的锁获取方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码        final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//就是这里
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

在AQS中维护了一个private volatile int state来计数重入次数,避免了频繁的持有释放操作,这样既提升了效率,又避免了死锁。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python学习笔记(进阶篇一)

发表于 2017-10-19

笔记整理出处:廖雪峰教程

  • 进阶
    • 函数
      • 位置参数
      • 默认参数
      • 可变参数
      • 关键字参数
      • 命名关键字参数
      • 参数组合
      • 递归函数
    • 高级特性
      • 切片
      • 迭代
      • 列表生成式
      • 生成器
      • 迭代器

进阶

函数

在Python中,定义一个函数要使用def语句,依次写出函数名、括号、括号中的参数和冒号:,然后,在缩进块中编写函数体,函数的返回值用return语句返回。

我们以自定义一个求绝对值的my_abs函数为例:

1
2
3
4
5
复制代码def my_abs(x):
if x >= 0:
return x
else:
return -x

请注意,函数体内部的语句在执行时,一旦执行到return时,函数就执行完毕,并将结果返回。因此,函数内部通过条件判断和循环可以实现非常复杂的逻辑。

如果没有return语句,函数执行完毕后也会返回结果,只是结果为None。
return None可以简写为return。
python中函数没有返回值类型声明,同时,函数名其实就是指向一个函数对象的引用,完全可以把函数名赋给一个变量,相当于给这个函数起了一个“别名”:

1
2
3
复制代码>>> a = abs # 变量a指向abs函数
>>> a(-1) # 所以也可以通过a调用abs函数
1
位置参数

我们先写一个计算x2的函数:

1
2
复制代码def power(x):
return x * x

对于power(x)函数,参数x就是一个位置参数。

当我们调用power函数时,必须传入有且仅有的一个参数x:

默认参数
1
2
复制代码def power(x , y = 2):
return x * y

我们调用时既可以这样用power(2,3),也可以这样用power(2),明显的,当我们不传递y这个参数时,方法内部会去y的默认值进行运算,也就是2

默认参数可以简化函数的调用。设置默认参数时,有几点要注意:

  • 必选参数在前,默认参数在后,否则Python的解释器会报错(思考一下为什么默认参数不能放在必选参数前面);
  • 如何设置默认参数。
    当函数有多个参数时,把变化大的参数放前面,变化小的参数放后面。变化小的参数就可以作为默认参数。

使用默认参数有什么好处?最大的好处是能降低调用函数的难度。因为有些参数,可能我们大部分时间传递的是同样的值。
注意事项:

  • 定义默认参数要牢记一点:默认参数必须指向不变对象!
  • 定义默认参数要牢记一点:默认参数必须指向不变对象!
  • 定义默认参数要牢记一点:默认参数必须指向不变对象!

举例说明,先定义一个函数,传入一个list,添加一个END再返回:

1
2
3
复制代码def add_end(L=[]):
L.append('END')
return L

当你正常调用时,结果似乎不错:

1
2
3
4
复制代码>>> add_end([1, 2, 3])
[1, 2, 3, 'END']
>>> add_end(['x', 'y', 'z'])
['x', 'y', 'z', 'END']

当你使用默认参数调用时,一开始结果也是对的:

1
2
复制代码>>> add_end()
['END']

但是,再次调用add_end()时,结果就不对了:

1
2
3
4
复制代码>>> add_end()
['END', 'END']
>>> add_end()
['END', 'END', 'END']

很多初学者很疑惑,默认参数是[],但是函数似乎每次都“记住了”上次添加了’END’后的list。

原因解释如下:

Python函数在定义的时候,默认参数L的值就被计算出来了,即[],因为默认参数L也是一个变量,它指向对象[],每次调用该函数,如果改变了L的内容,则下次调用时,默认参数的内容就变了,不再是函数定义时的[]了。

所以,定义默认参数要牢记一点:默认参数必须指向不变对象!

要修改上面的例子,我们可以用None这个不变对象来实现:

1
2
3
4
5
复制代码def add_end(L=None):
if L is None:
L = []
L.append('END')
return L

现在,无论调用多少次,都不会有问题:

1
2
3
4
复制代码>>> add_end()
['END']
>>> add_end()
['END']

为什么要设计str、None这样的不变对象呢?因为不变对象一旦创建,对象内部的数据就不能修改,这样就减少了由于修改数据导致的错误。此外,由于对象不变,多任务环境下同时读取对象不需要加锁,同时读一点问题都没有。我们在编写程序时,如果可以设计一个不变对象,那就尽量设计成不变对象。

可变参数

定义与java类似,基本使用方法如下:

1
2
3
4
5
复制代码def calc(*numbers):
sum = 0
for n in numbers:
sum = sum + n * n
return sum

对于已经存在的list类型参数,可变参数的使用方法和java略有不同,不能直接传入该变量,需要增加*

1
2
3
复制代码>>> nums = [1, 2, 3]
>>> calc(*nums)
14

*nums表示把nums这个list的所有元素作为可变参数传进去。这种写法相当有用,而且很常见。

关键字参数

可变参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。而关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。请看示例:

1
2
复制代码def person(name, age, **kw):
print('name:', name, 'age:', age, 'other:', kw)

函数person除了必选参数name和age外,还接受关键字参数kw。在调用该函数时,可以只传入必选参数:

1
2
复制代码>>> person('Michael', 30)
name: Michael age: 30 other: {}

也可以传入任意个数的关键字参数:

1
2
3
4
复制代码>>> person('Bob', 35, city='Beijing')
name: Bob age: 35 other: {'city': 'Beijing'}
>>> person('Adam', 45, gender='M', job='Engineer')
name: Adam age: 45 other: {'gender': 'M', 'job': 'Engineer'}

和可变参数类似,也可以先组装出一个dict,然后,把该dict转换为关键字参数传进去:

1
2
3
复制代码>>> extra = {'city': 'Beijing', 'job': 'Engineer'}
>>> person('Jack', 24, **extra)
name: Jack age: 24 other: {'city': 'Beijing', 'job': 'Engineer'}

**extra表示把extra这个dict的所有key-value用关键字参数传入到函数的kw参数,kw将获得一个dict,

注意kw获得的dict是extra的一份拷贝,对kw的改动不会影响到函数外的extra。

命名关键字参数

对于关键字参数,函数的调用者可以传入任意不受限制的关键字参数。至于到底传入了哪些,就需要在函数内部检查。
如果要限制关键字参数的名字,就可以用命名关键字参数,例如,只接收city和job作为关键字参数。这种方式定义的函数如下:

1
2
3
4
5
6
7
8
复制代码def person(name, age, *, city, job):
print(name, age, city, job)
和关键字参数**kw不同,命名关键字参数需要一个特殊分隔符*,*后面的参数被视为命名关键字参数。

调用方式如下:
~~~python
>>> person('Jack', 24, city='Beijing', job='Engineer')
Jack 24 Beijing Engineer

如果函数定义中已经有了一个可变参数,后面跟着的命名关键字参数就不再需要一个特殊分隔符*了:

1
2
复制代码def person(name, age, *args, city, job):
print(name, age, args, city, job)

命名关键字参数必须传入参数名,这和位置参数不同。如果没有传入参数名,调用将报错:

1
2
3
4
复制代码>>> person('Jack', 24, 'Beijing', 'Engineer')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: person() takes 2 positional arguments but 4 were given

由于调用时缺少参数名city和job,Python解释器把这4个参数均视为位置参数,但person()函数仅接受2个位置参数。

命名关键字参数可以有缺省值,从而简化调用:

1
2
3
4
5
6
复制代码def person(name, age, *, city='Beijing', job):
print(name, age, city, job)
由于命名关键字参数city具有默认值,调用时,可不传入city参数:
~~~python
>>> person('Jack', 24, job='Engineer')
Jack 24 Beijing Engineer

使用命名关键字参数时,要特别注意,如果没有可变参数,就必须加一个作为特殊分隔符。如果缺少,Python解释器将无法识别位置参数和命名关键字参数:

1
2
3
复制代码def person(name, age, city, job):
# 缺少 *,city和job被视为位置参数
pass
参数组合

在Python中定义函数,可以用必选参数、默认参数、可变参数、关键字参数和命名关键字参数,这5种参数都可以组合使用。但是请注意,参数定义的顺序必须是:必选参数、默认参数、可变参数、命名关键字参数和关键字参数。

比如定义一个函数,包含上述若干种参数:

1
2
3
4
5
复制代码def f1(a, b, c=0, *args, **kw):
print('a =', a, 'b =', b, 'c =', c, 'args =', args, 'kw =', kw)

def f2(a, b, c=0, *, d, **kw):
print('a =', a, 'b =', b, 'c =', c, 'd =', d, 'kw =', kw)

在函数调用的时候,Python解释器自动按照参数位置和参数名把对应的参数传进去。

递归函数

使用递归函数需要注意防止栈溢出。在计算机中,函数调用是通过栈(stack)这种数据结构实现的,每当进入一个函数调用,栈就会加一层栈帧,每当函数返回,栈就会减一层栈帧。由于栈的大小不是无限的,所以,递归调用的次数过多,会导致栈溢出。
解决递归调用栈溢出的方法是通过尾递归优化,事实上尾递归和循环的效果是一样的,所以,把循环看成是一种特殊的尾递归函数也是可以的。

高级特性

切片

对经常取指定索引范围的操作,用循环十分繁琐,因此,Python提供了切片(Slice)操作符,能大大简化这种操作。
取前3个元素,用一行代码就可以完成切片:

1
2
复制代码>>> L[0:3]
['Michael', 'Sarah', 'Tracy']

前开后闭原则。默认从第一个开始取时可以省略不写0.
类似的,Python支持L[-1]取倒数第一个元素,那么它同样支持倒数切片:

1
2
3
4
复制代码>>> L[-2:]
['Bob', 'Jack']
>>> L[-2:-1]
['Bob']

记住倒数第一个元素的索引是-1。
支持间隔取值,比如前10个数,每两个取一个:

1
2
复制代码>>> L[:10:2]
[0, 2, 4, 6, 8]

所有数,每5个取一个:

1
2
复制代码>>> L[::5]
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]

甚至什么都不写,只写[:]就可以原样复制一个list:

1
2
复制代码>>> L[:]
[0, 1, 2, 3, ..., 99]

tuple也是一种list,唯一区别是tuple不可变。因此,tuple也可以用切片操作,只是操作的结果仍是tuple:

1
2
复制代码>>> (0, 1, 2, 3, 4, 5)[:3]
(0, 1, 2)

字符串’xxx’也可以看成是一种list,每个元素就是一个字符。因此,字符串也可以用切片操作,只是操作结果仍是字符串:

1
2
3
4
复制代码>>> 'ABCDEFG'[:3]
'ABC'
>>> 'ABCDEFG'[::2]
'ACEG'
迭代

只要是可迭代对象,无论有无下标,都可以迭代,比如dict就可以迭代:

1
2
3
4
5
6
7
复制代码>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> for key in d:
... print(key)
...
a
c
b

默认情况下,dict迭代的是key。如果要迭代value,可以用for value in d.values(),如果要同时迭代key和value,可以用for k, v in d.items()。
由于字符串也是可迭代对象,因此,也可以作用于for循环。

那么,如何判断一个对象是可迭代对象呢?方法是通过collections模块的Iterable类型判断:

1
2
3
4
5
6
7
复制代码>>> from collections import Iterable
>>> isinstance('abc', Iterable) # str是否可迭代
True
>>> isinstance([1,2,3], Iterable) # list是否可迭代
True
>>> isinstance(123, Iterable) # 整数是否可迭代
False

最后一个小问题,如果要对list实现类似Java那样的下标循环怎么办?Python内置的enumerate函数可以把一个list变成索引-元素对,这样就可以在for循环中同时迭代索引和元素本身:

1
2
3
4
5
6
复制代码>>> for i, value in enumerate(['A', 'B', 'C']):
... print(i, value)
...
0 A
1 B
2 C

上面的for循环里,同时引用了两个变量,在Python里是很常见的,比如下面的代码:

1
2
3
4
5
6
复制代码>>> for x, y in [(1, 1), (2, 4), (3, 9)]:
... print(x, y)
...
1 1
2 4
3 9

任何可迭代对象都可以作用于for循环,包括自定义的数据类型,只要符合迭代条件,就可以使用for循环。

列表生成式

如果要生成[1x1, 2x2, 3x3, …, 10x10]怎么做?方法一是循环:

1
2
3
4
5
6
复制代码>>> L = []
>>> for x in range(1, 11):
... L.append(x * x)
...
>>> L
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

但是循环太繁琐,而列表生成式则可以用一行语句代替循环生成上面的list:

1
2
复制代码>>> [x * x for x in range(1, 11)]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

写列表生成式时,把要生成的元素x * x放到前面,后面跟for循环,就可以把list创建出来。
还可以使用两层循环,可以生成全排列:

1
2
复制代码>>> [m + n for m in 'ABC' for n in 'XYZ']
['AX', 'AY', 'AZ', 'BX', 'BY', 'BZ', 'CX', 'CY', 'CZ']

鉴于列表生成式的便捷性,过于复杂的逻辑不建议直接使用生成式来写(个人观点)

生成器

通过列表生成式,我们可以直接创建一个列表。但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。

所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器:generator。

要创建一个generator,有很多种方法。第一种方法很简单,只要把一个列表生成式的[]改成(),就创建了一个generator:

1
2
3
4
5
6
复制代码>>> L = [x * x for x in range(10)]
>>> L
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> g = (x * x for x in range(10))
>>> g
<generator object <genexpr> at 0x1022ef630>

创建L和g的区别仅在于最外层的[]和(),L是一个list,而g是一个generator。
如果要一个一个打印出来,可以通过next()函数获得generator的下一个返回值:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码>>> next(g)
0
>>> next(g)
1
>>> next(g)
4
...
81
>>> next(g)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

我们讲过,generator保存的是算法,每次调用next(g),就计算出g的下一个元素的值,直到计算到最后一个元素,没有更多的元素时,抛出StopIteration的错误。

当然,上面这种不断调用next(g)实在是太变态了,正确的方法是使用for循环,因为generator也是可迭代对象:

1
2
3
4
5
6
复制代码>>> g = (x * x for x in range(10))
>>> for n in g:
... print(n)
...
0
1

定义generator的另一种方法。如果一个函数定义中包含yield关键字,那么这个函数就不再是一个普通函数,而是一个generator:

1
2
3
4
5
6
7
8
9
10
11
复制代码def fib(max):
n, a, b = 0, 0, 1
while n < max:
yield b
a, b = b, a + b
n = n + 1
return 'done'

>>> f = fib(6)
>>> f
<generator object fib at 0x104feaaa0>

这里,最难理解的就是generator和函数的执行流程不一样。函数是顺序执行,遇到return语句或者最后一行函数语句就返回。而变成generator的函数,在每次调用next()的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。

举个简单的例子,定义一个generator,依次返回数字1,3,5:

1
2
3
4
5
6
7
复制代码def odd():
print('step 1')
yield 1
print('step 2')
yield(3)
print('step 3')
yield(5)

调用该generator时,首先要生成一个generator对象,然后用next()函数不断获得下一个返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码>>> o = odd()
>>> next(o)
step 1
1
>>> next(o)
step 2
3
>>> next(o)
step 3
5
>>> next(o)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

可以看到,odd不是普通函数,而是generator,在执行过程中,遇到yield就中断,下次又继续执行。执行3次yield后,已经没有yield可以执行了,所以,第4次调用next(o)就报错。

回到fib的例子,我们在循环过程中不断调用yield,就会不断中断。当然要给循环设置一个条件来退出循环,不然就会产生一个无限数列出来。

同样的,把函数改成generator后,我们基本上从来不会用next()来获取下一个返回值,而是直接使用for循环来迭代:

1
2
3
4
5
6
7
8
9
复制代码>>> for n in fib(6):
... print(n)
...
1
1
2
3
5
8

但是用for循环调用generator时,发现拿不到generator的return语句的返回值。如果想要拿到返回值,必须捕获StopIteration错误,返回值包含在StopIteration的value中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码>>> g = fib(6)
>>> while True:
... try:
... x = next(g)
... print('g:', x)
... except StopIteration as e:
... print('Generator return value:', e.value)
... break
...
g: 1
g: 1
g: 2
g: 3
g: 5
g: 8
Generator return value: done
迭代器

可以直接作用于for循环的数据类型有以下几种:

一类是集合数据类型,如list、tuple、dict、set、str等;

一类是generator,包括生成器和带yield的generator function。

这些可以直接作用于for循环的对象统称为可迭代对象:Iterable。

可以使用isinstance()判断一个对象是否是Iterable对象:

1
2
3
4
5
复制代码>>> from collections import Iterable
>>> isinstance([], Iterable)
True
>>> isinstance(100, Iterable)
False

而生成器不但可以作用于for循环,还可以被next()函数不断调用并返回下一个值,直到最后抛出StopIteration错误表示无法继续返回下一个值了。

可以被next()函数调用并不断返回下一个值的对象称为迭代器:Iterator。
可以使用isinstance()判断一个对象是否是Iterator对象:

1
2
3
4
5
复制代码>>> from collections import Iterator
>>> isinstance((x for x in range(10)), Iterator)
True
>>> isinstance([], Iterator)
False

生成器都是Iterator对象,但list、dict、str虽然是Iterable,却不是Iterator。
把list、dict、str等Iterable变成Iterator可以使用iter()函数
:

1
2
3
4
复制代码>>> isinstance(iter([]), Iterator)
True
>>> isinstance(iter('abc'), Iterator)
True

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

翻译 服务性能监控:USE方法(The USE Meth

发表于 2017-10-19

原文链接: http://www.brendangregg.com/usemethod.html

原文作者:Bredan Gregg

翻译:小莞

校对:3D

USE 方法是一种能分析任何系统性能的方法论。我们可以根据能帮助系统分析的结构化清单,来迅速的定位资源的瓶颈和错误所在。它通常会先以列出问题为开始,然后再寻找适合的指标,而不是给你制定一些固定的指标,然后让你按部就班的执行下去。

本页左侧下方,是我列出的,根据不同的操作系统(Linux、Solaris等)衍生的USE方法列表。(译者注:可以参考原文链接)

我列出了为不同的操作系统而衍生的USE方法列表供大家参考,你们可以根据你的环境来为你的站点服务,选择适合的附加监控指标。

通过这个工具,可以很方便的筛选出适合不同的系统的建议 metrics:Rosetta(http://www.brendangregg.com/USEmethod/use-rosetta.html)

Intro(Introduction)

如果你遇到一个很严重的性能问题升级的时候,并且你不能确定它是否由服务导致的,这时候你该怎么办?

我们都说万事开头难。所以我开发出了 USE方法,来帮助大家,如何去快速的解决常见的性能问题,而同时又不容易忽略重要的地方。

USE方法在设计之初就定位了简洁、明了、完整、快速的特性,就好像一本航天手册的紧急事项列表那样。(译者注:航天手册,介绍包括不限于飞机的各种特性、指标、性能等,用于帮助飞行学员学习驾驶飞机,或者是帮助那些希望提高他们的飞行潜能和航空知识的人了解的更全面)。

USE方法已经在不同的企业、课堂(作为学习工具)以及最近的云计算等场景中,被成功应用了无数次。

USE方法基于 3+1 模型(三种指标类型+一种策略),来切入一个复杂的系统。我发现它仅仅发挥了 5% 的力量,就解决了大概 80% 的服务器问题,并且正如我将证明的,它除了服务器以外,也同样适应于各种系统。

它应当被理解为一种工具,一种很大的方法工具箱里面的工具。不过,它目前仍然还有很多问题类型以待解决,还需要点其他方法和更多的时间。

Summary

USE方法可以概括为:检查所有的资源的利用率,饱和度,和错误信息。

我们期望大家能尽早使用USE方法去做性能检查,或者是用它确定系统的瓶颈。

名词定义:

  • 资源: 服务器功能性的物理组成硬件(CPU, 硬盘, 总线 )
  • 利用率: 资源执行某工作的平均时间
  • 饱和: 衡量资源超载工作的程度,往往会被塞入队列
  • 错误: 错误事件的数量

分析软件资源,或者是软件的强制性限制(资源控制)也是很有用的,同时要关注哪些指标是处于正常的可接受范围之内的。这些指标通常用以下术语表示:

  • 利用率: 以一个时间段内的百分比来表示,例如:一个硬盘以90%的利用率运行
  • 饱和度: 一个队列的长度,例如:CPUs平均的运行时队列长度是4
  • 错误(数): 可度量的数量,例如:这个网络接口有50次(超时?)

我们应该要调查那些错误,因为它们会降低系统的性能,并且当故障模型处于可回复模式的时候,它可能不会立刻被发现。

这包括了那些失败和重试等操作,以及那些来自无效设备池的失效设备。

低利用率是否意味着未饱和?

即使在很长一段时间内利用率很低,一个爆发增长的高利用率,也会导致饱和 and 性能问题,这点要理解起来可能有违三观!

我举个例子,有一位客户遇到的问题,即使他们的监控工具显示 CPU使用率从来没有超出过 80%,但是 CPU饱和度依然有问题(延迟)监控工具报告了 5分钟的平均值,而其中,CPU利用率曾在数秒内高达 100% 。

资源列表

下面来看如何开始使用。

准备工作时, 你需要一个资源列表来按步就班的去做。 下面是一个服务器的通用列表:

  • CPUs: sockets, cores, hardware threads (virtual CPUs)
  • 内存: 容量
  • 网络接口
  • 存储设备: I/O, 容量
  • 控制器: 存储, 网卡
  • 通道: CPUs, memory, I/O

有些组件分两种类型的资源:存储设备是服务请求资源(I / O)以及容量资源(population), 两种类型都可能成为系统瓶颈。 请求资源可以定义为队列系统,可以将请求先存入排队然后再消化请求。

有些物理组件已被省略,例如硬件缓存(例如,MMU TLB / TSB,CPU)。

USE方法对于在高利用率或高饱和度下,遭受性能退化、导致瓶颈的资源最有效,在高利用率下缓存可以提高性能。

在使用USE方法排除系统的瓶颈问题之后 ,你可以检查缓存利用率和其他的性能属性。

如果你不确认要不要监控某一个资源时,不要犹豫,监控它,然后你就能看到那些指标工作的有多么的棒。

功能模块示意图

另外一种迭代资源的方法,是找到或者绘制一张系统的功能模块示意图。

这些显示了模块关系的图,在你查找数据流的瓶颈的时候是非常有用的,这里有一张Sun Fire V480 Guide (page 82)的例图:

我喜欢这些图表,尽管制作出它很难。 不过,由硬件工程师来画这张图是最适合的-他们最善于做这类事。如果不信的话你可以自己试试。

在确定各种总线的利用率的同时,为每个总线的功能图表,注释好它的最大带宽。这样我们就能在进行单次测量之前,得到能将系统瓶颈识别出来的图表。

Interconnects

CPU,内存和I / O interconnects 往往被忽略。 幸运的是,它们并不会频繁地成为系统的瓶颈。 不幸的是,如果它们真的频繁的成为瓶颈,我们能做的很少(也许你可以升级主板,或减少load:例如,“zero copy”项目减轻内存总线load)。

使用USE方法,至少你会意识到你没有考虑过的内容:interconnect性能。 有关使用USE方法确定的互连问题的示例,请参阅分析Analyzing the HyperTransport。

Metrics

给定资源列表,识别指标类型:利用率,饱和度和错误指标。这里有几个示例。看下面的table,思考下每个资源和指标类型,metric列是一些通用的Unix/Linux的术语提示(你可以描述的更具体些):

resource type metric
CPU utilization CPU utilization (either per-CPU or a system-wide average)
CPU saturation run-queue length or scheduler latency(aka
Memory capacity utilization available free memory (system-wide)
Memory capacity saturation anonymous paging or thread swapping (maybe “page scanning” too)
Network interface utilization RX/TX throughput / max bandwidth
Storage device I/O utilization device busy percent
Storage device I/O saturation wait queue length
Storage device I/O errors device errors (“soft”, “hard”, …)

这些指标是每段间隔或者计数的平均值,作为你的自定义清单,要包括使用的监控软件,以及要查看的统计信息。如果是不可用的指标,可以打个问号。最后,你会完成一个完事的、简单、易读的 metrics 清单.

Harder Metrics

再来看几个硬件指标的组合

resource type metric
CPU errors eg, correctable CPU cache ECC events or faulted CPUs (if the OS+HW supports that)
Memory capacity errors
Network saturation
Storage controller utilization
CPU interconnect utilization
Memory interconnect saturation
I/O interconnect utilization

这些依赖于操作系统的指标一般会更难测量些, 而我通常要用自己写的软件去收集这些指标。

重复所有的组合,并附上获取每个指标的说明,你会完成一个大概有30项指标的列表,其中有些是不能被测量的,还有些是难以测量的。

幸运的是,最常见的问题往往是简单的(例如,CPU饱和度,内存容量饱和度,网络接口利用率,磁盘利用率),这类问题往往第一时间就能被检查出来。

本文的顶部,pic-1中的 example checklists 可作为参考。

In Practice

读取系统的所有组合指标,是非常耗时的,特别是当你开始使用总线和interconnect 指标的情况下。

现在我们可以稍微解放下了,USE方法可以让你了解你没有检查的部分,你可以只有关注其中几项的时间例如:CPUs, 内存容量, 存储容易, 存储设备 I/O, 网络接口等。通过USE方法,那些以前未知的未知指标现在变成了已知的未知指标(我理解为,以前我们不知道有哪些指标会有什么样的数据,现在起码能知道我们应该要关注哪些指标)。

如果将来定位一个性能问题的根本原因,对你的公司至关重要的时候,你至少已经有一个明确的、经过验证的列表,来辅助你进行更彻底的分析,请完成适合你自己的USE方法,有备无患。

希望随着时间的推移,易于检查的指标能得以增长,因为被添加到系统的metrics 越多,会使USE方法将更容易(发挥它的力量)。 性能监视软件也可以帮上忙,添加USE方法向导to do the work for you(do what work? ) 。

Software Resources

有些软件资源可以用类似的方式去分析。 这通常适用于软件的较小组件,而不是整个应用程序。 例如:

  • 互斥锁(mutex locks): 利用率可以定义为锁等待耗时;饱和率定义为等待这把锁的线程个数。
  • 线程池: 利用率可以定义为线程工作的时长;饱和率是等待线程池分配的请求数量。
  • 进程/线程 容量: 系统是有进程或线程的上限的,它的实际使用情况被定义为利用率;等待数量定义为饱和度;错误即是(资源)分配失败的情况(比如无法fork)。(译注:fork是一个现有进程,通过调用fork函数创建一个新进程的过程)
  • 文件描述符容量(file descriptor capacity): 和上述类似,但是把资源替换成文件描述符。

如果这几个指标很管用就一直用,要不然软件问题会被遗留给其他方法了(例如,延迟,后文会提到其他方法:other methodologies )。

Suggested Interpretations

USE方法帮助你定位要使用哪些指标。 在学习了如何从操作系统中读取到这些指标后,你的下一步工作就是诠释它们的值。对于有的人来说, 这些诠释可能是很清晰的(因为他们可能很早就学习过,或者是做过笔记)。而其他并不那么明了的人,可能取决于系统负载的要求或期望 。

下面是一些解释指标类型的通用建议:

  • Utilization: 利用率通常象征瓶颈(检查饱和度可以进一步确认)。高利用率可能开始导致若干问题:
  • 对利用率进行长期观察时(几秒或几分钟),通常来说 70% 的利用率会掩盖掉瞬时的 100% 利用率。
  • 某些系统资源,比如硬盘,就算是高优先级请求来了,也不会在操作进行中被中断。当他们的利用率到 70% 时候,队列系统中的等待已经非常频繁和明显。而 CPU 则不一样,它能在大部分情况下被中断。
  • Saturation: 任何非 0 的饱和度都可能是问题。它们通常是队列中排队的时间或排队的长度。
  • Errors: 只要有一条错误,就值得去检查,特别是当错误持续发生从而导致性能降低时候。

要说明负面情况很容易:利用率低,不饱和,没有错误。 这比听起来更有用 - 缩小调查范围可以快速定位问题区域。

Cloud Computing

在云计算环境中,软件资源控制可能是为了限制 使用共享计算服务的tenants 的流量 。在Joyent公司,我们主要使用操作系统虚拟化(SmartOS),它强加了内存限制,CPU限制和存储I / O限制。 所有这些资源限制,都可以使用USE Method进行检查,类似于检查物理资源。

例如,在我们的环境中,”内存容量利用率”可以是 tenants 的内存使用率 vs 它的内存上限 。即使传统的Unix页面扫描程序可能处于空闲状态,也可以通过匿名页面活动看到”内存容量饱和度”。

Strategy

下面是用流程图 的方式画了USE方法的示意图。 请注意,错误检查优先于利用率和饱和度检查(因为通常错误更快的表现出来,并更容易解释)。

USE方法定位到的问题, 可能是系统瓶颈。 不幸的是,系统可能会遇到多个性能问题,因此您发现的第一个可能的问题最终却不是个问题。 发现的每个问题都可以用方法持续的挖掘,然后继续使用 USE 方法对更多资源进行反复排查。

进一步分析的策略包括工作量特征和 drill-down 分析。 完成这些后,你应该有依据据能判断,纠正措施是要调整应用的负载或调整资源本身。

Apollo

(译者注:Apollo 这一段我们可以不太关注,它主要是讲 USE 方法,与阿波罗登月计划相关的系统设计的一些渊源)

我之前有提到过,USE方法可以被应用到除服务器之外。为了找到一个有趣的例子, 我想到了一个我没有完全不了解的系统,并且不知道从哪里开始:阿波罗月球模块指导系统。USE 方法提供了一个简单的流程来尝试第一步是寻找一个资源列表,或者更理想的话,找到一个功能模块图表。我在 【Lunar Module - LM10 Through LM14 Familiarization Manual】中发现了以下内容:

这些组件中的一部分可能未表现出利用率或饱和度特性。在迭代后,就可以重新绘制只包含相关组件的图表(还可以包括:“可擦除存储”部分的内存,”核心区域 “ 和 “ vac区域 “ 寄存器)。

我将从阿波罗主脑(AGC)本身开始。 对于每个指标,我浏览了各种LM文档,看看哪些是合理的(有意义的):

  • AGC utilization: This could be defined as the number of CPU cycles doing jobs (not the “DUMMY JOB”) divided by the clock rate (2.048 MHz). This metric appears to have been well understood at the time.
  • AGC saturation: This could be defined as the number of jobs in the “core set area”, which are seven sets of registers to store program state. These allow a job to be suspended (by the “EXECUTIVE” program - what we’d call
    a “kernel” these days) if an interrupt for a higher priority job arrives. Once exhausted, this moves from a saturation state to an error state, and the AGC reports a 1202 “EXECUTIVE OVERFLOW-NO CORE SETS” alarm.
  • AGC errors: Many alarms are defined. Apart from 1202, there is also a 1203 alarm “WAITLIST OVERFLOW-TOO MANY TASKS”, which is a performance issue of a different type: too many timed tasks are being processed before returning
    to normal job scheduling. As with 1202, it could be useful to define a saturation metric that was the length of the WAITLIST, so that saturation can be measured before the overflow and error occurs.

其中的一些细节,可能对于太空爱好者来说是非常熟悉的:在阿波罗11号降落的时候发生的著名的 1201(”NO VAC AREAS”)和1202警报。(”VAC”是向量加速器的缩写,用于处理vector quantities作业的额外存储; 我觉得wikipadia上将 “向量”描述为”空”可能是错误的)。

鉴于阿波罗11号的1201警报,可以继续使用其他方法分析,如工作负载表征。 工作负载很多可以在功能图中看到,大多数工作负载是通过中断来生效的。 包括用于跟踪命令模块的会合雷达,即使LM正在下降,该模块也仍然在执行中断AGC(阿波罗主脑)的任务。 这是发现非必要工作的一个例子(或低优先级的工作; 雷达的一些更新可能是可取的,因此 LM AGC可以立即计算出中止路径)。

作为一个更深的例子,我将把会合雷达当作资源去检查. 错误最容易识别。 有三种信号类型: “DATA NO GOOD”, “NO TRACK”, and “SHAFT- AND TRUNNION-AXIS ERROR”。

在有某一小段时间里,我不知道能从哪里开始使用这个方法, 去寻找和研究具体的指标。

Other Methodologies

虽然USE方法可能会发现80%的服务器问题,但基于延迟的方法(例如Method R)可以找到所有的问题。 不过,如果你不熟悉软件内部结构,Method R 就有可能需要花费更多时间。 它们可能更适合已经熟悉它的数据库管理员或应用程序开发人员。

而USE方法的职责和专长包括操作系统(OS)和硬件,它更适合初级或高级系统管理员,当需要快速检查系统健康时,也可以由其他人员使用。

Tools Method

以下介绍一个基于工具的方法流程(我称它作”工具方法”),与USE方法作比较:

  1. 列出可用的性能工具(可以选择性安装或购买其他的)。
  1. 列出每个工具提供的有用的指标
  1. 列出每个工具可能的解释规则

按照这个方法做完后,将得到一个符合标准的清单,它告诉我们要运行的工具,要关注的指标以及如何解释它们。 虽然这相当有效,但有一个问题,它完全依赖于可用(或已知的)的,可以提供系统的不完整视图的工具。 用户也不知道他们得到的是一张不完整的视图 - 所以问题将仍然存在。

而如果使用USE方法,不同的是,USE方法将通过迭代系统资源的方式,来创建一个完整的待确认问题列表,然后搜索工具来回答这些问题。这样构建了一张更完整的视图,未知的部分被记录下来,它们的存在被感知(这一句我理解成前文中提到的:未知 的未知变为已知的未知)。 基于USE,同样可以开发一个清单类似于工具方法(Tool-Method),显示要运行的工具(可用的位置),要关注的指标以及如何解释它。

另一个问题是,工具方法在遍历大量的工具时,将会使寻找瓶颈的任务性能得到分散。而USE方法提供了一种策略,即使是超多的可用工具和指标,也能有效地查找瓶颈和错误。

Conclusion

USE方法是一个简单的,能执行完整的系统健康检查的策略,它可以识别常见的系统瓶颈和错误。它可以在调查的早期部署并快速定位问题范围,如果需要的话,还可以进一步通过其他方法进行更详细的研究。

我在这个篇幅上,解释了USE方法并且提供了通用的指标案例,请参阅左侧导航面板中对应操作系统的示例清单,其建议了应用USE方法的工具和指标。另请参阅基于线程的补充方法,TSA Method。

Acknowledgments

  • 感谢Cary Millsap and Jeff Holt (2003) 在”优化Oracle性能”一文中提到的 Method R方法 (以及其他方法), 使我有了灵感,我应该要把这个方法论写出来。
  • 感谢Sun Microsystems的组织,包括PAE和ISV, 他们将USE方法(那时还没命名)应用于他们的存储设备系列,绘制了标注指标和总线速度的ASCII功能块图表 - 这些都比您想象的要困难(我们应该早些时候询问硬件团队的帮助)。
  • 感谢我的学生们,多年前我授予他们这个方法论,谢谢他们提供给我的使用反馈。
  • 感谢Virtual AGC项目组(The Virtual AGC project),读他们的站点 ibiblio.org 上的文档库,就象是一种娱乐. 尤其是LMA790-2 “Lunar Module LM-10 Through LM-14 Vehicle Familiarization Manual” (48页有功能模块图表), 以及 “阿波罗指导和月球导航模块入门学习指南”, 都很好的解释了执行程序和它的流程图
    (These docs are 109 and 9 Mbytes in size.)
  • 感谢Deirdré Straughan 编辑和提供反馈,这提高了我的认知。
  • 文章顶部的图片,是来自于波音707手册,1969出版。它不是完整的,点击查看完整的版本(译注:为方便阅读,就是下面这张:)

Updates

USE Method updates:(略)

  • It was published in ACMQ as Thinking Methodically about Performance (2012).
  • It was also published in Communications of the ACM as Thinking Methodically about Performance (2013).
  • I presented it in the FISL13 talk The USE Method (2012).
  • I spoke about it at Oaktable World 2012: video, PDF.
  • I included it in the USENIX LISA `12 talk Performance Analysis Methodology.
  • It is covered in my book on Systems Performance, published by Prentice Hall (2013).

More updates (Apr 2014):

  • LuceraHQ are implementing USE Method metrics on SmartOS for performance monitoring of their high performance financial cloud.
  • LuceraHQ正在SmartOS上,为他们高性能金融云的性能监测,实施USE 方法指标
  • I spoke about the USE Method for OS X at MacIT 2014 (slides)。

技术沙龙推荐

点击下方图片即可阅读

从ELK到EFK

最终版 | 深度学习之概述(Overview)

翻译 | 关键CSS和Webpack: 减少阻塞渲染的CSS的自动化解决方案

推荐系统那些事儿

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

docker+jenkins+seneca构建去集中化微服务

发表于 2017-10-17

前言

在微服务架构中,服务发现一直是一件比较复杂的事。而且服务发现式的架构处理不好,容易产生集中化。同时,微服务的提供,不可避免的需要一些负载均衡方案,实现服务的高可用和可扩展,这无疑增加了很多复杂度。

笔者认为,使用异步、基于消息的方式,可能更适合微服务架构。

基于消息的微服务架构,对于所有微服务的部署条件非常简单,只需要能访问到消息服务即可。同时微服务节点的移除和增加不会影响到服务的提供。相比服务发现的架构,简单太多了,简单即是美。

在这次实践中,使用到了seneca,一个nodejs 微服务框架。seneca,使用seneca-amqp-transport插件,可以轻松构建基于消息的微服务。

下面是架构图:

www.processon.com/view/link/5…
在这个架构中,我们使用的是标准的seneca定义的命令规范,这可能是所有微服务都需要遵守的一个规范,至于说使用其他语言,也很简单。封装一个seneca命令规范的库即可。不知道官方有没开发,开发起来难度也不会太大。

接口层比较灵活,可以根据上层应用特性,来决定如何封装传输协议,最后将转化成标准命令发送到消息服务。不建议直接访问消息服务,上层应用应保持灵活。

完整的实践代码:github.com/luaxlou/mic…

1 前期准备

使用docker-machine创建虚拟机。

关于docker的一些基本用法,可以读上一篇文章:docker+consul基于服务发现的极简web架构实践,这里就不再赘述。

依次创建3台虚拟机:

1
2
3
powershell复制代码$ dm create -d "virtualbox” node1
$ dm create -d "virtualbox” node2
$ dm create -d "virtualbox" node3

2 开始构建

搭建Rabbitmq消息服务

消息队列服务,已经成为高并发应用的必备基础服务。我们选用Rabbitmq,你可以换成任意的,遵循amqp协议即可。

使用docker安装很方便,但是生产环境不建议使用docker安装。更推荐的是使用云服务,这样能保证足够高的高可用和扩展性。虽然价格贵点,但是这是唯一的单点,花点钱还是值得的。

直接安装在宿主机上:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码$ docker search rabbitmq

NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol ... 1466 [OK]
tutum/rabbitmq Base docker image to run a RabbitMQ server 11
frodenas/rabbitmq A Docker Image for RabbitMQ 11 [OK]
sysrun/rpi-rabbitmq RabbitMQ Container for the Raspberry Pi 2 ... 6
aweber/rabbitmq-autocluster RabbitMQ with the Autocluster Plugin 5
gonkulatorlabs/rabbitmq DEPRECATED: See maryville/rabbitmq 5 [OK]
letsxo/rabbitmq RabbitMQ with Management and MQTT plugins. 4 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 3 [OK]
1
applescript复制代码$ docker run -d --name rabbit -p   5672:5672  rabbitmq

这样就启动了一个消息队里服务,并且开放5672端口

安装jenkins

jenkins用于自动集成,不然每次构建是个很麻烦的事。

下面的实践是笔者掉了不少坑之后完成的,jenkins在安装过程中会有不少麻烦,而且在mac下安装也会遇到麻烦。

将jenkins 安装到 node1

1
2
3
4
5
6
7
8
9
awk复制代码$ dm ssh node1

$ mkdir /mnt/sda1/var/jenkins_home
$ sudo chown 1000 /mnt/sda1/var/jenkins_home
$ sudo chown 1000 /var/run/docker.sock

$ docker run -d -v /var/run/docker.sock:/var/run/docker.sock \
-v /mnt/sda1/var/jenkins_home:/var/jenkins_home \
-v $(which docker):/usr/bin/docker -p 8080:8080 jenkins

查看初始密码: $ cat /mnt/sda1/var/jenkins_home/secrets/initialAdminPassword

安装私有的Registry

在mac上安装即可

1
routeros复制代码$ docker run -d -p 5000:5000 registry

文档参考:docs.docker.com/registry/sp…

准备代码

代码使用的是seneca官方的例子,完整的Dockerfile也已经写好。

1
2
3
4
5
6
7
8
9
10
dockerfile复制代码FROM node:alpine

RUN npm install pm2 -g
WORKDIR /usr/src/app

COPY package.json ./
RUN npm install
COPY . .

CMD ["pm2-docker","process.yml"]

为了让nodejs能使用到多核cpu,Dockerfile 集成了pm2,使用pm2来管理node进程。

完整代码:
github.com/luaxlou/mic…

配置自动集成

这里使用了最新版的jenkins,新版的jenkins使用了pipline。一种新的构建方式,使用groovy语法。

写起来是挺优雅的,但是学习成本颇高。因为文档不全及有些文档失效,笔者不得已反编译了pipeline插件,才得以调通。

使用pipeline script

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
javascript复制代码node {
stage('Preparation') {
def r = git('https://github.com/luaxlou/micro-service-practice.git')
}
stage('Build') {
dir('seneca-listener') {
withEnv(["DOCKER_REGISTRY_URL=http://192.168.99.1:5000"]) {

docker.build("seneca-listener").push("latest")

}

}

}

}

开始构建,顺利的话,会看到如下的结果:

image.png

image.png

这是pipeline的特性,可以可视化看到各个阶段的执行情况,算是不小的进步吧。

访问私有Registy的API,就可以看到生成的tag。

curl http://192.168.99.1:5000/v2/seneca-listener/tags/list

最后一步,试试我们的程序

在宿主机发布消息:

1
awk复制代码$ git clone https://github.com/luaxlou/micro-service-practice.git

seneca-clinet 代码是接口层代码的示意,可以根据自己的喜好封装。 同时直接发送了命令代码用于测试。

进入seneca-clinet 目录

1
crmsh复制代码$  AMQP_URL=192.168.99.1:5672 node index.js

这个程序会每隔两秒发送一个命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
javascript复制代码#!/usr/bin/env node
'use strict';

const client = require('seneca')()
.use('seneca-amqp-transport')
.client({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL
});

setInterval(function() {
client.act('cmd:salute', {
name: 'World',
max: 100,
min: 25
}, (err, res) => {
if (err) {
throw err;
}
console.log(res);
});
}, 2000);

虽然一直在发命令,你很快就会发现命令全部超时了。这是因为还没有消费者,当然这些命令也没有丢失,只不过接口层没有得到及时返回。如果应用层支持异步的模式,每个command都有独立的id,可以保留id后,以后再过来取。这就很灵活了,一切看需求去封装接口层即可。

进入node2

1
2
dockerfile复制代码$ docker run 192.168.99.1:5000/seneca-listener:latest
0|seneca-l | {"kind":"notice","notice":"hello seneca fwunhukrcmzn/1507605332382/16/3.4.2/-","level":"info","seneca":"fwunhukrcmzn/1507605332382/16/3.4.2/-","when":1507605332661}

启动后,回到seneca-clinet,发现之前超时的命令,全部接收到了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
yaml复制代码{ id: 86,
message: 'Hello World!',
from: { pid: 16, file: 'index.js' },
now: 1507605332699 }
{ id: 44,
message: 'Hello World!',
from: { pid: 16, file: 'index.js' },
now: 1507605332701 }
{ id: 56,
message: 'Hello World!',
from: { pid: 16, file: 'index.js' },
now: 1507605332703 }
{ id: 57,
message: 'Hello World!',
from: { pid: 16, file: 'index.js' },
now: 1507605332706 }
{ id: 58,
message: 'Hello World!',
from: { pid: 16, file: 'index.js' },
now: 1507605332707 }

至此,完整架构已经构建完毕。

一些未完的事项

1.自动集成,只需要配置webhook即可。
2.自动部署,因为docker运转的方式,当服务升级时需要重启docker进程。方式有很多,比较粗暴的是直接控制宿主机,或者类似salt这样的工具。
目前来说,没有找到太好的开源方案。个人倾向于自己开发agent,发布有限的API,用于常规的部署或者其他任务,以及可以定时收集服务器的信息,用于监控。这可能会是笔者的下一个开源项目。

总结

这篇文章算是一个新的里程碑,实践的成果将用于后期的架构。docker让我从传统的架构模式中脱离出来,同时也让我吃了不少苦头。但这一切都是值得的。

同时也是一个新的开始,终于从之前的公司出来。未来何去何从,有很多的未知,但我相信都是美好的。

这也许就是人生的魅力。

Hello World!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Scala 技术周刊 第 23 期

发表于 2017-10-15

这里有最新的 Scala 社区动态、技术博文。
微信搜索 「scalacool」关注我们,及时获取最新资讯。


深度阅读

  • Getting Started with Elastic4s programming in Scala
    Elastic4s 介绍
  • “Bootstrapping the Web with Scala Native” by Richard Whaling
    Scala Native 在 Web 方面的应用
  • Solving Dynamic Programming problems using Functional Programming (Part 2)
    函数式编程
  • We need a good name for Scala programmers. Scala doesn’t have a mascot and it’s not the most punnable name.
    为 Scala 程序员征集名字
  • Akka Typed: New Cluster Tools API
    Akka Typed

一周速递

  • Play 2.6.6 发布
  • Lagom 1.3.9 发布

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java Stream的并行实现

发表于 2017-10-15

并行与并发

关于并发与并行,需要弄清楚的是,并行关注于多个任务同时进行,而并发则通过调度来不停的切换多个任务执行,而实质上多个任务不是同时执的。并发,英文单词为:Concurrent。并行的英文单词为:parallel。如果想对并发和并行有一个比较直观的认识,可以参考下面这张图片:

并行与并发

并行与并发

Fork/Join 框架与 Java Stream API

Fork/Join框架属于并行框架,关于Fork/Join框架的一些内容,可以参考这篇文章:Java Fork/Join并行框架。简单来说,Fork/Join框架可以将大的任务切分为足够小的任务,然后将小任务分配给不同的线程来执行,而线程之间通过工作窃取算法来协调资源,提前昨晚任务的线程可以去“窃取”其他还没有做完任务的线程的任务,而每一个线程都会持有一个双端队列,里面存储着分配给自己的任务,Fork/Join框架在实现上,为了防止线程之间的竞争,线程在消费分配给自己的任务时,是从队列头取任务的,而“窃取”线程则从队列尾部取任务。
Fork/Join框架通过fork方法来分割大任务,通过使用join来获取小任务的结果,然后组合成大任务的结果。关于Fork/Join任务模型,可以参考下面的图片:

Fork/Join的任务模型

Fork/Join的任务模型

关于Java Stream API的相关内容,可以参考该文章:Java Streams API。

Stream在实现上使用了Fork/Join框架来实现并发,所以使用Stream我们可以在不知不觉间就使得我们的程序跑得飞快,究其原因就是Stream使用了Fork/Join并发框架来处理任务,当然,你需要显示的指定Stream为parallel,否则Stream默认都是串行流。比如对于Collection,你可以使用parallelStream来转换为一个并发流,或者使用stream方法转换为串行流,然后使用parallel操作使得串行流变为并发流。本文的重点是剖析Stream是如何使用Fork/Join来做并发的。

Stream的并发实现细节

在了解了Fork/Join并发框架和Java Stream之后,首要的问题就是:Stream是如何使用Fork/Join框架来做到并发的?其实对于使用者来说,了解Stream就是通过Fork/Join框架来做的就好了,但是如果想要深入了解一下Fork/Join框架的实践,以及Java Stream的设计方法,那么去读一下实现的源码还是很有必要的,下文中的分析仅代表个人观点!

需要注意的一点是,Java Stream的操作分为两类,也可以分为三类,具体的细节可以参考该文章:Java Streams API。一个简单的判断一个操作是否是Terminal操作还是Intermediate操作的方法是,如果操作返回的是一个新的Stream,那么就是一个Intermediate操作,否则就是一个Terminal操作。

  • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据操作,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
  • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
  • 还有一种操作被称为 short-circuiting。用以指:
+ 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个 有限的新 Stream。
+ 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。

Java Stream对四种类型的Terminal操作使用了Fork/Join实现了并发操作,下面的图片展示了这四种操作类型:

支持并行的四种Stream操作

支持并行的四种Stream操作

我们首先来走一遍Stream操作的执行路径,下面的代码是我们想要做的操作流,下文会根据该代码示例来跟踪Stream的执行路径:

1
2
3
4
复制代码        Stream.of(1,2,3,4)
.parallel()
.map(n -> n*2)
.collect(Collectors.toCollection(ArrayList::new));

解释一下,上面的代码想要实现的功能是将(1,2,3,4)这四个数字每一个都变为其自身的两倍,然后收集这些元素到一个ArrayList中返回。这是一个非常简单的功能,下面是上面的操作流的执行路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
复制代码
step 1:

public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}

step 2:

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}

step 3:

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
...
container = evaluate(ReduceOps.makeRef(collector));
...
}

step 4:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;

return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

step 5:

使用Fork/Join框架执行操作。

上面的五个步骤是经过一些省略的,需要注意的一点是,intermediate类型的操作仅仅将操作加到一个upstream里面,具体的原文描述如下:

1
2
复制代码
Construct a new Stream by appending a stateless intermediate operation to an existing stream.

比如上面我们的操作中的map操作,实际上只是将操作加到一个intermediate链条上面,不会立刻执行。重点是第五步,Stream是如何使用Fork/Join来实现并发的。evaluate这个方法至关重要,在方法里面会分开处理,对于设置了并发标志的操作流,会使用Fork/Join来并发执行操作任务,而对于没有打开并发标志的操作流,则串行执行操作。

Fork/Join框架的核心方法是一个叫做compute的方法,下面分析一个forEach操作如何通过Fork/Join框架来实现并发,通过追踪代码,可以发现forEach的并发版本其实是一个交由一个ForEachTask对象来做,而ForEachTask类中实现了compute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码// Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
}

在上面的代码中将大任务拆成成了小任务,那哪里收集了这些小任务呢?看下面的代码:

1
2
3
4
5
6
7
8
9
复制代码        @Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}

可以看到调用了invoke方法,而对invoke的描述如下:

1
2
3
4
复制代码     * Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.

不是说Fork/Join框架嘛?那有了fork为什么没有join而是invoke呢?下面是对join方法的描述:

1
2
3
4
5
6
7
8
复制代码
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.

根据join的描述,我们知道还可以使用get方法来获取结果,但是get方法会抛出异常而join和invoke方法都不会抛出异常,而是将异常报告给ForkJoinTask,让ForkJoinTask来抛出异常。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Netty线程模型及EventLoop详解

发表于 2017-10-15

线程模型与并发

什么是线程模型呢?线程模型指定了线程管理的模型。在进行并发编程的过程中,我们需要小心的处理多个线程之间的同步关系,而一个好的线程模型可以大大减少管理多个线程的成本。在阅读本文之前,你可以选择性的阅读下面列出的文章,来快速了解和回顾java中的并发编程内容:

  • Java线程池详解(一)
  • Java线程池详解(二)
  • Java调度线程池ScheduleExecutorService
  • Java调度线程池ScheduleExecutorService(续)
  • Java中的ThreadLocal和 InheritableThreadLocal
  • Java AQS
  • Java可重入锁详解

Reactor线程模型

Reactor是一种经典的线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型。下面分别分析一下各个Reactor线程模型的优缺点。首先是Reactor单线程模型,下面的图片展示了这个线程模型的结构:

Reactor单线程模型

Reactor单线程模型

Reactor单线程模型仅使用一个线程来处理所有的事情,包括客户端的连接和到服务器的连接,以及所有连接产生的读写事件,这种线程模型需要使用异步非阻塞I/O,使得每一个操作都不会发生阻塞,Handler为具体的处理事件的处理器,而Acceptor为连接的接收者,作为服务端接收来自客户端的链接请求。这样的线程模型理论上可以仅仅使用一个线程就完成所有的事件处理,显得线程的利用率非常高,而且因为只有一个线程在工作,所有不会产生在多线程环境下会发生的各种多线程之间的并发问题,架构简单明了,线程模型的简单性决定了线程管理工作的简单性。但是这样的线程模型存在很多不足,比如:

  • 仅利用一个线程来处理事件,对于目前普遍多核心的机器来说太过浪费资源
  • 一个线程同时处理N个连接,管理起来较为复杂,而且性能也无法得到保证,这是以线程管理的简洁换取来的事件管理的复杂性,而且是在性能无 法得到保证的前提下换取的,在大流量的应用场景下根本没有实用性
  • 根据第二条,当处理的这个线程负载过重之后,处理速度会变慢,会有大量的事件堆积,甚至超时,而超时的情况下,客户端往往会重新发送请求,这样的情况下,这个单线程的模型就会成为整个系统的瓶颈
  • 单线程模型的一个致命缺钱就是可靠性问题,因为仅有一个线程在工作,如果这个线程出错了无法正常执行任务了,那么整个系统就会停止响应,也就是系统会因为这个单线程模型而变得不可用,这在绝大部分场景(所有)下是不允许出现的

介于上面的种种缺陷,Reactor演变出了第二种模型,也就是Reactor多线程模型,下面展示了这种模型:

Reactor多线程模型

Reactor多线程模型

可以发现,多线程模型下,接收链接和处理请求作为两部分分离了,而Acceptor使用单独的线程来接收请求,做好准备后就交给事件处理的handler来处理,而handler使用了一个线程池来实现,这个线程池可以使用Executor框架实现的线程池来实现,所以,一个连接会交给一个handler线程来复杂其上面的所有事件,需要注意,一个连接只会由一个线程来处理,而多个连接可能会由一个handler线程来处理,关键在于一个连接上的所有事件都只会由一个线程来处理,这样的好处就是消除了不必要的并发同步的麻烦。Reactor多线程模型似乎已经可以很好的工作在我们的项目中了,但是还有一个问题没有解决,那就是,多线程模型下任然只有一个线程来处理客户端的连接请求,那如果这个线程挂了,那整个系统任然会变为不可用,而且,因为仅仅由一个线程来负责客户端的连接请求,如果连接之后要做一些验证之类复杂耗时操作再提交给handler线程来处理的话,就会出现性能问题。

Reactor多线程模型对Reactor单线程模型做了一些改进,但是在某些场景下任然有所缺陷,所以就有了第三种Reactor模型,Reactor主从多线程模型,下面展示了这种模型的架构:

Reactor主从多线程模型

Reactor主从多线程模型

Reactor多线程模型解决了Reactor单线程模型和Reactor多线程模型中存在的问题,解决了handler的性能问题,以及Acceptor的安全以及性能问题,Netty就使用了这种线程模型来处理事件。

Netty线程模型

在了解了线程模型以及Reactor线程模型之后,我们来看一下Netty的线程模型是怎么样的。首先,Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。一个EventLoopGroup包含多个EventLoop,可以把一个EventLoop当做是Reactor线程模型中的一个线程,而一个EventLoopGroup类似于一个ExecutorService,当然,这只是为了更好的理解Netty的线程模型,它们之间是没有等价关系的,后面的分析中会详细讲到。下面的图片展示了Netty的线程模型:

Netty线程模型

Netty线程模型

首先看一下Netty服务端启动的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(your_handler_name, your_handler_instance);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

Netty的服务端使用了两个EventLoopGroup,而第一个EventLoopGroup通常只有一个EventLoop,通常叫做bossGroup,负责客户端的连接请求,然后打开Channel,交给后面的EventLoopGroup中的一个EventLoop来负责这个Channel上的所有读写事件,一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel来负责上面的事件,当然,Netty不仅支持NI/O,还支持OI/O,所以两者的EventLoop分配方式有所区别,下面分别展示了NI/O和OI/O的分配方式:

Netty NIO分配EventLoop模型

Netty NIO分配EventLoop模型

Netty OIO分配EventLoop模型

Netty OIO分配EventLoop模型

在NI/O非阻塞模式下,Netty将负责为每个Channel分配一个EventLoop,一旦一个EventLoop呗分配给了一个Channel,那么在它的整个生命周期中都使用这个EventLoop,但是多个Channel将可能共享一个EventLoop,所以和Thread相关的ThreadLocal的使用就要特别注意,因为有多个Channel在使用该Thread来处理读写时间。在阻塞IO模式下,考虑到一个Channel将会阻塞,所以不太可能将一个EventLoop共用于多个Channel之间,所以,每一个Channel都将被分配一个EventLoop,并且反过来也成立,也就是一个EventLoop将只会被绑定到一个Channel上来处理这个Channel上的读写事件。无论是非阻塞模式还是阻塞模式,一个Channel都将会保证一个Channel上的所有读写事件都只会在一个EventLoop上被处理。

Netty EventLoop

上文中分析了Reactor线程模型以及Netty的线程模型,在Netty中,EventLoop是一个极为重要的组件,它翻译过来称为事件循环,一个EventLoop将被分配给一个Channel,来负责这个Channel的整个生命周期之内的所有事件,下面来分析一下EventLoop的结构和实现细节。首先展示了EventLoop的类图:

EventLoop类图

EventLoop类图

从EventLoop的类图中可以发现,其实EventLoop继承了Java的ScheduledExecutorService,也就是调度线程池,所以,EventLoop应当有ScheduledExecutorService提供的所有功能。那为什么需要继承ScheduledExecutorService呢,也就是为什么需要延时调度功能,那是因为,在Netty中,有可能用户线程和Netty的I/O线程同时操作网络资源,而为了减少并发锁竞争,Netty将用户线程的任务包装成Netty的task,然后向Netty的I/O任务一样去执行它们。有些时候我们需要延时执行任务,或者周期性执行任务,那么就需要调度功能。这是Netty在设计上的考虑,为我们极大的简化的编程方法。

EventLoop是一个接口,它在继承了ScheduledExecutorService等多个类的同时,仅仅提供了一个方法parent,这个方法返回它属于哪个EventLoopGroup。本文只分析非阻塞模式,而阻塞模式留到未来某个合适的时候再做分析总结。在上文中展示的服务端启动的代码中我们发现我们使用的EventLoop是一个子类NioEventLoopGroup,下面就来分析一下NioEventLoopGroup这个类。首先展示一下NioEventLoopGroup的类图:

 NioEventLoopGroup类图

NioEventLoopGroup类图

可以发现,NioEventLoopGroup的实现非常的复杂,但是只要我们清楚了Netty的线程模型,我们就可以有入口去分析它的代码。首先,我们知道每个EventLoop只要一个Thread来处理事件,那我们就来找到那个Thread在什么地方。可以在SingleThreadEventExecutor类中找到thread,它的初始化在doStartThread这个方法中,而这个方法被startThread方法调用,而startThread 这个方法被execute方法调用,也就是提交任务的入口,这个方法是Executor接口的唯一方法。也就是说,所有我们通过EventLoop的execute方法提交的任务都将被这个Thread线程来执行。我们还知道一个事实,EventLoop是一个循环执行来消耗Channel事件的类,那么它必然会有一个类似循环的方法来作为任务,来提交给这个Thread来执行,而这可以在doStartThread方法中被发现,因为这个方法非常重要,所以下面展示了它的实现细节,但是去掉了一些代码来减少代码量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
复制代码
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
terminationFuture.setSuccess(null);
}
}
}
}
});
}

上面所提到的事件循环就是通过SingleThreadEventExecutor.this.run()这句话来触发的。这个run方法的具体实现在NioEventLoop中,下面展示了它的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
复制代码
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

首先,我们来分析一下NioEventLoop的相关细节,在一个无限循环里面,只有在遇到shutdown的情况下才会停止循环。然后在循环里会询问是否有事件,如果没有,则继续循环,如果有事件,那么就开始处理时间。上文中我们提到,在事件循环中我们不仅要处理IO事件,还要处理非I/O事件。Netty中可以设置用于I/O操作和非I/O操作的时间占比,默认各位50%,也就是说,如果某次I/O操作的时间花了100ms,那么这次循环中非I/O得任务也可以花费100ms。Netty中的I/O时间处理通过processSelectedKeys方法来进行,而非I/O操作通过runAllTasks反复来进行,首先来看runAllTasks方法,虽然设定了一个可以运行的时间参数,但是实际上Netty并不保证能精确的确保非I/O任务只运行设定的毫秒,下面来看下runAllTasks的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
复制代码
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

// 将任务运行起来
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}

可以看到,这个方法是在每运行了64个任务之后再进行比较的,如果超出了设定的运行时间则退出,否则再运行64个任务再比较。所以,Netty强烈要求不要在I/O线程中运行阻塞任务,因为阻塞任务将会阻塞住Netty的事件循环,从而造成事件堆积的现象。现在回头看处理I/O任务的processSelectedKeys方法,跟踪代码之后发现最后实际处理I/O事件的一个方法为processSelectedKey,下面展示了它的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
复制代码
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

这个方法运行的流程为:

  1. 从Channel上获取一个unsafe对象,这个对象 是用来进行NIO操作的一系列系统级API,关于Netty的Channel的深层次分析将在另外的篇章中进行
  2. 从Channel上获取了eventLoop,而这个eventLoop是什么时候分配给Channel的细节在后文中进行分析
  3. 根据事件调用底层API来处理事件

下面,我们分析一下是什么时候将一个EventLoop分配给一个Channel的,并且这个EventLoop的那个唯一的Thread是什么时候被赋值的。在这个问题上,服务端的流程和客户端的流程可能不太一样,对于服务端来说,首先需要bind一个端口,然后在进行Accept进来的连接,而客户端需要进行connect到服务端。先来分析一下服务端。

还是看上面提供的服务端的示例代码,其中启动的代码为下面这句代码:

1
2
3
复制代码
// Start the server.
ChannelFuture f = b.bind(PORT).sync();

也就是我们网络编程中的bind操作,这个操作会发生什么呢?追踪代码如下:

1
2
3
4
5
6
7
8
9
复制代码 
-> AbstractBootstrap.bind(port)
-> AbstractBootstrap.bind(address)
-> AbstractBootstrap.doBind(final SocketAddress localAddress)
-> AbstractBootstrap.initAndRegister
-> AbstractBootstrap.doBind0
-> SingleThreadEventExecutor.execute
-> SingleThreadEventExecutor.startThread()
-> SingleThreadEventExecutor.doStartThread

EventLoop在AbstractBootstrap.initAndRegister中获得了一个新的Channel,然后在AbstractBootstrap.doBind0 方法里面调用接下来的方法来初始化EventLoop的Thread的工作,并且将EventLoop的时间循环打开了,可以开始接收客户端的连接请求了。下面来分析一下客户端的流程。

一个客户端的启动代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}

其中启动的关键代码为:

1
2
3
复制代码
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();

下面是connect的调用流程:

1
2
3
4
5
6
7
复制代码 -> Bootstrap.doResolveAndConnect
-> AbstractBootstrap.initAndRegister
-> Bootstrap.doResolveAndConnect0
-> Bootstrap.doConnect
-> SingleThreadEventExecutor.execute
-> SingleThreadEventExecutor.startThread()
-> SingleThreadEventExecutor.doStartThread

后半部分和服务端的启动过程是一致的,而区别在于服务端是通过bind操作来启动的,而客户端是通过connect操作来启动的。执行到此,客户端和服务端的EventLoop都已经启动起来,服务端可以接受客户端的连接并且处理Channel上的读写事件,而客户端可以去连接远程服务端来请求数据。

EventLoopGroup

到目前为止,我们已经知道了Reactor多个线程模型,并且知道了一个EventLoop会负责一个Channel的生命周期内的所有事件,并且知道了服务端和客户端是如何启动这个EventLoop得,但是还有一个问题没有解决,那就是一个EventLoop是如何被分配给一个Channel的。下文就来分析这个分配的原理和过程。而对于阻塞I/O模型的分配和非阻塞I/O模型的分配是不一样的,在上文中也提到这个内容,所以本文只分析对于非阻塞I/O模型的分配。

EventLoopGroup是用来管理EventLoop的对象,一个EventLoopGroup里面有多个EventLoop,下面展示了EventLoopGroup的类图:

EventLoopGroup类图

EventLoopGroup类图

我们从实际的代码出发来分析EventLoopGroup。上文中已经展示了客户端和服务端的启动代码,其中有类似的代码如下:

1
2
3
复制代码
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

上文中我们分析了EventLoop被启动的过程,我们肯定,EventLoop是在分配之后启动的,因为对于服务端而言,bind是一个最开始的网络操作,对于客户端来说,connect也是最开始的网络操作,在这之前是没有关于网络I/O的操作的,所以,EventLoop的分配和启动是在这两个过程或者之后的流程中进行的,但是EventLoop的分配肯定是在启动之前的,但是EventLoop的分配和启动在bind和connect中进行,那么我们可以肯定,EventLoop的分配也是在这两个方法中进行的。为了证明这个假设,回头再看一下服务端的EventLoop的启动过程,其中有一个方法值得我们注意:AbstractBootstrap.initAndRegister,我们进行了init部分的分析,而register部分我们还没有分析,下面就对服务端来进行register部分的分析,下面展示了register的调用链路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
复制代码
-> Bootstrap.doResolveAndConnect
-> AbstractBootstrap.initAndRegister
-> EventLoopGroup.register
-> MultithreadEventLoopGroup.register
-> SingleThreadEventLoop.register
-> Channel.register
-> AbstractUnsafe.register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

最后展示了AbstractUnsafe.register这个方法,在这里初始化了一个EventLoop,需要记住的一点是,EventLoopGroup中的是EventLoop,不然在追踪代码的时候会迷失。现在来正式看一下NioEventLoopGroup这个类,它的它继承了MultithreadEventExecutorGroup这个类,而我们在初始化EventLoopGroup的时候传递进去的参数,也就是我们希望这个EventLoopGroup拥有的EventLoop数量,会在MultithreadEventExecutorGroup这个类中初始化,并且是在构造函数中初始化的,如果在new
EventLoopGroup的时候没有任何参数,那么默认的EventLoop的数量是机器CPU数量的两倍。现在我们来看一下MultithreadEventExecutorGroup这个类的一个重要的构造函数,这个构造函数初始化了EventLoopGroup的EventLoop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
复制代码
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

一个较为重要的方法为newChild,这是初始化一个EventLoop的方法,下面是它的具体实现,假设我们使用NioEventLoop:

1
2
3
4
5
复制代码
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

我们现在知道了EventLoopGroup管理着很多的EventLoop,上文中我们仅仅分析了分配的流程,但是分配的策略还没有分析,现在来分析一下EventLoopGroup是如何分配EventLoop给Channel的,我们仅分析非阻塞I/O下的分配策略,阻塞模式下的分配策略可以参考非阻塞下的分配策略。

在MultithreadEventLoopGroup.register方法中,调用了next()方法,我们来看一下这个流程:

1
2
3
4
5
6
复制代码
-> MultithreadEventExecutorGroup.next()

public EventExecutor next() {
return chooser.next();
}

chooser是什么东西?

1
2
复制代码
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

它是怎么初始化的呢?

1
2
3
4
5
6
7
8
复制代码
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

这是它初始化最后调用的方法,这个方法在DefaultEventExecutorChooserFactory中被实现,这个参数是MultithreadEventExecutorGroup类中的children,也就是EventLoopGroup中的所有EventLoop,那这个newChooser得分配方法就是如果EventLoop的数量是2的n次方,那么就使用PowerOfTwoEventExecutorChooser来分配,否则使用GenericEventExecutorChooser来分配。这两个策略类的分配方法实现分别如下:

1
2
3
4
5
6
7
8
9
10
复制代码     
1、PowerOfTwoEventExecutorChooser
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}

2、GenericEventExecutorChooser
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

所以,到此为止,我们可以解决为什么一个EventLoop会被分配给多个Channel的疑惑。本文到此也就结束了。篇幅较长,内容涉及到Reactor的三种线程模型,然后分析了Netty的线程模型,然后分析了Netty的EventLoop,以及EventLoopGroup,以及分析了EventLoop是怎么被分配给一个Channel的,和一个EventLoop是如何启动起来来处理事件的。最后分析了EventLoopGroup分配EventLoop的策略,对于本文涉及的内容的更为深入的分析总结,将在未来的某个适宜的时刻进行。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java中的ThreadLocal和 Inheritable

发表于 2017-10-15

ThreadLocal

ThreadLocal从字面理解就是线程本地变量,貌似是一种线程私有的缓存变量的容器。为了说明ThreadLocal的特点,举个例子:比如有三个人,每个人比作一个线程,它们都需要一个袋子来装捡到的东西,也就是每个线程都希望自己有一个容器,当然,自己的捡到的东西肯定不希望和别人分享啊,也就是希望这个容器对其他人(线程)是不可见的,如果现在只有一个袋子,那怎么办?

  1. 每个人在捡东西之前一定会先抢到那个唯一的袋子,然后再捡东西,如果使用袋子的时间到了,就会马上把里面的东西消费掉,然后把袋子放到原来的地方,然后再次去抢袋子。这个方案是使用锁来避免线程竞争问题的,三个线程需要竞争同一个共享变量。
  2. 我们假设现在不是只有一个袋子了,而是有三个袋子,那么就可以给每个人安排一个袋子,然后每个人的袋子里面的对象是对其他人不可见的,这样的好处是解决了多个人竞争同一个袋子的问题。这个方案就是使用ThreadLocal来避免不必要的线程竞争的。

大概了解了ThreadLocal,下面来看看它的使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码
private static class UnsafeThreadClass {

private int i;

UnsafeThreadClass(int i) {
this.i = i;
}

int getAndIncrement() {
return ++ i;
}

@Override
public String toString() {
return "[" + Thread.currentThread().getName() + "]" + i;
}
}

private static ThreadLocal<UnsafeThreadClass> threadLocal = new ThreadLocal<>();

static class ThreadLocalRunner extends Thread {
@Override
public void run() {

UnsafeThreadClass unsafeThreadClass = threadLocal.get();

if (unsafeThreadClass == null) {
unsafeThreadClass = new UnsafeThreadClass(0);
threadLocal.set(unsafeThreadClass);
}

unsafeThreadClass.getAndIncrement();

System.out.println(unsafeThreadClass);
}

}

上面的例子仅仅是为了说明ThreadLocal可以为每个线程保存一个本地变量,这个变量不会受到其他线程的干扰,你可以使用多个ThreadLocal来让线程保存多个变量,下面我们分析一下ThreadLocal的具体实现细节,首先展示了ThreadLocal提供的一些方法,我们重点关注的是get、set、remove方法。

ThreadLocal方法

ThreadLocal方法

首先,我们需要new一个ThreadLocal对象,那么ThreadLocal的构造函数做了什么呢?

1
2
3
4
5
6
7
复制代码
/**
* Creates a thread local variable.
* @see #withInitial(java.util.function.Supplier)
*/
public ThreadLocal() {
}

很遗憾它什么都没做,那么初始化的过程势必是在首次set的时候做的,我们来看一下set方法的细节:

1
2
3
4
5
6
7
8
9
复制代码
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

看起来首先根据当前线程获取到了一个ThreadLocalMap,getMap方法是做了什么?

1
2
3
4
复制代码
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

非常的简洁,是和Thread与生俱来的,我们看一下Thread中的相关定义:

1
2
3
4
5
6
7
8
9
10
复制代码
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

关于inheritableThreadLocals将在下一小节再学习总结。

获得了线程的ThreadLocalMap之后,如果不为null,说明不是首次set,直接set就可以了,注意key是this,也就是当前的ThreadLocal啊不是Thread。如果为空呢?说明还没有初始化,那么就需要执行createMap这个方法:

1
2
3
4
复制代码
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

没什么特别的,就是初始化线程的threadLocals,然后设定key-value。

下面分析一下get的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

和set一样,首先根据当前线程获取ThreadLocalMap,然后判断是否为null,如果为null,说明ThreadLocalMap还没有被初始化啊,那么就返回方法setInitialValue的结果,这个方法做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}

protected T initialValue() {
return null;
}

最后会返回null,但是会做一些初始化的工作,和set一样。在get里面,如果返回的ThreadLocalMap不为null,则说明ThreadLocalMap已经被初始化了,那么就可以正常根据ThreadLocal作为key获取了。

当线程退出时,会清理ThreadLocal,可以看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码
/**
* This method is called by the system to give a Thread
* a chance to clean up before it actually exits.
*/
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}

这里做了大量“Help GC”的工作。包括我们本节所讲的threadLocals和下一小节要讲的inheritableThreadLocals都会被清理。

如果我们想要显示的清理ThreadLocal,可以使用remove方法:

1
2
3
4
5
6
复制代码
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}

逻辑较为直接,很好理解。

InheritableThreadLocal

ThreadLocal固然很好,但是子线程并不能取到父线程的ThreadLocal的变量,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码
private static ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<>();
private static InheritableThreadLocal<Integer> inheritableThreadLocal =
new InheritableThreadLocal<>();

public static void main(String[] args) throws InterruptedException {

integerThreadLocal.set(1001); // father
inheritableThreadLocal.set(1002); // father

new Thread(() -> System.out.println(Thread.currentThread().getName() + ":"
+ integerThreadLocal.get() + "/"
+ inheritableThreadLocal.get())).start();

}

//output:
Thread-0:null/1002

使用ThreadLocal不能继承父线程的ThreadLocal的内容,而使用InheritableThreadLocal时可以做到的,这就可以很好的在父子线程之间传递数据了。下面我们分析一下InheritableThreadLocal的实现细节,下面展示了InheritableThreadLocal提供的方法:

InheritableThreadLocal方法

InheritableThreadLocal方法

InheritableThreadLocal继承了ThreadLocal,然后重写了上面三个方法,所以除了上面三个方法之外,其他所有对InheritableThreadLocal的调用都是对ThreadLocal的调用,没有什么特别的。我们上文中提到了Thread类,里面有我们本文关心的两个成员,我们来看一下再Thread中做了哪些工作,我们跟踪一下new一个Thread的调用路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码
new Thread()

init(ThreadGroup g, Runnable target, String name, long stackSize)


init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals)

->
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

createInheritedMap(ThreadLocalMap parentMap)


ThreadLocalMap(ThreadLocalMap parentMap)

上面列出了最为关键的代码,可以看到,最后会调用ThreadLocal的createInheritedMap方法,而该方法会新建一个ThreadLocalMap,看一下构造函数的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];

for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}

parentMap就是父线程的ThreadLocalMap,这个构造函数的意思大概就是将父线程的ThreadLocalMap复制到自己的ThreadLocalMap里面来,这样我们就可以使用InheritableThreadLocal访问到父线程中的变量了。

对ThreadLocal更为具体和深入的分析将在其他的篇章中进行,本文点到即可,为了深入理解ThreadLocal,可以阅读ThreadLocalMap的源码,以及可以在项目中多思考是否可以使用ThreadLocal来做一些事情,比如,如果我们具有这样一种线程模型,一个任务从始至终只会被一个线程执行,那么可以使用ThreadLocal来计算运行该任务的时间。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java可重入锁详解

发表于 2017-10-15

前言

在java中,锁是实现并发的关键组件,多个线程之间的同步关系需要锁来保证,所谓锁,其语义就是资源获取到资源释放的一系列过程,使用lock的意图就是想要进入临界区对共享资源执行操作,使用unlock说明线程已经完成了相关工作或者发生了异常从而离开临界区释放共享资源,可以说,在多线程环境下,锁是一个必不可少的组件。我们最为常用的并发锁是synchronized关键字,在最新的jdk中,synchronized的性能已经有了极大的提升了,而且未来还会对它做更进一步的优化,最为重要的是synchronized使用起来特别方便,基本不需要要我们考虑太多的内容,只需要将临界区的代码放在synchronized关键字里面,然后设定好需要锁定的对象,synchronized就会自动为进入的并发线程lock和unlock。在大多数情况下,我们写并发代码使用synchronized就足够了,而且使用synchronized也是首选,不过如果我们希望更加灵活的使用锁来做并发,那么java还提供了一个借口Lock,本文并不会对synchronized进行分析总结,本文的重点在Lock接口,以及实现了Lock接口的一些子类的分析总结。

为了本文的完整性,可以参考Java同步框架AbstractQueuedSynchronizer,这个俗称为AQS的东西是Lock接口实现的根本,它实现了Lock的全部语义,可以说,java的Lock接口的子类就是借助AQS来实现了lock和unlock的,理解了AQS,就可以很好的理解java中的锁了。

Lock接口以及ReadWriteLock接口

下面首先展示出了Lock接口的内容,然后是ReadWriteLock的接口,本文主要分析这两个接口的几个子类的实现细节。

Lock接口

Lock接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

Lock接口提供了lock和unlock方法,提供加锁和释放锁的语义,lockInterruptibly方法可以响应中断,lock方法会阻塞线程直到获取到锁,而tryLock方法则会立刻返回,返回true代表获取锁成功,而返回false则说明获取不到锁。newCondition方法返回一个条件变量,一个条件变量也可以做线程间通信来同步线程。多个线程可以等待在同一个条件变量上,一些线程会在某些情况下通知等待在条件变量上的线程,而有些变量在某些情况下会加入到条件变量上的等待队列中去。

ReadWriteLock是读写锁,可以对共享变量的读写提供并发支持,ReadWriteLock接口的两个方法分别返回一个读锁和一个写锁。本文将基于上面提到的两个接口Lock和ReadWriteLock,对Lock的子类ReentrantLock和ReadWriteLock的子类ReentrantReadWriteLock进行一些分析总结,以备未来不时之需。

ReentrantLock

在Java同步框架AbstractQueuedSynchronizer提到了两个概念,一个是独占锁,一个是共享锁,所谓独占锁就是只能有一个线程获取到锁,其他线程必须在这个锁释放了锁之后才能竞争而获得锁。而共享锁则可以允许多个线程获取到锁。具体的分析不再本文的分析范围之内。

ReentrantLock翻译过来为可重入锁,它的可重入性表现在同一个线程可以多次获得锁,而不同线程依然不可多次获得锁,这在下文中会进行分析。下文会分析它是如何借助AQS来实现lock和unlock的,本文只关注核心方法,比如lock和unlock,而不会去详细的描述所有的方法。ReentrantLock分为公平锁和非公平锁,公平锁保证等待时间最长的线程将优先获得锁,而非公平锁并不会保证多个线程获得锁的顺序,但是非公平锁的并发性能表现更好,ReentrantLock默认使用非公平锁。下面分公平锁和非公平锁来分析一下ReentrantLock的代码。

锁Sync

在文章Java同步框架AbstractQueuedSynchronizer中已经提到了如何通过AQS来实现锁的方法,那就是继承AbstractQueuedSynchronizer类,然后使用它提供的方法来实现自己的锁。ReentrantLock的Sync也是通过这个方法来实现锁的。

Sync有一个抽象方法lock,其子类FairSync和NonfairSync分别实现了公平上锁和非公平上锁。nonfairTryAcquire方法用于提供可重入的非公平上锁,之所以把它放在Sync中而不是在子类NonfairSync中(FairSync中有公平的可重入上锁版本的实现),是因为nonfairTryAcquire不仅在NonfairSync中被使用了,而且在ReentrantLock.tryLock里面也使用到了。对于它的分析留到NonfairSync里面再分析。

Sync中还需要注意的一个方法是tryRelease,执行这个方法说明线程在离开临界区,下面是tryRelease方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease方法重写了父类的tryRelease方法,而父类的tryRelease方法在release方法中被调用,而release方法最后会被用于实现ReentrantLock的unlock方法。所以理解了该方法,就理解了ReentrantLock的unlock逻辑。

从上面展示的代码分析,getState方法获取当前的共享变量,getState方法的返回值代表了有多少线程获取了该条件变量,而release代表着想要释放的次数,然后根据这两个值计算出最新的state值,接着判断当前线程是否独占了锁,如果不是,那么就抛出异常,否则继续接下来的流程。如果最新的state为0了,说明锁已经被释放了,可以被其他线程获取了。然后更新state值。

公平锁FairSync

FairSync实现了公平锁的lock和tryAcquire方法,下面分别看一下这两个方法的实现细节:

1
2
3
4
复制代码
final void lock() {
acquire(1);
}

可以看到,FairSync的lock实现使用了AQS提供的acquire方法,这个方法的详细解析见Java同步框架AbstractQueuedSynchronizer。

下面是tryAcquire方法的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以看到,公平锁的tryAcquire实现和非公平锁的tryAcquire实现的区别在于:公平锁多加了一个判断条件:hasQueuedPredecessors,如果发现有线程在等待获取锁了,那么就直接返回false,否则在继承尝试获取锁,这样就保证了线程是按照排队时间来有限获取锁的。而非公平锁的实现则不考虑是否有节点在排队,会直接去竞争锁,如果获取成功就返回true,否则返回false。

当然,这些分支执行的条件是state为0,也就是说当前没有线程独占着锁,或者获取锁的线程就是当前独占着锁的线程,如果是前者,就按照上面分析的流程进行获取锁,如果是后者,则更新state的值,如果不是上述的两种情况,那么直接返回false说明尝试获取锁失败。

非公平锁NonfairSync

公平锁的lock使用了AQS的acquire,而acquire会将参与锁竞争的线程加入到等待队列中去按顺序获得锁,队列头部的节点代表着当前获得锁的节点,头结点释放锁之后会唤醒其后继节点,然后让后继节点来竞争获取锁,这样就可以保证锁的获取是按照一定的优先级来的。而非公平锁的实现则会首先尝试去竞争锁,如果不成功,再走AQS提供的acquire方法,下面是NonfairSync的lock方法:

1
2
3
4
5
6
7
复制代码
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

非公平锁的tryAcquire方法使用了父类的nonfairTryAcquire方法来实现。

ReentrantLock上锁和释放锁

说完了Sync类和其两个子类,现在来看一下ReentrantLock是如何使用这两个类来实现lock和unlock的。首先是ReentrantLock的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

默认构造函数使用了非公平锁来提高并发度,一般情况下使用默认构造函数即可。而在一些特殊的情景下,需要使用公平锁的话就传递一个true的参数。下面是lock和unlock方法的细节,lock使用了Sync的lock方法,而unlock使用了AQS的release方法,而release方法使用了其tryRelease方法,而这个方法在Sync类中被重写,上面我们已经有过分析。

1
2
3
4
5
6
7
8
复制代码
public void lock() {
sync.lock();
}

public void unlock() {
sync.release(1);
}

newCondition方法

newCondition这个方法需要一些篇幅来描述一下,而newCondition方法的返回内容涉及AQS类中的内部类ConditionObject,所以也就是分析一下ConditionObject这个类的一些细节。下面的图片展示了ConditionObject这个类的类图,可以看出,它实现了Condition的所有方法。

ConditionObject类图

ConditionObject类图

关于Condition接口的描述,可以参考下面的文档内容:

1
2
3
4
5
6
7
8
9
10
复制代码
* Conditions (also known as condition queues or
* condition variables) provide a means for one thread to
* suspend execution (to wait) until notified by another
* thread that some state condition may now be true. Because access
* to this shared state information occurs in different threads, it
* must be protected, so a lock of some form is associated with the
* condition. The key property that waiting for a condition provides
* is that it atomically releases the associated lock and
* suspends the current thread, just like {@code Object.wait}.

await和await(long time, TimeUnit unit)方法

接下来分析一下ConditionObject类是如何实现Condition接口的方法的。首先是await方法,这个方法的意思是让当前线程等待直到有别的线程signalled,或者被interrupted。调用此方法的线程会被阻塞直到有其他的线程唤醒或者打断它。下面是它的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

首先,如果线程被中断了,那么抛出异常。否则调用addConditionWaiter方法生成一个Node,下面来看一下addConditionWaiter这个方法的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

lastWaiter是等待在该Condition上的队列末尾的线程,根据代码,首先,如果最后一个线程从Condition上被取消了,并且当前线程并没有在该Condition的等待队列上,那么就将当前线程作为该Condition上等待队列的末尾节点。如果上面的条件不成立,那么就使用当前线程生成一个新的Node,然后将其状态变为Node.CONDITION代表其等待在某个Condition上,然后将该新的节点添加到队列的末尾。

现在回头看await方法,我们发现addConditionWaiter的作用就是将当前线程添加到Condition的等待队列上去。接下来的步骤特别关键。await方法调用了fullyRelease方法,我们来看一下这个方法是干嘛用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

fullyRelease会调用release方法来是释放当前线程的同步状态,并且返回释放之后的状态值,这个值在await方法中作为了acquireQueued方法参数,这个方法在稍后分析。现在来看一下接下来的步骤,在获得了当前线程的state值了之后,就会进入一个while循环中去,while循环停止的条件是isOnSyncQueue(node)这个方法返回true,这个方法是用来判断一个Node是否在AQS的等待队列中的,下面是其方法内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;

return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

也就是说,只要当前线程的Node还在Condition上等待的话,就会一直在while循环中等待,而这个等待被破除的关键是signal方法,后面会分析到。我们现在假设signal方法运行完了,并且当前线程已经被添加到了AQS的SYNC等待队列中去了,那么接下来就使用我们一开始获取到的state值来竞争锁了,而这个竞争就是AQS的逻辑,下面的方法就是这个竞争去获取锁的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这个方法会自旋的来获得同步变量,这个方法中的循环结束的条件是:

  1. 该节点的前驱节点是头结点,头结点代表的是获得锁的节点,只有它释放了state其他线程才能获得这个变量的所有权
  2. 在条件1的前提下,方法tryAcquire返回true,也就是可以获得同步资源state

整个await方法总结起来就是首先释放当前线程的条件变量,然后获取到释放完了之后的state值,我们假设这就是这个线程当前上下文的一部分内容,然后进入阻塞等待,一直在while循环里面等待,如果当前线程的Node被添加到了Sync队列中去了,那么就可以开始去竞争锁了,否则一直在等待,在await方法的整个过程中,可以相应中断。

上面分析了await方法,await(long time, TimeUnit unit)方法只是在await方法上加了一个超时时间,await会死等直到被添加到Sync队列中去,而await(long time, TimeUnit unit)方法只会等设定的超时时间,如果超时时间到了,会自己去竞争锁。

还有awaitUninterruptibly方法是await方法的简化版本,它不会相应中断。awaitUntil(Date deadline)方法让你可以设定一个deadline时间,如果超过这个时间了还没有被添加到Sync队列中去,那么线程就会自作主张的去竞争锁了。

signal和signalAll方法

上面分析了如何使用await等一系列方法来block线程,现在来分析如何使线程冲破block从而参与到获取锁的竞争中去。首先分析一下signal方法,使用这个方法可以使得线程被添加到Sync队列中去竞争锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

signal方法首先要唤醒的是等待在它的Condition的等待队列上的第一个节点,signal方法调用了doSignal方法,而doSignal方法调用了transferForSignal方法,transferForSignal方法调用enq方法将节点添加到了Sync队列中去,至此,await方法的while循环将不满足继续循环的条件,会执行循环之后的流程,也就是会去竞争锁,而之后的流程已经在Java同步框架AbstractQueuedSynchronizer中有分析,不在此赘述。

signalAll方法的意思是把所有等待在条件变量上的线程都唤醒去竞争锁,下面是它的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

在方法doSignalAll中遍历每一个等待在条件变量上的Node,然后调用transferForSignal方法将它们添加到Sync队列中去。关于Condition的内容就分析这么多,介于后文还要对java的可重入读写锁进行分析,所以篇幅不宜过长,日后会对Condition进行更为深入的学习和总结。

ReentrantReadWriteLock

ReentrantReadWriteLock即可重入读写锁,下文将分析它在什么情况下是可重入的,而在什么情况下是独占的。ReentrantReadWriteLock 类实现了ReadWriteLock接口,它提供的读写锁是分离的,读锁和写锁分别是独立的锁。而读锁和写锁的实现也是不一样的,ReentrantReadWriteLock使用了两个内部类ReadLock和WriteLock来分别表示读锁和写锁,而这两种锁又依赖于基于AQS的类Sync来实现,Sync也是一个内部类,它继承了AQS类来实现了lock和unlock的语义。首先来分析一下其Sync内部类。

Sync内部类

首要解决的一个问题是,我们知道,AQS是使用了一个int类型的值来表示同步变量的,现在要使用一个int值来表示读锁和写锁两种类型的同步,怎么办呢?我们知道,一个int是32位的,ReentrantReadWriteLock使用高16位代表了读锁同步变量,而低16位代表了写锁同步变量,所以读锁与写锁的可重入数量限定在了(2^16-1)个,当然AQS还有一个使用long变量来实现的版本AbstractQueuedLongSynchronizer,它的实现和AQS除了使用了long类型的变量来代表同步变量之外没有区别。下面我们来看一下Sync是如何获取重入的读线程数量和写线程数量的:

1
2
3
4
5
6
7
8
9
10
复制代码
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

我们来实践一个,比如现在的c是589826,则 (589826 >>> 16 = 9),说明有9个读可重入数量,而(589826 & (1 << 16 - 1)) = 2,说明有写重入的数量为2。需要注意的一点是,读锁可以有多个线程获取,而写锁只允许一个线程获取,那如何使用16位来代表多个读锁呢?ReentrantReadWriteLock使用了ThreadLocal来保存每个线程的重入数量,关于ThreadLocal的分析总结,可以参考
Java中的ThreadLocal和 InheritableThreadLocal,ReentrantReadWriteLock的做法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

private transient HoldCounter cachedHoldCounter;

Sync类实现了一些子类通用了方法,下面重点分析几个方法。

tryRelease和tryReleaseShared方法

tryRelease方法重写了AQS的tryRelease方法,而tryRelease这个方法会在release方法中使用到,也就是在unlock的时候用到。下面展示了它的细节:

1
2
3
4
5
6
7
8
9
10
11
复制代码
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

tryRelease很好理解,它的任务就是去更新state值,它调用了我们上面分析过的exclusiveCount方法来计算写重入的数量。到这里需要提出的是,ReentrantReadWriteLock在实现上实现了读锁和写锁,读锁允许多个线程重入,使用了AQS的共享模式,而写锁只允许一个线程获得锁,使用了AQS的独占模式,所以这个tryRelease方法会在WriteLock的unlock方法中被用到,而ReadLock中的unlock使用的是AQS的releaseShared方法,而这个方法会调用AQS的tryReleaseShared方法,而这个方法在Sync被重写,也就是接下来分析的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
复制代码
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

可以很明显的感觉得出来,读锁的释放要比写锁的释放要麻烦很多,因为写锁只有一个线程获得,而读锁则有多个线程获取。释放需要获取到当前线程的ThreadLocal变量,然后更新它的重入数量,更新state值。可以看到,因为使用了ThreadLocal,使得多个线程的问题变得简单起来,就好像是操作同一个线程一样。

tryAcquire和tryAcquireShared方法

ReadLock在调用lock方法的时候,会调用AQS的releaseShared方法,而releaseShared方法会调用AQS的tryReleaseShared方法,而tryReleaseShared方法在Sync中被重写了。WriteLock在lock的时候,会调用AQS的acquire方法,而acquire方法会调用AQS的tryAcquire方法,而tryAcquire方法在Sync中被重写了,所以接下来分析一下这两个被重写的方法来认识一下WriteLock和ReadLock是如何通过AQS来lock的。

首先是tryAcquire方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

这个方法用于写锁上锁,我们知道,只有一个线程可以获取到写锁,如果w为0,说明已经有线程获得了读锁,而在有读线程在读取数据的时候,写锁是无法获得的,所以w为0直接回失败,或者w不为0则说明了已经有线程获得了写锁,那么因为只允许有一个线程获取到写锁,所以如果当前线程不是那个获得了写锁的独占锁的话,也就直接失败,否则,如果上面两条检测都通过了,也就是说,当前没有线程获得读锁和写锁,那么判断重入数量是否超过了最大值,如果是则抛出异常,否则上锁成功。上面的分析都是基于c不为0,也就是说已经有线程获得了读锁或者写锁的情况下分析的,那如果c为0呢?说明当前环境下没有线程占有锁,那么接下来就分公平锁和非公平锁了,Sync有两个抽象方法需要子类来实现为公平锁还是非公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码
/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();

/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();

具体的细节到公平锁和非公平锁的分析上再讲细节。上面分析完了WriteLock使用的lock需要的tryAcquire方法,下面来分析一下ReadLock的lock需要的tryReleaseShared方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
复制代码
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

同样比WriteLock的要复杂很多,这个方法会返回1或者-1代表lock的结果,1代表lock成功了,-1代表lock失败了,来分析一下在上面情况下会失败:

1、如果有线程已经获得了写锁,那么肯定会失败,因为写锁是排斥锁,不允许其他线程获得任意类型的锁 2、如果重入的数量已经超过了限定,那么也会失败,如果你还想要支持更多的重入数量,那么使用AbstractQueuedLongSynchronizer来代替AQS

而在下面的情况下是会成功的:

1、没有线程获得写锁
2、获得写锁的线程就是当前想要获得读锁的线程
3、重入数量没有超过上限

总结起来就是,只要有线程获得了写锁,那么其他线程都获取不到写锁,如果获得写锁的线程想要获取读锁,那么可以成功。在获取读锁的时候,多个线程可以同时获得读锁,读锁是共享锁,而写锁是独占锁。

FairSync和NonFairSync的实现

这两个类仅仅是继承了父类然后实现了两个抽象方法,来表示读线程是否需要阻塞和写线程是否需要阻塞,以这样的方式来达到公平锁和非公平锁的目的。

下面是公平锁的实现:

1
2
3
4
5
6
7
复制代码
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

可以看出,对于公平锁来说,读锁和写锁都是查看Sync队列中是否有排队的线程,如果没有,则可以放行,否则就得排队。下面是非公平锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码

final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

在非公平锁中,写锁的获取不需要阻塞,而读锁的获取在apparentlyFirstQueuedIsExclusive中判断是否需要阻塞。所谓公平锁和非公平锁只是希望能对所有的线程都不区别对待,但是使用公平锁的代价是吞吐量没有非公平锁那么大,所以,如果我们的需求没有特别的原因,应该使用非公平锁。

ReadLock和WriteLock

上面介绍了Sync类,现在来分析一下ReadLock和WriteLock是如何通过Sync提高的方法来实现lock和unlock的。首先是ReadLock。它的lock和unlock方法如下:

1
2
3
4
5
6
7
8
复制代码
public void lock() {
sync.acquireShared(1);
}

public void unlock() {
sync.releaseShared(1);
}

而WriteLock的lock和unlock方法如下:

1
2
3
4
5
6
7
8
复制代码
public void lock() {
sync.acquire(1);
}

public void unlock() {
sync.release(1);
}

如果想要获取更多关于AQS的相关知识,可以去阅读AQS的源代码,或者参考Java同步框架AbstractQueuedSynchronizer,上文中也对lock和unlock的流程有所分析,再次也不做赘述。

可重入锁使用示例

最后,在分析了ReentrantLock和ReentrantReadWriteLock之后,来看一下如何使用它们。

ReentrantLock使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码
/**
* how to use ReentrantLock lock
*/
class LockX {
private final Lock LOCK = new ReentrantLock(); // non-fair lock

public void lockMethod() {
LOCK.lock();
try {
doBiz();
} catch (Exception e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}

public void doBiz() {

}

}

你需要注意的是你应该总是在try块中执行你的业务代码,然后在finally中unlock掉。

ReentrantReadWriteLock使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码
abstract class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = loadData();
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
consumeData(data);
} finally {
rwl.readLock().unlock();
}
}

abstract Object loadData();
abstract void consumeData(Object data);
}

别忘了在lock之后要unlock,否则如果一个写锁被获取之后没有释放的话,就不可能有锁获得锁了除非它自己本身。通用的做法是在try 块中进行业务处理,然后在finally中释放锁。Lock接口的使用相比于synchronized的使用要复杂很多,所以在大部分情况下,你应该使用synchronized来做并发控制,而不是Lock,但是如果想要做更加灵活的锁控制,你就可以选择使用Lock接口的具体实现类来应用,或者继承AQS来实现自己的同步器。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

设计模式也可以这么简单

发表于 2017-10-13

一直想写一篇介绍设计模式的文章,让读者可以很快看完,而且一看就懂,看懂就会用,同时不会将各个模式搞混。自认为本文还是写得不错的😂😂😂,我也花了很多心思来写和做图,力求让读者真的能看着简单同时有所收获。

设计模式是对大家实际工作中写的各种代码进行高层次抽象的总结,其中最出名的当属 Gang of Four (GoF) 的分类了,他们将设计模式分类为 23 种经典的模式,根据用途我们又可以分为三大类,分别为创建型模式、结构型模式和行为型模式。

有一些重要的设计原则在开篇和大家分享下,这些原则将贯通全文:

  1. 面向接口编程,而不是面向实现。这个很重要,也是优雅的、可扩展的代码的第一步,这就不需要多说了吧。
  2. 职责单一原则。每个类都应该只有一个单一的功能,并且该功能应该由这个类完全封装起来。
  3. 对修改关闭,对扩展开放。对修改关闭是说,我们辛辛苦苦加班写出来的代码,该实现的功能和该修复的 bug 都完成了,别人可不能说改就改;对扩展开放就比较好理解了,也就是说在我们写好的代码基础上,很容易实现扩展。

目录

  • 创建型模式
    • 简单工厂模式
    • 工厂模式
    • 抽象工厂模式
    • 单例模式
    • 建造者模式
    • 原型模式
    • 创建型模式总结
  • 结构型模式
    • 代理模式
    • 适配器模式
      • 默认适配器模式
      • 对象适配器模式
      • 类适配器模式
      • 适配器模式总结
    • 桥梁模式
    • 装饰模式
    • 门面模式
    • 组合模式
    • 享元模式
    • 结构型模式总结
  • 行为型模式
    • 策略模式
    • 观察者模式
    • 责任链模式
    • 模板方法模式
    • 行为型模式总结
  • 总结

创建型模式

创建型模式的作用就是创建对象,说到创建一个对象,最熟悉的就是 new 一个对象,然后 set 相关属性。但是,在很多场景下,我们需要给客户端提供更加友好的创建对象的方式,尤其是那种我们定义了类,但是需要提供给其他开发者用的时候。

简单工厂模式

和名字一样简单,非常简单,直接上代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码public class FoodFactory {
public static Food makeFood(String name) {
if (name.equals("noodle")) {
Food noodle = new LanZhouNoodle();
noodle.addSpicy("more");
return noodle;
} else if (name.equals("chicken")) {
Food chicken = new HuangMengChicken();
chicken.addCondiment("potato");
return chicken;
} else {
return null;
}
}
}

其中,LanZhouNoodle 和 HuangMengChicken 都继承自 Food。

简单地说,简单工厂模式通常就是这样,一个工厂类 XxxFactory,里面有一个静态方法,根据我们不同的参数,返回不同的派生自同一个父类(或实现同一接口)的实例对象。

我们强调职责单一原则,一个类只提供一种功能,FoodFactory 的功能就是只要负责生产各种 Food。

工厂模式

简单工厂模式很简单,如果它能满足我们的需要,我觉得就不要折腾了。之所以需要引入工厂模式,是因为我们往往需要使用两个或两个以上的工厂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码public interface FoodFactory {
Food makeFood(String name);
}
public class ChineseFoodFactory implements FoodFactory {
@Override
public Food makeFood(String name) {
if (name.equals("A")) {
return new ChineseFoodA();
} else if (name.equals("B")) {
return new ChineseFoodB();
} else {
return null;
}
}
}
public class AmericanFoodFactory implements FoodFactory {
@Override
public Food makeFood(String name) {
if (name.equals("A")) {
return new AmericanFoodA();
} else if (name.equals("B")) {
return new AmericanFoodB();
} else {
return null;
}
}
}

其中,ChineseFoodA、ChineseFoodB、AmericanFoodA、AmericanFoodB 都派生自 Food。

客户端调用:

1
2
3
4
5
6
7
8
复制代码public class APP {
public static void main(String[] args) {
// 先选择一个具体的工厂
FoodFactory factory = new ChineseFoodFactory();
// 由第一步的工厂产生具体的对象,不同的工厂造出不一样的对象
Food food = factory.makeFood("A");
}
}

虽然都是调用 makeFood(“A”) 制作 A 类食物,但是,不同的工厂生产出来的完全不一样。

第一步,我们需要选取合适的工厂,然后第二步基本上和简单工厂一样。

核心在于,我们需要在第一步选好我们需要的工厂。比如,我们有 LogFactory 接口,实现类有 FileLogFactory 和 KafkaLogFactory,分别对应将日志写入文件和写入 Kafka 中,显然,我们客户端第一步就需要决定到底要实例化 FileLogFactory 还是 KafkaLogFactory,这将决定之后的所有的操作。

虽然简单,不过我也把所有的构件都画到一张图上,这样读者看着比较清晰:

factory-1

抽象工厂模式

当涉及到产品族的时候,就需要引入抽象工厂模式了。

一个经典的例子是造一台电脑。我们先不引入抽象工厂模式,看看怎么实现。

因为电脑是由许多的构件组成的,我们将 CPU 和主板进行抽象,然后 CPU 由 CPUFactory 生产,主板由 MainBoardFactory 生产,然后,我们再将 CPU 和主板搭配起来组合在一起,如下图:

factory-1

这个时候的客户端调用是这样的:

1
2
3
4
5
6
7
8
复制代码// 得到 Intel 的 CPU
CPUFactory cpuFactory = new IntelCPUFactory();
CPU cpu = intelCPUFactory.makeCPU();
// 得到 AMD 的主板
MainBoardFactory mainBoardFactory = new AmdMainBoardFactory();
MainBoard mainBoard = mainBoardFactory.make();
// 组装 CPU 和主板
Computer computer = new Computer(cpu, mainBoard);

单独看 CPU 工厂和主板工厂,它们分别是前面我们说的工厂模式。这种方式也容易扩展,因为要给电脑加硬盘的话,只需要加一个 HardDiskFactory 和相应的实现即可,不需要修改现有的工厂。

但是,这种方式有一个问题,那就是如果 Intel 家产的 CPU 和 AMD 产的主板不能兼容使用,那么这代码就容易出错,因为客户端并不知道它们不兼容,也就会错误地出现随意组合。

下面就是我们要说的产品族的概念,它代表了组成某个产品的一系列附件的集合:

abstract-factory-2

当涉及到这种产品族的问题的时候,就需要抽象工厂模式来支持了。我们不再定义 CPU 工厂、主板工厂、硬盘工厂、显示屏工厂等等,我们直接定义电脑工厂,每个电脑工厂负责生产所有的设备,这样能保证肯定不存在兼容问题。

abstract-factory-3

这个时候,对于客户端来说,不再需要单独挑选 CPU厂商、主板厂商、硬盘厂商等,直接选择一家品牌工厂,品牌工厂会负责生产所有的东西,而且能保证肯定是兼容可用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public static void main(String[] args) {
// 第一步就要选定一个“大厂”
ComputerFactory cf = new AmdFactory();
// 从这个大厂造 CPU
CPU cpu = cf.makeCPU();
// 从这个大厂造主板
MainBoard board = cf.makeMainBoard();
// 从这个大厂造硬盘
HardDisk hardDisk = cf.makeHardDisk();

// 将同一个厂子出来的 CPU、主板、硬盘组装在一起
Computer result = new Computer(cpu, board, hardDisk);
}

当然,抽象工厂的问题也是显而易见的,比如我们要加个显示器,就需要修改所有的工厂,给所有的工厂都加上制造显示器的方法。这有点违反了对修改关闭,对扩展开放这个设计原则。

单例模式

单例模式用得最多,错得最多。

饿汉模式最简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public class Singleton {
// 首先,将 new Singleton() 堵死
private Singleton() {};
// 创建私有静态实例,意味着这个类第一次使用的时候就会进行创建
private static Singleton instance = new Singleton();

public static Singleton getInstance() {
return instance;
}
// 瞎写一个静态方法。这里想说的是,如果我们只是要调用 Singleton.getDate(...),
// 本来是不想要生成 Singleton 实例的,不过没办法,已经生成了
public static Date getDate(String mode) {return new Date();}
}

很多人都能说出饿汉模式的缺点,可是我觉得生产过程中,很少碰到这种情况:你定义了一个单例的类,不需要其实例,可是你却把一个或几个你会用到的静态方法塞到这个类中。

饱汉模式最容易出错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public class Singleton {
// 首先,也是先堵死 new Singleton() 这条路
private Singleton() {}
// 和饿汉模式相比,这边不需要先实例化出来,注意这里的 volatile,它是必须的
private static volatile Singleton instance = null;
public static Singleton getInstance() {
if (instance == null) {
// 加锁
synchronized (Singleton.class) {
// 这一次判断也是必须的,不然会有并发问题
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

双重加锁,volatile 保证了线程间的可见性,synchronized 对可能的并发问题做同步

很多人不知道怎么写,直接就在 getInstance() 方法签名上加上 synchronized,这就不多说了。

嵌套类最经典,以后大家就用它吧:

1
2
3
4
5
6
7
8
9
10
复制代码public class Singleton3 {
private Singleton3() {}
// 主要是使用了 嵌套类可以访问外部类的静态属性和静态方法 的特性
private static class Holder {
private static Singleton3 instance = new Singleton3();
}
public static Singleton3 getInstance() {
return Holder.instance;
}
}

注意,很多人都会把这个嵌套类说成是静态内部类,严格地说,内部类和嵌套类是不一样的,它们能访问的外部类权限也是不一样的。

最后,一定有人跳出来说用枚举实现单例,是的没错,枚举类很特殊,它在类加载的时候会初始化里面的所有的实例,而且 JVM 保证了它们不会再被实例化,所以它天生就是单例的。不说了,读者自己看着办吧,不建议使用。

建造者模式

经常碰见的 XxxBuilder 的类,通常都是建造者模式的产物。建造者模式其实有很多的变种,但是对于客户端来说,我们的使用通常都是一个模式的:

1
2
复制代码Food food = new FoodBuilder().a().b().c().build();
Food food = Food.builder().a().b().c().build();

套路就是先 new 一个 Builder,然后可以链式地调用一堆方法,最后再调用一次 build() 方法,我们需要的对象就有了。

来一个中规中矩的建造者模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
复制代码class User {
// 下面是“一堆”的属性
private String name;
private String password;
private String nickName;
private int age;
// 构造方法私有化,不然客户端就会直接调用构造方法了
private User(String name, String password, String nickName, int age) {
this.name = name;
this.password = password;
this.nickName = nickName;
this.age = age;
}
// 静态方法,用于生成一个 Builder,这个不一定要有,不过写这个方法是一个很好的习惯,
// 有些代码要求别人写 new User.UserBuilder().a()...build() 看上去就没那么好
public static UserBuilder builder() {
return new UserBuilder();
}

public static class UserBuilder {
// 下面是和 User 一模一样的一堆属性
private String name;
private String password;
private String nickName;
private int age;
private UserBuilder() {
}
// 链式调用设置各个属性值,返回 this,即 UserBuilder
public UserBuilder name(String name) {
this.name = name;
return this;
}
public UserBuilder password(String password) {
this.password = password;
return this;
}
public UserBuilder nickName(String nickName) {
this.nickName = nickName;
return this;
}
public UserBuilder age(int age) {
this.age = age;
return this;
}
// build() 方法负责将 UserBuilder 中设置好的属性“复制”到 User 中。
// 当然,可以在 “复制” 之前做点检验
public User build() {
if (name == null || password == null) {
throw new RuntimeException("用户名和密码必填");
}
if (age <= 0 || age >= 150) {
throw new RuntimeException("年龄不合法");
}
// 还可以做赋予”默认值“的功能
if (nickName == null) {
nickName = name;
}
return new User(name, password, nickName, age);
}
}
}

核心是:先把所有的属性都设置给 Builder,然后 build() 方法的时候,将这些属性复制给实际产生的对象。

看看客户端的调用:

1
2
3
4
5
6
7
8
9
复制代码public class APP {
public static void main(String[] args) {
User d = User.builder()
.name("foo")
.password("pAss12345")
.age(25)
.build();
}
}

说实话,建造者模式的链式写法很吸引人,但是,多写了很多“无用”的 builder 的代码,感觉这个模式没什么用。不过,当属性很多,而且有些必填,有些选填的时候,这个模式会使代码清晰很多。我们可以在 Builder 的构造方法中强制让调用者提供必填字段,还有,在 build() 方法中校验各个参数比在 User 的构造方法中校验,代码要优雅一些。

题外话,强烈建议读者使用 lombok,用了 lombok 以后,上面的一大堆代码会变成如下这样:

1
2
3
4
5
6
7
复制代码@Builder
class User {
private String name;
private String password;
private String nickName;
private int age;
}

怎么样,省下来的时间是不是又可以干点别的了。

当然,如果你只是想要链式写法,不想要建造者模式,有个很简单的办法,User 的 getter 方法不变,所有的 setter 方法都让其 return this 就可以了,然后就可以像下面这样调用:

1
复制代码User user = new User().setName("").setPassword("").setAge(20);

原型模式

这是我要说的创建型模式的最后一个设计模式了。

原型模式很简单:有一个原型实例,基于这个原型实例产生新的实例,也就是“克隆”了。

Object 类中有一个 clone() 方法,它用于生成一个新的对象,当然,如果我们要调用这个方法,java 要求我们的类必须先实现 Cloneable 接口,此接口没有定义任何方法,但是不这么做的话,在 clone() 的时候,会抛出 CloneNotSupportedException 异常。

1
复制代码protected native Object clone() throws CloneNotSupportedException;

java 的克隆是浅克隆,碰到对象引用的时候,克隆出来的对象和原对象中的引用将指向同一个对象。通常实现深克隆的方法是将对象进行序列化,然后再进行反序列化。

原型模式了解到这里我觉得就够了,各种变着法子说这种代码或那种代码是原型模式,没什么意义。

创建型模式总结

创建型模式总体上比较简单,它们的作用就是为了产生实例对象,算是各种工作的第一步了,因为我们写的是面向对象的代码,所以我们第一步当然是需要创建一个对象了。

简单工厂模式最简单;工厂模式在简单工厂模式的基础上增加了选择工厂的维度,需要第一步选择合适的工厂;抽象工厂模式有产品族的概念,如果各个产品是存在兼容性问题的,就要用抽象工厂模式。单例模式就不说了,为了保证全局使用的是同一对象,一方面是安全性考虑,一方面是为了节省资源;建造者模式专门对付属性很多的那种类,为了让代码更优美;原型模式用得最少,了解和 Object 类中的 clone() 方法相关的知识即可。

结构型模式

前面创建型模式介绍了创建对象的一些设计模式,这节介绍的结构型模式旨在通过改变代码结构来达到解耦的目的,使得我们的代码容易维护和扩展。

代理模式

第一个要介绍的代理模式是最常使用的模式之一了,用一个代理来隐藏具体实现类的实现细节,通常还用于在真实的实现的前后添加一部分逻辑。

既然说是代理,那就要对客户端隐藏真实实现,由代理来负责客户端的所有请求。当然,代理只是个代理,它不会完成实际的业务逻辑,而是一层皮而已,但是对于客户端来说,它必须表现得就是客户端需要的真实实现。

理解代理这个词,这个模式其实就简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码public interface FoodService {
Food makeChicken();
Food makeNoodle();
}
public class FoodServiceImpl implements FoodService {
public Food makeChicken() {
Food f = new Chicken()
f.setChicken("1kg");
f.setSpicy("1g");
f.setSalt("3g");
return f;
}
public Food makeNoodle() {
Food f = new Noodle();
f.setNoodle("500g");
f.setSalt("5g");
return f;
}
}
// 代理要表现得“就像是”真实实现类,所以需要实现 FoodService
public class FoodServiceProxy implements FoodService {

// 内部一定要有一个真实的实现类,当然也可以通过构造方法注入
private FoodService foodService = new FoodServiceImpl();

public Food makeChicken() {
System.out.println("我们马上要开始制作鸡肉了");

// 如果我们定义这句为核心代码的话,那么,核心代码是真实实现类做的,
// 代理只是在核心代码前后做些“无足轻重”的事情
Food chicken = foodService.makeChicken();

System.out.println("鸡肉制作完成啦,加点胡椒粉"); // 增强
chicken.addCondiment("pepper");

return fruit;
}
public Food makeNoodle() {
System.out.println("准备制作拉面~");
Food noodle = foodService.makeNoodle();
System.out.println("制作完成啦")
return fruit;
}
}

客户端调用,注意,我们要用代理来实例化接口:

1
2
3
复制代码// 这里用代理类来实例化
FruitService fruitService = new FruitServiceProxy();
fruitService.makeChicken();

proxy

我们发现没有,代理模式说白了就是做 “方法包装” 或做 “方法增强”。在面向切面编程中,算了还是不要吹捧这个名词了,在 AOP 中,其实就是动态代理的过程。比如 Spring 中,我们自己不定义代理类,但是 Spring 会帮我们动态来定义代理,然后把我们定义在 @Before、@After、@Around 中的代码逻辑动态添加到代理中。

说到动态代理,又可以展开说 …… Spring 中实现动态代理有两种,一种是如果我们的类定义了接口,如 UserService 接口和 UserServiceImpl 实现,那么采用 JDK 的动态代理,感兴趣的读者可以去看看 java.lang.reflect.Proxy 类的源码;另一种是我们自己没有定义接口的,Spring 会采用 CGLIB 进行动态代理,它是一个 jar 包,性能还不错。

适配器模式

说完代理模式,说适配器模式,是因为它们很相似,这里可以做个比较。

适配器模式做的就是,有一个接口需要实现,但是我们现成的对象都不满足,需要加一层适配器来进行适配。

适配器模式总体来说分三种:默认适配器模式、对象适配器模式、类适配器模式。先不急着分清楚这几个,先看看例子再说。

默认适配器模式

首先,我们先看看最简单的适配器模式默认适配器模式(Default Adapter)是怎么样的。

我们用 Appache commons-io 包中的 FileAlterationListener 做例子,此接口定义了很多的方法,用于对文件或文件夹进行监控,一旦发生了对应的操作,就会触发相应的方法。

1
2
3
4
5
6
7
8
9
10
复制代码public interface FileAlterationListener {
void onStart(final FileAlterationObserver observer);
void onDirectoryCreate(final File directory);
void onDirectoryChange(final File directory);
void onDirectoryDelete(final File directory);
void onFileCreate(final File file);
void onFileChange(final File file);
void onFileDelete(final File file);
void onStop(final FileAlterationObserver observer);
}

此接口的一大问题是抽象方法太多了,如果我们要用这个接口,意味着我们要实现每一个抽象方法,如果我们只是想要监控文件夹中的文件创建和文件删除事件,可是我们还是不得不实现所有的方法,很明显,这不是我们想要的。

所以,我们需要下面的一个适配器,它用于实现上面的接口,但是所有的方法都是空方法,这样,我们就可以转而定义自己的类来继承下面这个类即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public class FileAlterationListenerAdaptor implements FileAlterationListener {
public void onStart(final FileAlterationObserver observer) {
}
public void onDirectoryCreate(final File directory) {
}
public void onDirectoryChange(final File directory) {
}
public void onDirectoryDelete(final File directory) {
}
public void onFileCreate(final File file) {
}
public void onFileChange(final File file) {
}
public void onFileDelete(final File file) {
}
public void onStop(final FileAlterationObserver observer) {
}
}

比如我们可以定义以下类,我们仅仅需要实现我们想实现的方法就可以了:

1
2
3
4
5
6
7
8
9
10
复制代码public class FileMonitor extends FileAlterationListenerAdaptor {
public void onFileCreate(final File file) {
// 文件创建
doSomething();
}
public void onFileDelete(final File file) {
// 文件删除
doSomething();
}
}

当然,上面说的只是适配器模式的其中一种,也是最简单的一种,无需多言。下面,再介绍“正统的”适配器模式。

对象适配器模式

来看一个《Head First 设计模式》中的一个例子,我稍微修改了一下,看看怎么将鸡适配成鸭,这样鸡也能当鸭来用。因为,现在鸭这个接口,我们没有合适的实现类可以用,所以需要适配器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码public interface Duck {
public void quack(); // 鸭的呱呱叫
public void fly(); // 飞
}
public interface Cock {
public void gobble(); // 鸡的咕咕叫
public void fly(); // 飞
}
public class WildCock implements Cock {
public void gobble() {
System.out.println("咕咕叫");
}
public void fly() {
System.out.println("鸡也会飞哦");
}
}

鸭接口有 fly() 和 quare() 两个方法,鸡 Cock 如果要冒充鸭,fly() 方法是现成的,但是鸡不会鸭的呱呱叫,没有 quack() 方法。这个时候就需要适配了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码// 毫无疑问,首先,这个适配器肯定需要 implements Duck,这样才能当做鸭来用
public class CockAdapter implements Duck {

Cock cock;
// 构造方法中需要一个鸡的实例,此类就是将这只鸡适配成鸭来用
public CockAdapter(Cock cock) {
this.cock = cock;
}

// 实现鸭的呱呱叫方法
@Override
public void quack() {
// 内部其实是一只鸡的咕咕叫
cock.gobble();
}

@Override
public void fly() {
cock.fly();
}
}

客户端调用很简单了:

1
2
3
4
5
6
7
复制代码public static void main(String[] args) {
// 有一只野鸡
Cock wildCock = new WildCock();
// 成功将野鸡适配成鸭
Duck duck = new CockAdapter(wildCock);
...
}

到这里,大家也就知道了适配器模式是怎么回事了。无非是我们需要一只鸭,但是我们只有一只鸡,这个时候就需要定义一个适配器,由这个适配器来充当鸭,但是适配器里面的方法还是由鸡来实现的。

我们用一个图来简单说明下:

adapter-1

上图应该还是很容易理解的,我就不做更多的解释了。下面,我们看看类适配模式怎么样的。

类适配器模式

废话少说,直接上图:

adapter-1

看到这个图,大家应该很容易理解的吧,通过继承的方法,适配器自动获得了所需要的大部分方法。这个时候,客户端使用更加简单,直接 Target t = new SomeAdapter(); 就可以了。

适配器模式总结

  1. 类适配和对象适配的异同

一个采用继承,一个采用组合;

类适配属于静态实现,对象适配属于组合的动态实现,对象适配需要多实例化一个对象。

总体来说,对象适配用得比较多。
2. 适配器模式和代理模式的异同

比较这两种模式,其实是比较对象适配器模式和代理模式,在代码结构上,它们很相似,都需要一个具体的实现类的实例。但是它们的目的不一样,代理模式做的是增强原方法的活;适配器做的是适配的活,为的是提供“把鸡包装成鸭,然后当做鸭来使用”,而鸡和鸭它们之间原本没有继承关系。

adapter-5

桥梁模式

理解桥梁模式,其实就是理解代码抽象和解耦。

我们首先需要一个桥梁,它是一个接口,定义提供的接口方法。

1
2
3
复制代码public interface DrawAPI {
public void draw(int radius, int x, int y);
}

然后是一系列实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public class RedPen implements DrawAPI {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用红色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}
public class GreenPen implements DrawAPI {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用绿色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}
public class BluePen implements DrawAPI {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用蓝色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}

定义一个抽象类,此类的实现类都需要使用 DrawAPI:

1
2
3
4
5
6
7
8
复制代码public abstract class Shape {
protected DrawAPI drawAPI;

protected Shape(DrawAPI drawAPI){
this.drawAPI = drawAPI;
}
public abstract void draw();
}

定义抽象类的子类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码// 圆形
public class Circle extends Shape {
private int radius;
public Circle(int radius, DrawAPI drawAPI) {
super(drawAPI);
this.radius = radius;
}
public void draw() {
drawAPI.draw(radius, 0, 0);
}
}
// 长方形
public class Rectangle extends Shape {
private int x;
private int y;

public Rectangle(int x, int y, DrawAPI drawAPI) {
super(drawAPI);
this.x = x;
this.y = y;
}
public void draw() {
drawAPI.draw(0, x, y);
}
}

最后,我们来看客户端演示:

1
2
3
4
5
6
7
复制代码public static void main(String[] args) {
Shape greenCircle = new Circle(10, new GreenPen());
Shape redRectangle = new Rectangle(4, 8, new RedPen());

greenCircle.draw();
redRectangle.draw();
}

可能大家看上面一步步还不是特别清晰,我把所有的东西整合到一张图上:

bridge-1

这回大家应该就知道抽象在哪里,怎么解耦了吧。桥梁模式的优点也是显而易见的,就是非常容易进行扩展。

本节引用了这里的例子,并对其进行了修改。

装饰模式

要把装饰模式说清楚明白,不是件容易的事情。也许读者知道 Java IO 中的几个类是典型的装饰模式的应用,但是读者不一定清楚其中的关系,也许看完就忘了,希望看完这节后,读者可以对其有更深的感悟。

首先,我们先看一个简单的图,看这个图的时候,了解下层次结构就可以了:

decorator-1

我们来说说装饰模式的出发点,从图中可以看到,接口 Component 其实已经有了 ConcreteComponentA 和 ConcreteComponentB 两个实现类了,但是,如果我们要增强这两个实现类的话,我们就可以采用装饰模式,用具体的装饰器来装饰实现类,以达到增强的目的。

从名字来简单解释下装饰器。既然说是装饰,那么往往就是添加小功能这种,而且,我们要满足可以添加多个小功能。最简单的,代理模式就可以实现功能的增强,但是代理不容易实现多个功能的增强,当然你可以说用代理包装代理的方式,但是那样的话代码就复杂了。

首先明白一些简单的概念,从图中我们看到,所有的具体装饰者们 ConcreteDecorator 都可以作为 Component 来使用,因为它们都实现了 Component 中的所有接口。它们和 Component 实现类 ConcreteComponent 的区别是,它们只是装饰者,起装饰作用,也就是即使它们看上去牛逼轰轰,但是它们都只是在具体的实现中加了层皮来装饰而已。

注意这段话中混杂在各个名词中的 Component 和 Decorator,别搞混了。

下面来看看一个例子,先把装饰模式弄清楚,然后再介绍下 java io 中的装饰模式的应用。

最近大街上流行起来了“快乐柠檬”,我们把快乐柠檬的饮料分为三类:红茶、绿茶、咖啡,在这三大类的基础上,又增加了许多的口味,什么金桔柠檬红茶、金桔柠檬珍珠绿茶、芒果红茶、芒果绿茶、芒果珍珠红茶、烤珍珠红茶、烤珍珠芒果绿茶、椰香胚芽咖啡、焦糖可可咖啡等等,每家店都有很长的菜单,但是仔细看下,其实原料也没几样,但是可以搭配出很多组合,如果顾客需要,很多没出现在菜单中的饮料他们也是可以做的。

在这个例子中,红茶、绿茶、咖啡是最基础的饮料,其他的像金桔柠檬、芒果、珍珠、椰果、焦糖等都属于装饰用的。当然,在开发中,我们确实可以像门店一样,开发这些类:LemonBlackTea、LemonGreenTea、MangoBlackTea、MangoLemonGreenTea……但是,很快我们就发现,这样子干肯定是不行的,这会导致我们需要组合出所有的可能,而且如果客人需要在红茶中加双份柠檬怎么办?三份柠檬怎么办?万一有个变态要四份柠檬,所以这种做法是给自己找加班的。

不说废话了,上代码。

首先,定义饮料抽象基类:

1
2
3
4
5
6
复制代码public abstract class Beverage {
// 返回描述
public abstract String getDescription();
// 返回价格
public abstract double cost();
}

然后是三个基础饮料实现类,红茶、绿茶和咖啡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码public class BlackTea extends Beverage {
public String getDescription() {
return "红茶";
}
public double cost() {
return 10;
}
}
public class GreenTea extends Beverage {
public String getDescription() {
return "绿茶";
}
public double cost() {
return 11;
}
}
...// 咖啡省略

定义调料,也就是装饰者的基类,此类必须继承自 Beverage:

1
2
3
4
复制代码// 调料
public abstract class Condiment extends Beverage {

}

然后我们来定义柠檬、芒果等具体的调料,它们属于装饰者,毫无疑问,这些调料肯定都需要继承 Condiment 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码public class Lemon extends Condiment {
private Beverage bevarage;
// 这里很关键,需要传入具体的饮料,如需要传入没有被装饰的红茶或绿茶,
// 当然也可以传入已经装饰好的芒果绿茶,这样可以做芒果柠檬绿茶
public Lemon(Beverage bevarage) {
this.bevarage = bevarage;
}
public String getDescription() {
// 装饰
return bevarage.getDescription() + ", 加柠檬";
}
public double cost() {
// 装饰
return beverage.cost() + 2; // 加柠檬需要 2 元
}
}
public class Mango extends Condiment {
private Beverage bevarage;
public Mango(Beverage bevarage) {
this.bevarage = bevarage;
}
public String getDescription() {
return bevarage.getDescription() + ", 加芒果";
}
public double cost() {
return beverage.cost() + 3; // 加芒果需要 3 元
}
}
...// 给每一种调料都加一个类

看客户端调用:

1
2
3
4
5
6
7
8
9
10
复制代码public static void main(String[] args) {
// 首先,我们需要一个基础饮料,红茶、绿茶或咖啡
Beverage beverage = new GreenTea();
// 开始装饰
beverage = new Lemon(beverage); // 先加一份柠檬
beverage = new Mongo(beverage); // 再加一份芒果

System.out.println(beverage.getDescription() + " 价格:¥" + beverage.cost());
//"绿茶, 加柠檬, 加芒果 价格:¥16"
}

如果我们需要芒果珍珠双份柠檬红茶:

1
复制代码Beverage beverage = new Mongo(new Pearl(new Lemon(new Lemon(new BlackTea()))));

是不是很变态?

看看下图可能会清晰一些:

decorator-2

到这里,大家应该已经清楚装饰模式了吧。

下面,我们再来说说 java IO 中的装饰模式。看下图 InputStream 派生出来的部分类:

decorator-3

我们知道 InputStream 代表了输入流,具体的输入来源可以是文件(FileInputStream)、管道(PipedInputStream)、数组(ByteArrayInputStream)等,这些就像前面奶茶的例子中的红茶、绿茶,属于基础输入流。

FilterInputStream 承接了装饰模式的关键节点,其实现类是一系列装饰器,比如 BufferedInputStream 代表用缓冲来装饰,也就使得输入流具有了缓冲的功能,LineNumberInputStream 代表用行号来装饰,在操作的时候就可以取得行号了,DataInputStream 的装饰,使得我们可以从输入流转换为 java 中的基本类型值。

当然,在 java IO 中,如果我们使用装饰器的话,就不太适合面向接口编程了,如:

1
复制代码InputStream inputStream = new LineNumberInputStream(new BufferedInputStream(new FileInputStream("")));

这样的结果是,InputStream 还是不具有读取行号的功能,因为读取行号的方法定义在 LineNumberInputStream 类中。

我们应该像下面这样使用:

1
2
3
复制代码DataInputStream is = new DataInputStream(
new BufferedInputStream(
new FileInputStream("")));

所以说嘛,要找到纯的严格符合设计模式的代码还是比较难的。

门面模式

门面模式(也叫外观模式,Facade Pattern)在许多源码中有使用,比如 slf4j 就可以理解为是门面模式的应用。这是一个简单的设计模式,我们直接上代码再说吧。

首先,我们定义一个接口:

1
2
3
复制代码public interface Shape {
void draw();
}

定义几个实现类:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码public class Circle implements Shape {
@Override
public void draw() {
System.out.println("Circle::draw()");
}
}
public class Rectangle implements Shape {
@Override
public void draw() {
System.out.println("Rectangle::draw()");
}
}

客户端调用:

1
2
3
4
5
6
7
8
9
复制代码public static void main(String[] args) {
// 画一个圆形
Shape circle = new Circle();
circle.draw();

// 画一个长方形
Shape rectangle = new Rectangle();
rectangle.draw();
}

以上是我们常写的代码,我们需要画圆就要先实例化圆,画长方形就需要先实例化一个长方形,然后再调用相应的 draw() 方法。

下面,我们看看怎么用门面模式来让客户端调用更加友好一些。

我们先定义一个门面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码public class ShapeMaker {
private Shape circle;
private Shape rectangle;
private Shape square;
public ShapeMaker() {
circle = new Circle();
rectangle = new Rectangle();
square = new Square();
}
/**
* 下面定义一堆方法,具体应该调用什么方法,由这个门面来决定
*/

public void drawCircle(){
circle.draw();
}
public void drawRectangle(){
rectangle.draw();
}
public void drawSquare(){
square.draw();
}
}

看看现在客户端怎么调用:

1
2
3
4
5
6
7
复制代码public static void main(String[] args) {
ShapeMaker shapeMaker = new ShapeMaker();
// 客户端调用现在更加清晰了
shapeMaker.drawCircle();
shapeMaker.drawRectangle();
shapeMaker.drawSquare();
}

门面模式的优点显而易见,客户端不再需要关注实例化时应该使用哪个实现类,直接调用门面提供的方法就可以了,因为门面类提供的方法的方法名对于客户端来说已经很友好了。

组合模式

组合模式用于表示具有层次结构的数据,使得我们对单个对象和组合对象的访问具有一致性。

直接看一个例子吧,每个员工都有姓名、部门、薪水这些属性,同时还有下属员工集合(虽然可能集合为空),而下属员工和自己的结构是一样的,也有姓名、部门这些属性,同时也有他们的下属员工集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码public class Employee {
private String name;
private String dept;
private int salary;
private List<Employee> subordinates; // 下属
public Employee(String name,String dept, int sal) {
this.name = name;
this.dept = dept;
this.salary = sal;
subordinates = new ArrayList<Employee>();
}
public void add(Employee e) {
subordinates.add(e);
}
public void remove(Employee e) {
subordinates.remove(e);
}
public List<Employee> getSubordinates(){
return subordinates;
}
public String toString(){
return ("Employee :[ Name : " + name + ", dept : " + dept + ", salary :" + salary+" ]");
}
}

通常,这种类需要定义 add(node)、remove(node)、getChildren() 这些方法。

这说的其实就是组合模式,这种简单的模式我就不做过多介绍了,相信各位读者也不喜欢看我写废话。

享元模式

英文是 Flyweight Pattern,不知道是谁最先翻译的这个词,感觉这翻译真的不好理解,我们试着强行关联起来吧。Flyweight 是轻量级的意思,享元分开来说就是 共享 元器件,也就是复用已经生成的对象,这种做法当然也就是轻量级的了。

复用对象最简单的方式是,用一个 HashMap 来存放每次新生成的对象。每次需要一个对象的时候,先到 HashMap 中看看有没有,如果没有,再生成新的对象,然后将这个对象放入 HashMap 中。

这种简单的代码我就不演示了。

结构型模式总结

前面,我们说了代理模式、适配器模式、桥梁模式、装饰模式、门面模式、组合模式和享元模式。读者是否可以分别把这几个模式说清楚了呢?在说到这些模式的时候,心中是否有一个清晰的图或处理流程在脑海里呢?

代理模式是做方法增强的,适配器模式是把鸡包装成鸭这种用来适配接口的,桥梁模式做到了很好的解耦,装饰模式从名字上就看得出来,适合于装饰类或者说是增强类的场景,门面模式的优点是客户端不需要关心实例化过程,只要调用需要的方法即可,组合模式用于描述具有层次结构的数据,享元模式是为了在特定的场景中缓存已经创建的对象,用于提高性能。

行为型模式

行为型模式关注的是各个类之间的相互作用,将职责划分清楚,使得我们的代码更加地清晰。

策略模式

策略模式太常用了,所以把它放到最前面进行介绍。它比较简单,我就不废话,直接用代码说事吧。

下面设计的场景是,我们需要画一个图形,可选的策略就是用红色笔来画,还是绿色笔来画,或者蓝色笔来画。

首先,先定义一个策略接口:

1
2
3
复制代码public interface Strategy {
public void draw(int radius, int x, int y);
}

然后我们定义具体的几个策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public class RedPen implements Strategy {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用红色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}
public class GreenPen implements Strategy {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用绿色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}
public class BluePen implements Strategy {
@Override
public void draw(int radius, int x, int y) {
System.out.println("用蓝色笔画图,radius:" + radius + ", x:" + x + ", y:" + y);
}
}

使用策略的类:

1
2
3
4
5
6
7
8
9
复制代码public class Context {
private Strategy strategy;
public Context(Strategy strategy){
this.strategy = strategy;
}
public int executeDraw(int radius, int x, int y){
return strategy.draw(radius, x, y);
}
}

客户端演示:

1
2
3
4
复制代码public static void main(String[] args) {
Context context = new Context(new BluePen()); // 使用绿色笔来画
context.executeDraw(10, 0, 0);
}

放到一张图上,让大家看得清晰些:

strategy-1

这个时候,大家有没有联想到结构型模式中的桥梁模式,它们其实非常相似,我把桥梁模式的图拿过来大家对比下:

bridge-1

要我说的话,它们非常相似,桥梁模式在左侧加了一层抽象而已。桥梁模式的耦合更低,结构更复杂一些。

观察者模式

观察者模式对于我们来说,真是再简单不过了。无外乎两个操作,观察者订阅自己关心的主题和主题有数据变化后通知观察者们。

首先,需要定义主题,每个主题需要持有观察者列表的引用,用于在数据变更的时候通知各个观察者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码public class Subject {

private List<Observer> observers = new ArrayList<Observer>();
private int state;
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
// 数据已变更,通知观察者们
notifyAllObservers();
}
public void attach(Observer observer){
observers.add(observer);
}
// 通知观察者们
public void notifyAllObservers(){
for (Observer observer : observers) {
observer.update();
}
}
}

定义观察者接口:

1
2
3
4
复制代码public abstract class Observer {
protected Subject subject;
public abstract void update();
}

其实如果只有一个观察者类的话,接口都不用定义了,不过,通常场景下,既然用到了观察者模式,我们就是希望一个事件出来了,会有多个不同的类需要处理相应的信息。比如,订单修改成功事件,我们希望发短信的类得到通知、发邮件的类得到通知、处理物流信息的类得到通知等。

我们来定义具体的几个观察者类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码public class BinaryObserver extends Observer {
// 在构造方法中进行订阅主题
public BinaryObserver(Subject subject) {
this.subject = subject;
// 通常在构造方法中将 this 发布出去的操作一定要小心
this.subject.attach(this);
}
// 该方法由主题类在数据变更的时候进行调用
@Override
public void update() {
String result = Integer.toBinaryString(subject.getState());
System.out.println("订阅的数据发生变化,新的数据处理为二进制值为:" + result);
}
}
public class HexaObserver extends Observer {
public HexaObserver(Subject subject) {
this.subject = subject;
this.subject.attach(this);
}
@Override
public void update() {
String result = Integer.toHexString(subject.getState()).toUpperCase();
System.out.println("订阅的数据发生变化,新的数据处理为十六进制值为:" + result);
}
}

客户端使用也非常简单:

1
2
3
4
5
6
7
8
9
10
复制代码public static void main(String[] args) {
// 先定义一个主题
Subject subject1 = new Subject();
// 定义观察者
new BinaryObserver(subject1);
new HexaObserver(subject1);

// 模拟数据变更,这个时候,观察者们的 update 方法将会被调用
subject.setState(11);
}

output:

1
2
复制代码订阅的数据发生变化,新的数据处理为二进制值为:1011
订阅的数据发生变化,新的数据处理为十六进制值为:B

当然,jdk 也提供了相似的支持,具体的大家可以参考 java.util.Observable 和 java.util.Observer 这两个类。

实际生产过程中,观察者模式往往用消息中间件来实现,如果要实现单机观察者模式,笔者建议读者使用 Guava 中的 EventBus,它有同步实现也有异步实现,本文主要介绍设计模式,就不展开说了。

责任链模式

责任链通常需要先建立一个单向链表,然后调用方只需要调用头部节点就可以了,后面会自动流转下去。比如流程审批就是一个很好的例子,只要终端用户提交申请,根据申请的内容信息,自动建立一条责任链,然后就可以开始流转了。

有这么一个场景,用户参加一个活动可以领取奖品,但是活动需要进行很多的规则校验然后才能放行,比如首先需要校验用户是否是新用户、今日参与人数是否有限额、全场参与人数是否有限额等等。设定的规则都通过后,才能让用户领走奖品。

如果产品给你这个需求的话,我想大部分人一开始肯定想的就是,用一个 List 来存放所有的规则,然后 foreach 执行一下每个规则就好了。不过,读者也先别急,看看责任链模式和我们说的这个有什么不一样?

首先,我们要定义流程上节点的基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public abstract class RuleHandler {

// 后继节点
protected RuleHandler successor;

public abstract void apply(Context context);

public void setSuccessor(RuleHandler successor) {
this.successor = successor;
}
public RuleHandler getSuccessor() {
return successor;
}
}

接下来,我们需要定义具体的每个节点了。

校验用户是否是新用户:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public class NewUserRuleHandler extends RuleHandler {

public void apply(Context context) {
if (context.isNewUser()) {
// 如果有后继节点的话,传递下去
if (this.getSuccessor() != null) {
this.getSuccessor().apply(context);
}
} else {
throw new RuntimeException("该活动仅限新用户参与");
}
}

}

校验用户所在地区是否可以参与:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码public class LocationRuleHandler extends RuleHandler {
public void apply(Context context) {
boolean allowed = activityService.isSupportedLocation(context.getLocation);
if (allowed) {
if (this.getSuccessor() != null) {
this.getSuccessor().apply(context);
}
} else {
throw new RuntimeException("非常抱歉,您所在的地区无法参与本次活动");
}
}
}

校验奖品是否已领完:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码public class LimitRuleHandler extends RuleHandler {
public void apply(Context context) {
int remainedTimes = activityService.queryRemainedTimes(context); // 查询剩余奖品
if (remainedTimes > 0) {
if (this.getSuccessor() != null) {
this.getSuccessor().apply(userInfo);
}
} else {
throw new RuntimeException("您来得太晚了,奖品被领完了");
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
复制代码public static void main(String[] args) {
RuleHandler newUserHandler = new NewUserRuleHandler();
RuleHandler locationHandler = new LocationRuleHandler();
RuleHandler limitHandler = new LimitRuleHandler();

// 假设本次活动仅校验地区和奖品数量,不校验新老用户
locationHandler.setSuccessor(limitHandler);
locationHandler.apply(context);
}

代码其实很简单,就是先定义好一个链表,然后在通过任意一节点后,如果此节点有后继节点,那么传递下去。

至于它和我们前面说的用一个 List 存放需要执行的规则的做法有什么异同,留给读者自己琢磨吧。

模板方法模式

在含有继承结构的代码中,模板方法模式是非常常用的,这也是在开源代码中大量被使用的。

通常会有一个抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码public abstract class AbstractTemplate {
// 这就是模板方法
public void templateMethod(){
init();
apply(); // 这个是重点
end(); // 可以作为钩子方法
}
protected void init() {
System.out.println("init 抽象层已经实现,子类也可以选择覆写");
}
// 留给子类实现
protected abstract void apply();
protected void end() {
}
}

模板方法中调用了 3 个方法,其中 apply() 是抽象方法,子类必须实现它,其实模板方法中有几个抽象方法完全是自由的,我们也可以将三个方法都设置为抽象方法,让子类来实现。也就是说,模板方法只负责定义第一步应该要做什么,第二步应该做什么,第三步应该做什么,至于怎么做,由子类来实现。

我们写一个实现类:

1
2
3
4
5
6
7
8
复制代码public class ConcreteTemplate extends AbstractTemplate {
public void apply() {
System.out.println("子类实现抽象方法 apply");
}
public void end() {
System.out.println("我们可以把 method3 当做钩子方法来使用,需要的时候覆写就可以了");
}
}

客户端调用演示:

1
2
3
4
5
复制代码public static void main(String[] args) {
AbstractTemplate t = new ConcreteTemplate();
// 调用模板方法
t.templateMethod();
}

代码其实很简单,基本上看到就懂了,关键是要学会用到自己的代码中。

行为型模式总结

行为型模式部分介绍了策略模式、观察者模式、责任链模式和模板方法模式,其实,经典的行为型模式还包括状态模式、命令模式等,但是它们的使用场景比较有限,而且本文篇幅也挺大了,我就不进行介绍了。

总结

学习设计模式的目的是为了让我们的代码更加的优雅、易维护、易扩展。我以前学设计模式的初衷是为了看懂开源代码,也为了写出装B的代码。这次整理这篇文章,让我重新审视了一下各个设计模式,对我自己而言收获还是挺大的。我想,文章的最大收益者一般都是作者本人,为了写一篇文章,需要巩固自己的知识,需要寻找各种资料,而且,自己写过的才最容易记住,也算是我给读者的建议吧。

(全文完)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…951952953…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%