AQS原理学习

本人CSDN地址:blog.csdn.net/qq_43731074…

AQS是指JUC包下的AbstractQueuedSynchronizer的简称,它是Java锁以及一些同步器(JUC包)实现的核心,大部分JUC下的类内部的提供了一个实现了AQS的子类来提供辅助。
AQS还继承了一个AbstractOwnableSynchronizer类,它是AQS的实现的重要基础。
AQS实现主要是依赖一个先进先出(FIFO)的等待队列和一个volatile标记的int型state状态标记。
AQS的FIFO等待队列,是基于双向链表实现的,是典型的CLH队列改造而来
首先来看一下AQS类的一下成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//等待队列头节点
private transient volatile Node head;

//等待队列的尾节点
private transient volatile Node tail;
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
**/
//静态内部类,是实现等待队列的核心类,从上面官方解析也可以看出是一个双向链表实现的一个FIFO队列,该类主要是封装线程和维护线程的一些状态标记
//后面会单独分析该内部类
static final class Node{省略...}

//同步状态标记,只有该状态等于0的时候才表示可以当前是空闲的没有被占用的,此时可以换一个一个节点的线程来占用
private volatile int state;
//省略....
}

下面看看node类是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码    static final class Node {
/**共享标记模式的标记 */
static final Node SHARED = new Node();
/** 独占模式的标记节点 */
static final Node EXCLUSIVE = null;

/** waitStatus状态标记等于-1表示节点以及被取消*/
static final int CANCELLED = 1;
/** 表示后继节点处于等待状态,如果当前节点同步状态是否或者取消则被唤醒 */
static final int SIGNAL = -1;
/** waitStatus表示线程正在condition等待条件上 */
static final int CONDITION = -2;
/** 下一次的共享模式同步状态的获取将会无条件的传播 */
static final int PROPAGATE = -3;
/* waitStatus的初始值时0,使用CAS来修改节点的状态 */
volatile int waitStatus;

/**
* 前驱节点
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 入队等待的线程
*/
volatile Thread thread;

/**
* 链接到下一个节点的等待条件,或特殊的值SHARED,因为条件队列只有在独占模式时才能被访问,
* 所以我们只需要一个简单的连接队列在等待的时候保存节点,然后把它们转移到队列中重新获取
* 因为条件只能是独占性的,我们通过使用特殊的值来表示共享模式
*/
Node nextWaiter;

/**
*判断是否共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 获取当前节点的前驱节点
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // 指点线程和模式的构造器
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // 指定线程和线程状态的构造器
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS 的大体结构图:

在这里插入图片描述
下面以分析ReentrantLock为例分析分析独占模式下,整个获取同步状态成功以及失败的处理过程:

1
2
3
4
5
java复制代码    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法主要是用于获取同步状态,主要分为几大步,第一步先通过tryAcquire(arg)第一次尝试获取同步状态,该方法AQS并没有实现,而是在相应的子类中实现比如Reentrantlock内部为了实现公平锁和非公平锁分别实现了不同逻辑的tryAcquire,如果该方法尝试获取同步状态失败,就进行第二步进入addWaiter(Node.EXCLUSIVE),该方法主要是把当前获取不到同步状态的线程进行封装成功一个node节点然后进入等待队列中,第三步就是进入等待队列之后,还好重新通过自旋尝试获取同步状态,自旋到一定程度,发现还是没有获取到同步状态则通过LockSupport类进行把线程挂起操作。如果中间出现被中断则线程进行自我中断。
接着看addWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码    /**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//根据线程和模式,封装成一个node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果不是第一次入队
if (pred != null) {
//尾部添加该节点
node.prev = pred;
//然后通过cas设置为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次则进入enq方法进行创建
enq(node);
return node;
}

addWaiter方法主要是把无法获取到同步状态的线程封装成一个节点然后进入等待队列,如果是第一次入队,则擦黄金一个空节点作为哨兵节点用于唤醒后继节点的线程,如果不是第一次入队,则把当前线程的封装的节点从尾部添加,然后通过cas把该节点设置为尾节点。
下面来看看enq是如何实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码    private Node enq(final Node node) {
for (;;) {
//获取尾节点
Node t = tail;
if (t == null) { // 如果等于空则创建一个空的node节点,并通过cas设置为头节点
if (compareAndSetHead(new Node()))
同时让尾节点执行同
tail = head;
} else {
//创建完一个队列后,非空状态则进入当前else然后在尾部添加当前线程封装的节点,再通过cas把节点设置成尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

整个方法逻辑:首先通过自旋判断尾节点是否为空,如果为空则创建一个空节点,并且设置为头节点并且尾节点指向头节点,当再次循环的时候,此时尾节点已经不是空了,这时候把传入的node节点进行节点添加,并把设置为尾节点。

接着看acquireQueued在addwaiter方法添加线程进入等待队列是如何通过该方法重新尝试获取同步状态以及失败后是如何处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码    final boolean acquireQueued(final Node node, int arg) {
//定义一个操作失败标记
boolean failed = true;
try {
//定义一个中断标记默认没不中断
boolean interrupted = false;
//下面是自旋
for (;;) {
//获取当前线程的前驱节点
final Node p = node.predecessor();
//判断是否是头节点,同时再次通过trayAcquire获取同步状态,如果是头节点并且获取同步状态成功(表明当前节点的线程已经被设置为同步状态的拥有者)
if (p == head && tryAcquire(arg)) {
setHead(node);//把当前传入进来的节点node设置为头节点,
p.next = null; // 断开与前驱节点的连接便于GC回收
failed = false;//表表示成功获取同步状态,返回中断标记
return interrupted;
}
//如果当前节点的前驱节点不是头节点,即当前节点不是第一个线程,则尝试判断是否应该把该线程挂起,
//如果适合挂起则调用parkAndCheckInterrupt进行挂起(实现挂起操作是利用LockSupport.park(this))
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果操作失败则放弃获取同步状态
if (failed)
cancelAcquire(node);
}
}

该方法主要逻辑:首先判断当前节点的前驱节点是否是头节点并且通过tryacquire方法再次获取同步状态如果成功则表明该线程是第一个线程则直接执行,并把该节点设置为新的头节点,并把前驱节点断开以便于gc回收,然后返回,第二种情况就是非第一个线程,在进入shouldParkAfterFailedAcquire方法判断是否适合挂起,如果适合则调用parkAndCheckInterrupt方法去执行 LockSupport.park(this);进行挂起操作。
接着看看shouldParkAfterFailedAcquire是如何判断是否应该挂起的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的ws判断处于那种状态。
int ws = pred.waitStatus;
//如果是SIGNAL(-1)表示当前前驱节点释放后会唤醒当前节点node,则可以安全挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 如果前驱节点的状态是>0表示已经被取消或者说放弃了,则跳过该前驱节点,通过选好当前节点的前驱节点一直
* 找到状态ws<0的第一个节点为止,然后把当前node节点连接在该节点的后继节点上
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果前驱节点是0或者propagate则表示前驱节点正在占用同步状态此时需要设置一个signal,不能挂起,
* 需要重新确定不能获取之后才可以被挂起
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//返回不能被挂起
return false;
}
//如果判断释放被挂起方法返回true则调用该方法进行挂起操作。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

判断释放适合挂起方法,分三步,第一步判断前驱节点释放状态之后会唤醒node节点,则可以安全的挂起,第二步,如果前驱节点已经放弃获取,则通循环获取第一个还没放弃获取的节点然后让node节点接上去。第三步,如果前驱节点正处于持有同步状态或者是共享模式,则需要设置前驱节点的ws状态标记为SIGNAL表示释放同步状态后唤醒该后承节点。

如果同步状态获取失败,挂起也失败了,则会执行取消获取方法cancelAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码    private void cancelAcquire(Node node) {
// 判断npe
if (node == null)
return;
把节点线程置为null
node.thread = null;

// 获取当前节点的前驱节点并且判断状态设置前驱节点
Node pred = node.prev;
//如果前驱节点ws>0表示已经放弃获取,则循环获取ws小于0的并连接起来
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 获取前驱接i单独后继节点
Node predNext = pred.next;

// 把node节点的状态标记改完取消状态
node.waitStatus = Node.CANCELLED;

// 如果node是后继节点,则通过cas把前驱节点设置我为尾节点,同时把后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果前驱节点不是头节点,ws小于0以及通过cas设置前驱节点ws状态为signal成功,并且前驱节点线程不为空则
//则把前驱节点的后继节点设置为当前节点的后继节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {//如果是头节点则唤醒他的后继节点
unparkSuccessor(node);
}
//让node节点指向node方便GC
node.next = node; // help GC
}
}

接着看unparkSuccessor是如何唤醒后继节点的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码    private void unparkSuccessor(Node node) {
/*
* 获取node的ws,如果小于0则通过cas设置为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* 获取当前节点的后继节点
*/
Node s = node.next;
//判断s,如果等于null并且ws>0放弃获取
if (s == null || s.waitStatus > 0) {
s = null;//置为null
//从尾节点开始循环获取前驱节点直到获取到最前面一个ws小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
然后如果s节点不为null,证明获取到了最前面一个ws小于0的节点,则直接唤醒该节点的线程。
if (s != null)
LockSupport.unpark(s.thread);
}

下面分析AQS中release()方法是如何释放同步状态并且唤醒线程来执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码    public final boolean release(int arg) {
//tryrelease方法和tryacquire方法是一对的,该方法也是由子类来实现
if (tryRelease(arg)) {
Node h = head;//获取头节点
//判断释放不等于空,或者状态释放不等于0,如果都符合则通过unparkSuccessor方法唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//这里举例reentrantlock方法中非公平锁实现的tryRelease方法
protected final boolean tryRelease(int releases) {
//获取AQS的同步状态值,然后减去释放的数值
int c = getState() - releases;
//判断当前线程释放是同步状态拥有者,如果不是则抛出一个异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//标记释放失败
boolean free = false;
//如果c==0表明同步状态完全释放了(存在重入的情况同步状态可能大于1)
if (c == 0) {
free = true;//设置释放成功
setExclusiveOwnerThread(null);//把同步状态持有线程修改为null
}
//重新设置AQS状态(这里可能是0或者大余1(重入的情况))
setState(c);
return free;
}

到这里,独占模式同步状态的获取以及释放过程都分析完,其他的待续。。。。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%