并发编程(二)

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

本文主要讲BlockingQueue

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public interface BlockingQueue<E> extends Queue<E> {

boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//剩余容量, 无界队列的时候是Integer.Max
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
//批量从队列中取出元素,无阻塞
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

add,offer, put都是插入, add和offer无阻塞,但是在队列满时add会抛出异常,offer不会。put会阻塞,抛出中断异常。

remove非阻塞, poll和toke都会阻塞, 并抛出中断异常。

ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
java复制代码public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

数组实现的环型队列,在构造函数中必须传入数组容量,分公平和非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码
/** The queued items */
final Object[] items;

//队头指针
int takeIndex;

//队尾指针
int putIndex;

//当前队列中元素个数
int count;

//锁和条件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
  • take
1
2
3
4
5
6
7
8
9
10
11
java复制代码public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
  • put
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
java复制代码private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

LinkedBlockingQueue

基于单向链表的阻塞队列,队头和队尾是两个指针分开操作的, 使用了2把锁和2个条件。同时有一个AtomicInteger的原子变量记录count数。 构造函数中可以指定总容量, 默认Integer.MAX_VALUE

1
2
3
4
5
6
7
8
java复制代码public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码private final int capacity;
//原子变量
private final AtomicInteger count = new AtomicInteger();
//单向链表头
transient Node<E> head;
//单向链表尾
private transient Node<E> last;
//两把锁 两个条件
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
  • take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); //如果还有元素通知其他take线程
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
  • put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//通知其他put线程
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

LinkedBlockingQueue和ArrayBlockingQueue的差异:

  • 为了提供并发度, LinkedBlockingQueue分别控制队头队尾操作, put和put之间, take和take之间是互斥的,put和take之间不互斥, 但是count变量必须是原子的
  • 因为各拿了一个锁, 当需要调用对方的signal时,必须加上对方的锁,比如signalNotEmpty
1
2
3
4
5
6
7
8
9
java复制代码private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

PriorityBlockingQueue

按照元素优先级从小到大出队列,元素要可以比较大小实现Comparable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码//数组实现二叉小根堆
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
//锁和非空条件
private final ReentrantLock lock;
private final Condition notEmpty;
private transient volatile int allocationSpinLock;
private PriorityQueue<E> q;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

如果不指定大小,内部会设置11, 当超过11会自动扩容。

  • put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); //扩容
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);//自带的comparable
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
  • take
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; //永远是0
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
//调整二叉堆
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}

和ArrayBlockingQueue机制类似, 主要区别在于使用数组实现了二叉堆,并且没有notFull条件, 能执行自动扩容

DelayQueue

延迟队列, 按照时间从小到大出队的PriorityQueue。放入DelayQueue的元素必须实现Delayed接口。

1
2
3
java复制代码public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

如果getDelay的返回值小于或等于0,说明元素到期。Delayed接口实现Comparable接口, 基于getDelay的返回值比较元素的大小。

1
2
3
4
5
java复制代码//一把锁和一个非空条件
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
//优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
  • take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //取出堆顶元素
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null) //如果其他线程在等着,无限期等待。
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

只有在队列为空的时候才阻塞,如果堆顶元素的延迟时间没到,也会阻塞。使用Thread leader记录等待堆顶元素的第一个线程。

  • put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

SynchronousQueue

本身没有容量,先调用put,线程会阻塞, 知道另一个线程调用了take,两个线程同时解锁。反之亦然。

1
2
3
4
5
6
7
java复制代码public SynchronousQueue() {
this(false);
}

public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

公平先进先出,不公平先进后出

  • put
1
2
3
4
5
6
7
java复制代码public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
  • take
1
2
3
4
5
6
7
java复制代码public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

核心是transfer接口。

1
2
3
java复制代码abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}

根据e, 如果不是null,判定是消费者,否则是生产者。

如果是公平-队列模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
java复制代码E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin

//队列为空或者当前线程和队列中元素是同一个模式
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData); //新建节点
if (!t.casNext(null, s)) // failed to link in
continue;

advanceTail(t, s); // swing tail and wait 后移tail进入阻塞状态
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) { // not already unlinked 确定处于队列中的第一个元素
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;

} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read

Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}

advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}

TransferQueue基于单向链表实现的队列,初始head和tail指向空节点

如果是非公平-栈模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码 E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;

for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 待匹配
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller 待匹配
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%