这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战
本文主要讲BlockingQueue
BlockingQueue
1 | java复制代码public interface BlockingQueue<E> extends Queue<E> { |
add,offer, put都是插入, add和offer无阻塞,但是在队列满时add会抛出异常,offer不会。put会阻塞,抛出中断异常。
remove非阻塞, poll和toke都会阻塞, 并抛出中断异常。
ArrayBlockingQueue
1 | java复制代码public ArrayBlockingQueue(int capacity) { |
数组实现的环型队列,在构造函数中必须传入数组容量,分公平和非公平
1 | java复制代码 |
- take
1 | java复制代码public E take() throws InterruptedException { |
1 | java复制代码private E dequeue() { |
- put
1 | java复制代码public void put(E e) throws InterruptedException { |
1 | java复制代码private void enqueue(E x) { |
LinkedBlockingQueue
基于单向链表的阻塞队列,队头和队尾是两个指针分开操作的, 使用了2把锁和2个条件。同时有一个AtomicInteger的原子变量记录count数。 构造函数中可以指定总容量, 默认Integer.MAX_VALUE
1 | java复制代码public LinkedBlockingQueue() { |
1 | java复制代码private final int capacity; |
- take
1 | java复制代码public E take() throws InterruptedException { |
- put
1 | java复制代码public void put(E e) throws InterruptedException { |
LinkedBlockingQueue和ArrayBlockingQueue的差异:
- 为了提供并发度, LinkedBlockingQueue分别控制队头队尾操作, put和put之间, take和take之间是互斥的,put和take之间不互斥, 但是count变量必须是原子的
- 因为各拿了一个锁, 当需要调用对方的signal时,必须加上对方的锁,比如signalNotEmpty
1 | java复制代码private void signalNotEmpty() { |
PriorityBlockingQueue
按照元素优先级从小到大出队列,元素要可以比较大小实现Comparable接口
1 | java复制代码//数组实现二叉小根堆 |
如果不指定大小,内部会设置11, 当超过11会自动扩容。
- put
1 | java复制代码public void put(E e) { |
- take
1 | java复制代码public E take() throws InterruptedException { |
1 | java复制代码private E dequeue() { |
和ArrayBlockingQueue机制类似, 主要区别在于使用数组实现了二叉堆,并且没有notFull条件, 能执行自动扩容
DelayQueue
延迟队列, 按照时间从小到大出队的PriorityQueue。放入DelayQueue的元素必须实现Delayed接口。
1 | java复制代码public interface Delayed extends Comparable<Delayed> { |
如果getDelay的返回值小于或等于0,说明元素到期。Delayed接口实现Comparable接口, 基于getDelay的返回值比较元素的大小。
1 | java复制代码//一把锁和一个非空条件 |
- take
1 | java复制代码public E take() throws InterruptedException { |
只有在队列为空的时候才阻塞,如果堆顶元素的延迟时间没到,也会阻塞。使用Thread leader记录等待堆顶元素的第一个线程。
- put
1 | java复制代码public void put(E e) { |
SynchronousQueue
本身没有容量,先调用put,线程会阻塞, 知道另一个线程调用了take,两个线程同时解锁。反之亦然。
1 | java复制代码public SynchronousQueue() { |
公平先进先出,不公平先进后出
- put
1 | java复制代码public void put(E e) throws InterruptedException { |
- take
1 | java复制代码public E take() throws InterruptedException { |
核心是transfer接口。
1 | java复制代码abstract static class Transferer<E> { |
根据e, 如果不是null,判定是消费者,否则是生产者。
如果是公平-队列模式
1 | java复制代码E transfer(E e, boolean timed, long nanos) { |
TransferQueue基于单向链表实现的队列,初始head和tail指向空节点
如果是非公平-栈模式
1 | java复制代码 E transfer(E e, boolean timed, long nanos) { |
本文转载自: 掘金