ReentrantLock原理分析

1.ReetrantLock是什么?

ReetrantLock是基于Lock接口实现的一种可重入锁,说到重入锁,其实synchronized也是一种可重入锁,
但是它与synchronized有本质的区别,ReetrantLock底层是依靠AQS来实现的,而Synchronized是基于monitor监视器来实现的;

ReetrantLock是有两种模式的,一种是公平锁模式,一种是非公平锁模式,其实就是获取锁的时候是否按照顺序获得。注:源码只分析非公平锁模式。

2.AQS在ReetrantLock中怎么使用的?

ReetrantLock并没有直接继承AQS这个抽象类,而是通过内部定义一个抽象Sync类来继承AQS,然后实现一部分通用的锁需要的方法比如tryRelease方法,然后再通过定义公平类和非公平类来继承这个抽象类Syn,然后实现具体的Lock方法,其实这里是用到了典型的模板方法模式,AQS中定义了加锁的整个算法执行逻辑结构,具体加锁实现留给子类去实现。

下面是它的AQS在ReetrantLock的实现类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码//ReetrantLock的全局AQS子类.
private final Sync sync;



//我们看看Syn这个静态抽象类干了那几件事:
//1.首先是继承了AQS这个抽象同步器类
//2.实现了非公平锁尝试获取锁操作的nonfairTryAcquire方法,
//3.实现了tryRelease尝试释放锁方法 (ps:获取锁和释放锁操作都是模板方法的体现)
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

// 首先获取当前线程,然后通过getState()方法获取全局state变量(ps:该变量是AQS的一个全局遍历,它可以判断当前锁被某个线程冲入几次)
//然后通过cas修改这个state从0改为1,如果成功则该线程获取锁成功,把他设置为当前锁持有者线程
//如果不成功,则判断当前锁持有者线程是否是自己,如果是者重入,修改state数量。
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程,已经当前锁的状态state
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果0则还没有线程获得锁,尝试修改状态,如果成功把自己设置为锁持有者线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前锁持有者是否是自己,如果是进行重入操作
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//尝试释放锁方法,
//1.计算需要释放释放个数与当前状态的差值,然后判断当前线程释放是锁持有者如果不是抛异常
//2.如果差值等于0者释放成功把锁持有者线程置空,让出位置给下一个线程
//3.最后修改state状态。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
...省略相关判断线程以及状态的简单方法
}

//非公平锁类,Sync的子类,实现了非公平锁加锁方法Lock的业务逻辑
//1.先通过cas修改状态修改成功者把当前线程设置为锁持有者线程
//2.调用acquire获取锁,该方法是AQS实现的
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
//cas 修改state从0-1如果成功,则获得锁,否则调用acquire方法进行后续尝试获取锁操作。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//尝试获取锁方法,是由AQS的acquire方法调用,(ps:你可以发现模板方法者这里用得很多。)
//然后调用父类Sync的nonfairTryAcquire非公平锁尝试获取方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* 公平锁,公平锁不会先尝试修改state状态而是通过acquire去获取。
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
//这个方法是公平锁尝试获取锁的方法
//1.获取当前线程和锁状态state,判断队列里是否有等待线程,如果没有并且cas修改成功则获得锁,修改为持有者线程
//2.判断当前锁持有者是否是自己,如果是进行重入操作
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

3.ReetrantLock的Lock执行流程以及源码分析

reentrantlock-lock.png
上图是Lock的大概流程,下面是部分源码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码//1.第一步是调用ReetrantLock的lock方法
public void lock() {
sync.lock();
}
//第二步是调用Sync子类的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//第三步是调用AQS的acquire方法,先尝试获取锁,如果获取不到则生成一个waiter节点,然后
public final void acquire(int arg) {
//这个if,tryAcquire失败后,进行acquireqQueued去队列来从头节点的下一个节点开始重新tryAcquire获取如果成功则把当前节点中断,获得锁的线程执行。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}




//第四步是调用NonfairSync的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}


//第五步是调用Sync的nonfairTryAcquire方法,该部分源码前面已经介绍了。
final boolean nonfairTryAcquire(int acquires){}




//第六步如果还是失败则调用addWaiter
private Node addWaiter(Node mode) {
//创建当前线程创建成一个waiter节点
Node node = new Node(Thread.currentThread(), mode);
// 获取AQS队列的尾节点,然后把当前node 节点在尾部添加并且修改为节点为当前新node
Node pred = tail;
if (pred != null) {
node.prev = pred;
//这里存在并发操作,可能为节点变为null,就修改失败
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//cas修改为尾节点可能会失败,enq失败后初始化操作
enq(node);
return node;
}
//自旋,并且判断尾节点是否是null,如果是null,则当前没有等待节点则需要初始化一个节点作为头节点,该节点类似于哨兵作用。
//头节点是不干活的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 创建一个node作为头节点,进行初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
//重新插入前面的node节点。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//第七步,调用AQS的acquireQueued,
//1. 节点添加到队列后,自旋,获取当前node节点的前一个节点的,
//2.如果当前的node的前一个节点是头节点,则证明队列里只有一个线程在等待,进行调用tryAcquire再次尝试获取锁,如果成功则把队列节点置空并且返回重新设置头节点
//3.获取失败,准备挂起,在挂起之前则调用shouldParkAfterFailedAcquire,主要是处理有一些等待节点已经被唤醒者需要移出队列的操作
//4.处理完后调用parkAndCheckInterrupt 进行挂起操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//处理被唤醒的节点,和挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//
if (failed)
cancelAcquire(node);
}
}
//处理已经被唤醒的节点,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//一直while找到等待状态低于的节点然后把当前节点插入为下一个节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//调用LockSupport.park挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

简单小结:首先线程尝试获取锁,如果失败,创建一个Node节点,该节点包含当前线程,然后添加到AQS的双向等待队列中,最后再一次进行尝试获取锁,如果失败,则准备挂起当前线程,但是挂起之前会把等待队列中已经被Signal唤醒的节点移除调用再添加到等待队列中,最后挂起。

4.ReetrantLock的unLock执行流程以及源码分析

reentrantLock的unlock.png

上图是解锁的大概流程,下面看一些具体的源码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码第一步:先调用unlock
public void unlock() {
sync.release(1);
}
第二步: 调用AQS的release方法,模板方法
public final boolean release(int arg) {
//调用Syn的tryRelease尝试获取锁。
if (tryRelease(arg)) {
Node h = head;
//如果头节点不是空而且状态不是0则尝试唤醒队列的后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
第三步,唤醒后续节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//一直遍历找到可以唤醒的jied
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//通过unpark唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

小结: 解锁比较简单,就是判断是否是当前线程,如果是则通过cas修改state,并且判断是否是只重入了一次,如果是则已经释放锁成功,则唤醒队列后续可以唤醒的节点,否则不是重入,则锁持有者线程不变。

其他一些比如Condition条件分析,待续….

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%