深入浅出SynchronousQueue队列(二) 前言 内

这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战

前言

本文继续讲解SynchronousQueue队列的公平策略下的内部实现,不废话,直接看源码。

内部实现类

公平策略下的实现类TransferQueue。

TransferQueue

基于Transferer实现公平策略下的实现类TransferQueue,公平策略需要先进先出,这里queue也表明其结构特点,内部通过QNode类实现链表的队列形态,通过CAS操作更新链表元素。

有两种状态需要注意:

取消操作(被外部中断或者超时):item == this;

出队操作(已成功匹配,找到互补操作):next == this;

构造方法

头尾节点初始化操作

1
2
3
4
5
6
ini复制代码        TransferQueue() {
// 初始化一个值为null的QNode,初始化头尾节点
QNode h = new QNode(null, false);
head = h;
tail = h;
}

QNode

QNode即为队列的链表实现,其中的变量属性isData保存的是每次的操作动作而不仅仅是入队的值,入队操作以QNode保存,出队操作也是如此,变量则通过CAS操作更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
typescript复制代码        static final class QNode {
// next指向链表下一个节点
volatile QNode next;
// 队列元素的值
volatile Object item;
// 保存等待的线程
volatile Thread waiter;
// 是否有数据,队列元素的类型标识,入队时有数据值为true,出队时无数据值为false
final boolean isData;

QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// cas操作更新next
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// cas操作更新item
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

// cas操作取消,把此时的QNode的item赋值为当前的QNode
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}

// 判断是否取消成功,然后tryCancel操作后进行判断
boolean isCancelled() {
return item == this;
}

// 判断当前节点是否已处于离队状态,这里看到是将节点next指向自己
boolean isOffList() {
return next == this;
}

// 获取item和next的偏移量,操作CAS使用
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

变量部分

队头队尾元素引用设置,需要注意的是cleanMe节点的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码        // 队头
transient volatile QNode head;
// 队尾
transient volatile QNode tail;

// 标记节点,清理链表尾部节点时,不直接删除尾部节点,而是将尾节点的前驱节点next指向设置为cleanMe
// 防止此时向尾部插入节点的线程失败导致出现数据问题
transient volatile QNode cleanMe;

// 偏移量获取
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}

CAS操作

CAS更新变量操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typescript复制代码        // 尝试将nh更新为新的队头
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
// 原头节点next指向更新为自己,使得h为离队状态,isOffList方法为true
h.next = h; // forget old next
}

// 尝试更新队尾节点
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

// 尝试更新cleanMe节点
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

transfer

入队和出队操作,都是用一个方法,即实现接口中的transfer方法来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
scss复制代码    @SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

QNode s = null; // constructed/reused as needed
// e为null时相当于出队操作isData为false,入队操作为true
boolean isData = (e != null);

