被按在地上摩擦的AQS-加锁过程

「这是我参与11月更文挑战的第4天,活动详情查看:2021最后一次更文挑战

引言

谈到并发编程,就不得不谈ReentrantLock,谈到ReentrantLock就会问实现原理,谈到原理就引出AQS(AbstractQueuedSynchronized),然后就被按在地上无情的摩擦。这篇文章主要讲解加锁过程,下一篇写释放锁过程。

ReentrantLock使用

1
2
3
4
5
6
7
8
9
10
11
java复制代码Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while(条件判断表达式) {
condition.wait();
}
// 处理逻辑
} finally {
lock.unlock();
}

lock.lock()显示的获取锁,并在finally块中显示的释放锁,目的是保证在获取到锁之后,最终能够被释放。

lock()方法调用过程(默认非公平锁)

非公平锁调用lock方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
// 表示当前没有占有锁的线程,将锁的拥有者设置为当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 获取锁
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

下图是ReentrantLock.lock的调用过程图。
Untitled-2021-11-08-0921.png

AQS加锁实现

AQS(AbstractQueuedSynchronizer):是JDK提供的同步框架,内部维护了一个FIFO双向队列(即CLH同步队列)。通过此队列实现同步状态的管理(volatile修饰的state状态,用于标志是否持有锁)。

Node

先了解AQS维护的队列节点结构,下面是队列节点Node的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码static final class Node {
/** 共享节点 */
static final Node SHARED = new Node();

/** 独占节点 */
static final Node EXCLUSIVE = null;

/** 因为超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,
会一直是取消状态不会改变 */
static final int CANCELLED = 1;

/** 后继节点处于等待状态,如果当前节点释放了同步状态或者被取消,
会通知后继节点,使其得以运行 */
static final int SIGNAL = -1;

/** 节点在等待条件队列中,节点线程等待在condition上,当其他线程对condition
调用了signal后,该节点将会从等待队列中进入同步队列中,获取同步状态 */
static final int CONDITION = -2;

/**
* 下一次共享式同步状态获取会无条件的传播下去
*/
static final int PROPAGATE = -3;

/** 等待状态 */
volatile int waitStatus;

/** 前驱节点 */
volatile Node prev;

/** 后继节点 */
volatile Node next;

/** 获取同步状态的线程 */
volatile Thread thread;

/**
* 下一个条件队列等待节点
*/
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

FIFO队列(CLH队列)

队列用head,tail和state三个变量来维护,源码如下:

1
2
3
4
5
6
7
8
java复制代码/** 头节点 */
private transient volatile Node head;

/** 尾节点 */
private transient volatile Node tail;

/** 同步状态 */
private volatile int state;

结构示意图如下:
FIFO-2021-11-08-1427.png

compareAndSetState调用

首先尝试获取锁,调用compareAndSetState方法,期待值为0,新值为1。使用unsafecompareAndSwapInt方法,通过一次CAS操作来修改state属性。

CAS操作即内存拿到volatile修饰的state属性值,与期望值0对比,如果取到的值为0,则执行+1操作,将state修改为1。其中还涉及知识点volatile修饰变量保证线程间可见,以及CAS操作的经典ABA问题。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码/**
* 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。
* 这个操作具有volatile读和写的内存语义。
* @param expect 期望值
* @param update 新值
* @return false返回表示实际值不等于预期值,true表示成功
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

如果此方法执行成功,则调用setExclusiveOwnerThread方法将让线程占有锁,此时state已经置为1。

acquire调用

进入此方法说明,当前已经有其他线程占有锁了。由于此种加锁方式是非公平锁,进入方法后,首先尝试获取锁,如果获取不到锁,那么再将当前线程置于队列中,让当前线程中断执行。

非公平锁在此方法中首先展示不公平,这种不公平是对在队列中的线程来说的。就像我们去银行办业务,如果我是VIP用户,我可以越过等待的用户先办理,这对于其他等待用户不公平。

1
2
3
4
5
java复制代码public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire

调用此方法来尝试获取锁。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
/** 首先获取state状态,此时第一个判断c==0后的操作是因为,
有可能在执行过程中,其他线程释放了锁,那么state为0,则直接让当前线程持有锁
*/
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/** 如果当前线程就是持有锁的线程,那么state+1,
此处提现了可重入锁的概念,每次线程重入该锁就重复此操作*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

此处setState(nextc),只是单纯让state+1,而没有用CAS操作。

addWaiter

负责把当前无法获得锁的线程包装为一个Node添加到队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步:

1.如果当前队尾已经存在(tail!=null),则使用CAS把当前线程追加到队尾。

2.如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续追加。

enq

通过for循环和CAS操作自旋过程,将当前线程加入队列中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果队尾是null,则说明队列空了,将当前线程设置为头尾节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队尾非空,通过CAS将当前线程加入队尾。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued

此方法是对addWaiter的补充,将加入队列的线程中断执行,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码final boolean acquireQueued(final Node node, int arg) {
// 操作失败标志
boolean failed = true;
try {
// 线程中断标志
boolean interrupted = false;
for (;;) {
// 当前节点的prev节点
final Node p = node.predecessor();
// 如果前一节点是头结点,并且尝试获取同步状态成功
if (p == head && tryAcquire(arg)) {
// 将当前当前线程设置成头结点
setHead(node);
// 将prev移除队列
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前线程是否需要阻塞 && 阻塞当前线程并且检验线程中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 取消获取同步状态
if (failed)
cancelAcquire(node);
}
}

p == head && tryAcquire(arg),这里的判断也显示了非公平的意义。队里中有等待线程还要尝试获取锁。

shouldParkAfterFailedAcquire

此方法是阻塞线程前最后的检查操作,通过prev节点的等待状态判断当前线程是否应该被阻塞,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到prev节点的等待状态
int ws = pred.waitStatus;

if (ws == Node.SIGNAL)
/*
* 如果prev的status是signal,表示prev处于等待状态,可以阻塞当前线程,
* 当prev释放了同步状态或者取消了,会通知当前节点。
*/
return true;
if (ws > 0) {
/*
* status > 0,表示为取消状态,需要将取消状态的节点从队列中移除
* 直到找到一个状态不是取消的节点为止
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 除了以上情况,通过CAS将prev的status设置成signal
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt

如果程序走到这个位置,那么就说明已经将当前线程加入队列中,可以让线程中断了。线程阻塞通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。

1
2
3
4
java复制代码private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

至此加锁过程完成。

总结

ReentrantLock加锁是通过AQS实现,AQS中维护了一个FIFO的队列,当存在锁竞争时构建队列,构建过程中使用CAS和自旋,保证线程能够进入队列。已经进入队列的线程需要阻塞,使用LockSupport.park()方法完成,阻塞线程能够让CPU更专注于执行持有锁的线程,而不是将资源浪费在尝试获取锁的自旋过程中。

以上是对ReentrantLock加锁的过程分析,希望大佬多提意见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%