深入浅出SynchronousQueue队列(三) 前言 T

这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战

前言

本文继续讲解SynchronousQueue队列的非公平策略,SynchronousQueue通过两个内部类实现了公平策略和非公平策略的无缓存阻塞队列,每种操作都需要对应的互补操作同时进行才能完成,例如,入队操作必然对应出队操作,在不涉及超时和中断的情况下,必须等待另一个线程进行出队操作,相互匹配才能执行,否则就阻塞等待。

TransferStack

不同于公平策略下的操作,只有一种状态需要注意:

取消操作:match == this;

SNode

SNode基于栈的节点实现,变量与QNode不同,其中match在两个操作匹配上之后可以通过这个变量找到其匹配的节点,节点类型mode在使用上也有所不同,其他参数可参考TransferQueue的QNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码    static final class SNode {
// next指向栈中下一个元素
volatile SNode next;
// 和当前节点匹配的节点
volatile SNode match;
// 等待线程
volatile Thread waiter;
// 节点内容
Object item;
// 节点类型
int mode;

SNode(Object item) {
this.item = item;
}

// CAS更新next字段
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}


// 尝试s节点与当前节点进行匹配,成功则唤醒等待线程继续执行
boolean tryMatch(SNode s) {
// match == null 表明当前节点未被其他节点匹配上
// cas更新match字段为s
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
// 当前节点等待线程未被其他线程操作
if (w != null) { // waiters need at most one unpark
// 唤醒等待线程同时将waiter置空
waiter = null;
LockSupport.unpark(w);
}
return true;
}
// 判断当前节点是否已与s进行匹配
return match == s;
}

// 尝试取消操作 将match置为this
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}

// 判断tryCancel是否操作成功
boolean isCancelled() {
return match == this;
}

// 获取match和next在对象中的偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

变量部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
php复制代码    // 数据请求操作 如take操作 代表未被匹配上的消费者
static final int REQUEST = 0;
// 数据保存操作 如put操作 代表未被匹配上的生产者
static final int DATA = 1;
// 有节点与其匹配,相当于已经有互补操作,使用上不是直接使用,可参考后面的源码部分
static final int FULFILLING = 2;

// 栈顶指针
volatile SNode head;

private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}

CAS操作

CAS更新栈顶指针,比较简单

1
2
3
4
typescript复制代码boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}

判断街道是否匹配

判断m对应的节点是否已经被匹配,和FULFILLING进行位与操作,判断m对应的栈节点处于FULFILLING状态,即已经匹配上了,在transfer里与栈顶节点非相同操作时会入栈一个节点,此节点的mode和普通节点不一样,会通过FULFILLING|mode操作更新mode,故这里最低位来区分是保存数据还是请求数据,高位来区分此节点是否是已经找到匹配节点的节点。

1
2
arduino复制代码/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

snode节点

创建或重置SNode节点,如果为空则创建新的SNode节点,不为空则重置节点的mode和next属性。

1
2
3
4
5
6
ini复制代码    static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}

transfer

和公平模式下的TransferQueue.transfer入队和出队操作类似,统一使用一个方法,即实现接口中的transfer方法来完成。不同点在于3个条件分支:

栈为空或栈顶元素操作类型和当前操作类型相同,入栈阻塞等待;

栈顶非匹配互补节点(匹配互补节点:已经和其他节点匹配上了,mode值高位为1),进行匹配操作;

帮助已经匹配的栈顶节点操作;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
scss复制代码    @SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

SNode s = null; // constructed/reused as needed
// 节点类型,是put还是take操作,即是保存数据还是请求数据
int mode = (e == null) ? REQUEST : DATA;