for (;;) {
// 获取最新的尾节点和头节点
QNode t = tail;
QNode h = head;
// 头,尾节点为空,未初始化,则循环spin
if (t == null || h == null)
continue;
// 首尾节点相同则为空队列或尾节点类型和新操作的类型相同,都是入队操作或出队操作
// 一入队和一出队直接进入else匹配上不会再保存在链表中
if (h == t || t.isData == isData) {
QNode tn = t.next;
// 尾节点已经被其他线程更新修改,则重新循环判断
if (t != tail)
continue;
// 如果tn不为空,说明其他线程已经添加了节点,尝试更新尾节点,重新循环判断
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 设置超时时间并且超时时间小于等于0则直接返回null
if (timed && nanos <= 0)
return null;
// s为null则初始化节点s
if (s == null)
s = new QNode(e, isData);
// 尝试将s添加到尾节点的next上,失败则重新循环
if (!t.casNext(null, s))
continue;
// 尝试更新尾节点,尾节点此时为s
advanceTail(t, s);
// 通过awaitFulfill方法自旋阻塞找到匹配操作的节点item
Object x = awaitFulfill(s, e, timed, nanos);
// 表示当前线程已经中断或者超时,在awaitFulfill超时或者中断时更新s.item指向自己
if (x == s) {
// 清理节点,取消本次操作
clean(t, s);
return null;
}
// 判断s是否已从队列移除
if (!s.isOffList()) {
// 未被从队列清除则尝试更新队头
advanceHead(t, s);
// 当前线程为出队操作时,s节点取消操作
if (x != null)
s.item = s;
// 清除等待线程
s.waiter = null;
}
return (x != null) ? (E)x : e;

// 与上次队列操作非同一类型操作
} else {
QNode m = h.next;
// 头节点或尾节点被其他线程更新或者为空队列则循环操作
if (t != tail || m == null || h != head)
continue;
// 头节点的下一个节点对应的item
Object x = m.item;
// 同类型,被取消操作或更新item失败则更新头节点指向重新操作
if (isData == (x != null) || // m already fulfilled 相同类型操作说明m已经被其他线程操作匹配
x == m || // m cancelled 取消操作标识
// CAS更新item为匹配上的操作值,比如当前是出队操作,m为入队操作x为入队的值,那么此时要替换为出队值null
// CAS操作失败
!m.casItem(x, e)) {
// 删除匹配上的头节点,更新头节点
advanceHead(h, m);
continue;
}
// 更新头节点
advanceHead(h, m);
// 释放m的等待线程锁使得m操作结束
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}

阻塞等待唤醒(awaitFulfill)

在transfer相同类型操作时被调用,正常情况下(不算超时和中断)阻塞线程直到与之匹配的操作到来再继续执行。例如此时是入队操作,上次也是入队操作,在未设置超时时,这里可能需要自旋或阻塞等待一个出队操作来唤醒本次入队操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
scss复制代码        // 自旋或阻塞直到超时或被唤醒匹配上节点
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 获取超时时间点
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 仅在head.next==s时才使用spins(自旋次数),同时判断是否设置了超时
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 判断当前线程是否中断,外部中断操作
if (w.isInterrupted())
// 尝试将s节点的item设置为s自己
s.tryCancel(e);
Object x = s.item;
// s的item已经改变,直接返回x
// 没改变的情况下即没有匹配的操作,有匹配上的item即x将被改变,取消时如上也会改变
if (x != e)
return x;
// 线程超时将s节点的item设置为s自己
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 需要自旋时循环
if (spins > 0)
--spins;
// 设置s的等待线程
else if (s.waiter == null)
s.waiter = w;
// 未设置超时,直接阻塞
else if (!timed)
LockSupport.park(this);
// 设置超时时间阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}

clean方法

整体处理过程如下:

如果删除的不是尾节点,则用pred.casNext(s, s.next) 方式来进行清理;

如果删除的是队尾节点,若cleanMe为空,则将其前继节点pred更新为cleanMe, 为下次删除做准备;

如果cleanMe不为空,则根据cleanMe删除上次需要删除的节点, 然后将cleanMe置空, 如果此次pred非之前cleanMe,则cleanMe置为pred,为下一次删除操作做准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
ini复制代码    // 中断取消操作将pred节点代替s节点,修改前后节点之间的关联
void clean(QNode pred, QNode s) {
// 清理前先将等待线程置空
s.waiter = null;
// pred与s的前后关系
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
// hn非空且被取消操作,更新头节点为hn
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 尾节点
QNode t = tail;
// 空队列返回
if (t == h)
return;
// 尾节点下一个
QNode tn = t.next;
// 尾节点已被其他线程更新
if (t != tail)
continue;
// 非空 更新尾节点
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s不是尾节点
if (s != t) {
// s的下一个节点
QNode sn = s.next;
// 更新pred节点后一个节点为s的下一个节点,相当于删除s在链表中的关系
if (sn == s || pred.casNext(s, sn))
return;
}
// 执行到这里说明s为尾节点则需要处理cleanMe节点
QNode dp = cleanMe;
if (dp != null) {
QNode d = dp.next;
QNode dn;
if (d == null || // 清除节点为null,相当于已经清理了
d == dp || // dp节点处于离队状态
!d.isCancelled() || // 清除节点被取消
(d != t && // 清除节点非尾节点
(dn = d.next) != null && // 清除节点下一节点非null
dn != d && // 清除节点下一节点在队列中
dp.casNext(d, dn))) // 清理d与其他节点的关系
casCleanMe(dp, null); // 清理完毕设置为null
// 相当于s为需要清理的节点,上边已经清理过了,不需要再次清理
if (dp == pred)
return;
// 更新cleanMe为pred,为下次清理准备
} else if (casCleanMe(null, pred))
return;
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%