for (;;) {
// 获取栈顶指针
SNode h = head;
// 栈为空
// 或栈顶节点和当前操作节点为相同操作
if (h == null || h.mode == mode) {
// 设置超时时间且超时时间小于等于0
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
// 栈顶非空且栈顶节点为取消操作状态
// 出栈,尝试将栈顶节点更新
casHead(h, h.next);
else
return null;
// 创建节点,尝试更新栈顶节点
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 通过awaitFulfill方法自旋阻塞找到匹配操作的节点
SNode m = awaitFulfill(s, timed, nanos);
// 取消或超时
if (m == s) {
// 清理节点,取消本次操作
clean(s);
return null;
}
// 栈顶节点更新为s的next元素
// 出栈栈顶2个节点元素,帮助更新栈顶元素为第三个节点元素即为s.next
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// 判断下,如果当前是请求数据,即take操作,返回m.item值,即返回匹配节点的item
// 当前是保存数据,即put操作,返回s.item
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 与栈顶节点非相同操作,栈顶元素非匹配互补节点
} else if (!isFulfilling(h.mode)) {
// 栈顶元素处于取消操作状态
if (h.isCancelled())
// 尝试出栈更新栈顶元素
casHead(h, h.next);
// 入栈新创建的节点,同时FULFILLING|mode 位与操作
// s的mode为10或者11
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 进入这里表明s已经为栈顶节点,而且s.next是其匹配节点
// 循环直到匹配上
for (;;) {
SNode m = s.next;
// 空则可能被其他线程匹配上了则更新头节点为null,重新进入外层循环
if (m == null) {
casHead(s, null);
// 这里s节点需置空,因为比较特殊,mode不同于普通节点
// 重新循环时根据情况重新创建节点
s = null;
break;
}
//
SNode mn = m.next;
// 尝试m与s进行匹配,实际上是更新m节点的match为s,同时唤醒m的等待线程
if (m.tryMatch(s)) {
// 成功则出栈栈顶两个元素,即更新栈顶节点
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 未匹配上,可能被其他节点匹配上了,尝试更新s的next指针,再继续匹配
s.casNext(m, mn);
}
}
// 不满足上边两个条件,即此时栈顶为匹配节点,还未匹配完成,这里帮忙完成匹配出栈操作
// 注意,这里只是帮助更新head和next并不做其他操作,参考上面方法的处理
} else {
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}

阻塞等待唤醒(awaitFulfill)

与TransferQueue.awaitFulfill类似,在当前操作同之前操作相同时,未设置操作时间同时未被外部线程中断则需阻塞等待匹配节点唤醒当前阻塞的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
scss复制代码    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 获取超时时间点
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// shouldSpin判断是否需要进行自旋
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 判断当前线程是否中断,外部中断操作,相当于取消本次操作
if (w.isInterrupted())
// 尝试将s节点的match设置为s自己
s.tryCancel();
SNode m = s.match;
// match非空则表示当前节点已经被匹配match匹配上
if (m != null)
return m;
// 超时配置处理
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋spins
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 设置等待线程
else if (s.waiter == null)
s.waiter = w;
// 未设置超时,直接阻塞
else if (!timed)
LockSupport.park(this);
// 设置超时时间阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}

是否需要自旋操作(shouldSpin)

判断是否需要自旋操作,满足下列情况之一即需要自旋:

栈顶节点等于s节点;

栈顶节点为空;

栈顶节点为已和其他节点匹配的节点;

1
2
3
4
ini复制代码    boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}

清理操作(clean)

清理操作,清理栈节点s的关联关系,同时会清理整个栈节点的取消操作节点,无cleanMe节点,比TransferQueue.clean操作要简单许多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ini复制代码   
void clean(SNode s) {
// item,waiter 置空
s.item = null;
s.waiter = null;

// s的下一个节点处于取消操作状态,则past指向past的下一个节点
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;

// 头节点被取消操作则进行将next节点更新为头节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);

// 头节点处理完会把栈节点中每个节点检查一遍,更新前后节点的关系
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}

总结

SynchronousQueue的非公平策略的内部实现就是这样,要注意的是对于mode部分状态的处理,通过高位和低位分别区分是否已匹配和是什么类型的操作(生产者还是消费者)。其实需要记住的是其操作必须是成双成对的,在无超时无中断的情况下,一个线程执行入队操作,必然需要另一个线程执行出队操作,此时两操作互相匹配,同时完成操作。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%