Java 多线程 阻塞队列 没啥好说的

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

如题 , 阻塞队列真没啥好说的 , 工具类 , 了解功能感觉就可以了

一 . 阻塞队列简述

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景:

  • 生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程
  • 阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

通用概念

  • implements Queue : 基本上都是Queue的实现类 ,即实现了 Queue 的方法
  • 可以通过构造方法初始化容量和排序
  • 构造方法可以传入整个集合

队列类型

  • ArrayBlockingQueue :一个由 数组 结构组成的 有界 阻塞队列。
  • LinkedBlockingQueue :一个由 链表 结构组成的 无界 阻塞队列。
  • PriorityBlockingQueue :一个 支持优先级排序无界 阻塞队列。
  • DelayQueue:一个使用 优先级队列 实现的 无界 阻塞队列。
  • SynchronousQueue:一个 不存储元素 的阻塞队列。
  • LinkedTransferQueue:一个由 链表 结构组成的 无界 阻塞队列。
  • LinkedBlockingDeque:一个由 链表 结构组成的双向阻塞队列。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
// 按照类型分类
• 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
• 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque
• 优先级阻塞队列:PriorityBlockingQueue
• 延时阻塞队列:DelayQueue
• 其他阻塞队列:SynchronousQueue和LinkedTransferQueue

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用

性能对比

  • 1、ArrayBlockingQueue 性能优于LinkedBlockingQueue,但是LinkedBlockingQueue是无界的。
  • 2、ArrayBlockingQueue 和 LinkedBlockingQueue 的 poll方法总是比offer方法快,并发越高,差距越大
  • 3、ArrayBlockingQueue 和 LinkedBlockingQueue 的 性能远高于PriorityBlockingQueue,显然优先队列在比较优先级上的操作上耗费太多
  • 4、PriorityBlockingQueue的 offer方法与 poll方法的性能差距很小,基本维持在近似1:1
线程数 20 50 100 200 500 1000
LinkedBlockingQueue 15,0 31,15 32,16 63,32 203,47 563,110
ArrayBlockingQueue 15,0 16,15 31,15 47,16 125,47 364,68
PriorityBlockingQueue 78,78 172,188 360,422 813,969 3094,2641 6547,5453

二. ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
JAVA复制代码> 一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的 
> ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了
> ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
- 但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。
- 公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

// 构造器 :
MC- ArrayBlockingQueue(int capacity)
MC- ArrayBlockingQueue(int capacity, boolean fair)

// 抽象类和接口
I- BlockingQueue<E> : 提供了在多线程环境下的出列、入列操作
?- 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作

// 变量
• items 变量,一个定长数组,维护 ArrayBlockingQueue 的元素。
• takeIndex 变量,int ,为 ArrayBlockingQueue 队首位置。
• putIndex 变量,int ,ArrayBlockingQueue 队尾位置。
• count 变量,元素个数。
• lock 变量,ReentrantLock ,ArrayBlockingQueue 出列入列都必须获取该锁,两个步骤共用一个锁。
• notEmpty 变量,非空,即出列条件。
• notFull 变量,未满,即入列条件。

// 入队
M- add(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了抛出异常
M- offer(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了返回false
M- offer(E e, long timeout, TimeUnit unit) 方法 : 将指定的元素插入此队列的尾部 , 已满在设定时间内等待
M- put(E e) 方法 : 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间
M- enqueue :
- 正常添加元素 , 到达队尾的时候重定向到队头
- 总数 + 1
- 通知阻塞线程

// 出列
M- poll() 方法:获取并移除此队列的头,如果此队列为空,则返回 null 。
M- poll(long timeout, TimeUnit unit) 方法:获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
M- take() 方法:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
M- remove(Object o) 方法:从此队列中移除指定元素的单个实例(如果存在)。

// 核心总结 :
M- offer : 通过 ReentrantLock 上锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- finally -> lock.unlock();

// 关键点 :
1 创建后,容量将无法更改
2 尝试将元素放入满队列将导致操作阻塞
3 尝试从空队列中取出一个元素]也会类似地被阻塞
4 支持可选的公平性策略

三 . DelayQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素
- 如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。
- 也就是说只有在延迟期到时才能够从队列中取元素。

// 作用 :
• 缓存:清掉缓存中超时的缓存数据
• 任务超时处理

// 关键 :
1. 可重入锁ReentrantLock
2. 用于阻塞和通知的Condition对象
3. 根据Delay时间排序的优先级队列:PriorityQueue
4. 用于优化阻塞通知的线程元素leader

// 结构 :
E- AbstractQueue
I- BlockingQueue
M- offer() : 往PriorityQueue中添加元素
- 向 PriorityQueue中插入元素
- 判断当前元素是否为对首元素,如果是的话则设置leader=null , 唤醒所有线程
M- take()
- 获取队首 --- q.peek
IF- 队首为空 , 阻塞 ,等待off 唤醒
ELSE-
获取队首的超时时间 , 已过期则出对
- 如果存在其他线程操作 ,阻塞 , 不存在其他线程 , 独占
- 超时阻塞 --- available.awaitNanos(delay);
- 唤醒阻塞线程

// 使用方式 :
// Step 1 : new 一个
DelayQueue queue = new DelayQueue();

// Step 2 : 加东西
queue.offer(createUserDelayQueueTO());

四 . SynchronousQueue

  1. SynchronousQueue没有容量。
  • 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
  1. 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。
  • 例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  1. SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  2. 若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。

SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务

1
2
3
4
5
java复制代码C- SynchronousQueue
E- AbstractQueue
I- BlockingQueue
C- TransferQueue
?- 实现公平性策略的核心类,其节点为QNode

五 . LinkedBlockingDeque

  • 一个有链表组成的双向阻塞队列,与前面的阻塞队列相比它支持从两端插入和移出元素
    • 以first结尾的表示从对头操作,以last结尾的表示从对尾操作。
  • 支持FIFO、FILO两种操作方式

LinkedBlockingQueue是一个阻塞队列

  • 内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
    • 基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
    • 头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据
      • 当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。
      • 这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码

// 简介
是先进先出队列FIFO。
采用ReentrantLock保证线程安全

// 操作结果
增加 : 队列满 >
put -> 一直阻塞
add -> 抛出异常
offer -> 返回false

删除 : 队列为空
remove -> NoSuchElementException
poll -> 返回false
take -> 阻塞



// 源码分析
LinkedBlockingQueue
C- static class Node<E> : 核心静态内部类 , 表示一个节点
|- E item : 节点原始
|- Node<E> next : 下一节点
F- int capacity : 容量界限
F- AtomicInteger count : 当前元素个数
F- Node<E> head :头节点
F- Node<E> last : 尾节点
F- ReentrantLock takeLock : take,poll等获取锁
F- Condition notEmpty : 等待任务的等待队列
F- ReentrantLock putLock : put,offer等插入锁
F- Condition notFull : 等待插入的等待队列
MC- LinkedBlockingQueue() : 最大数量
MC- LinkedBlockingQueue(int capacity) : 指定数量
MC- LinkedBlockingQueue(Collection<? extends E> c) : 指定集合
M- signalNotEmpty : 表示等待take。put/offer调用,否则通常不会锁定takeLock
|- 获取 tackLock : this.takeLock
|- 锁定takeLock -> takeLock.lock();
|- 唤醒take 线程等待队列 -> notEmpty.signal();
|- 释放锁 -> takeLock.unlock();
M- signalNotFull : 表示等待put,take/poll 调用
|- 获取putLock : this.putLock;
|- 锁定putLock -> putLock.lock();
|- 唤醒插入线程等待队列 -> notFull.signal();
|- 释放锁
M- enqueue : 在队列尾部插入
|- last = last.next = node;
M- E dequeue():移除队列头
|- 保留头指针
|- 获取当前链表的第一个元素
|- 头指针指向第一个元素
|- 获取第一个元素的值并且移除第一个
|- 返回第一个元素的值
M- fullyLock : 锁定putLock和takeLock
|- putLock.lock();
|- takeLock.lock();
M- fullyUnlock : 先解锁takeLock,再解锁putLock
|- putLock.unlock();
M- offer: 将给定的元素设置到队列中,如果设置成功返回true
|- 非空判断 , 获取计数器
|- 判断队列是否已满 -> 返回 Boolean
|- 新建节点
|- 获取插入锁 , 并且锁定
|- 队列未满 -> 插入 -> 计数
|- 如果未满 ,继续唤醒插入线程
|- 解锁
|- 如果对了为空 ,获取线程锁阻塞
M- offer(E e, long timeout, TimeUnit unit) :给定的时间内设置到队列中
M- put(E e) : 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞 , 直到队列中有多余的空间
|- 核心1 : putLock.lockInterruptibly();
-> 设置前加锁
|- 核心2 : notFull.await();
-> 队列满时等待
M- take() : 从队列中获取值,如果队列中没有值
M- peek() : 非阻塞的获取队列中的第一个元素,不出队列
M- poll() : 非阻塞的获取队列中的值,未获取到返回null。
M- poll(long timeout, TimeUnit unit) :在给定的时间里,从队列中获取值
M- remove(Object o):从队列中移除指定的值。将两把锁都锁定。
M- clear():清空队列。
M- drainTo(Collection c):将队列中值,全部移除,并发设置到给定的集合中。

六 . LinkedTransferQueue

  • LinkedTransferQueue是一个由链表组成的的无界阻塞队列
  • 它是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。

与其他BlockingQueue相比,他多实现了一个接口TransferQueue, 该接口是对BlockingQueue的一种补充,多了tryTranfer()和transfer()两类方法:

  • tranfer():若当前存在一个正在等待获取的消费者线程,即立刻移交之。
    • 否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素
  • tryTranfer(): 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;
    • 若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作

七 . PriorityBlockingQueue

1
2
3
4
java复制代码- PriorityBlockingQueue是支持优先级的无界队列。
- 默认情况下采用自然顺序排序,当然也可以通过自定义Comparator来指定元素的排序顺序。
- PriorityBlockingQueue内部采用二叉堆的实现方式,整个处理过程并不是特别复杂。
- 添加操作则是不断“上冒”,而删除操作则是不断“下掉”。

八 . ArrayBlockingQueue 与 LinkedBlockingQueue 的区别

Queue 阻塞与否 是否有界 线程安全保障 适用场景 注意事项
ArrayBlockingQueue 阻塞 有界 一把全局锁 生产消费模型,平衡两边处理速度 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue 阻塞 可配置 存取采用 2 把锁 生产消费模型,平衡两边处理速度 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加 JVM 垃圾回收的负担。

九 . 双端队列

而 ArrayDeque、LinkedBlockingDeque 就是双端队列,类名以 Deque 结尾

正如阻塞队列适用于生产者消费者模式,双端队列同样适用与另一种模式,即工作密取。

  • 在生产者-消费者设计中,所有消费者共享一个工作队列,而在工作密取中,每个消费者都有各自的双端队列。
  • 如果一个消费者完成了自己双端队列中的全部工作,那么他就可以从其他消费者的双端队列末尾秘密的获取工作。
    • 具有更好的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。
  • 在大多数时候,他们都只是访问自己的双端队列,从而极大的减少了竞争
  • 当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争

十 . 队列对象

1
2
3
4
5
6
7
java复制代码> 阻塞队列 : 阻塞队列有普通的先进先出队列,

> 包括基于数组的ArrayBlockingQueue
> 基于链表的LinkedBlockingQueue/LinkedBlockingDeque
> 基于堆的优先级阻塞队列PriorityBlockingQueue
> 可用于定时任务的延时阻塞队列DelayQueue
> 用于特殊场景的阻塞队列SynchronousQueue和LinkedTransferQueue

十一 . CopyOnWriteArrayList

CopyOnWrite容器即写时复制的容器。
当我们往容器添加元素的时候,先将当前容器进行Copy,复制出一个新的容器,
然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
java复制代码
优缺点
|- 优点:
读操作性能很高,比较适用于读多写少的并发场景。
Java的list在遍历时,若中途有别的线程对list容器进行修改,则会抛出ConcurrentModificationException异常。
而CopyOnWriteArrayList由于其"读写分离"的思想,遍历和修改操作分别作用在不同的list容器
?- 所以在使用迭代器进行遍历时候,也就不会抛出ConcurrentModificationException异常。

|- 缺点:
内存占用问题,执行写操作时会发生数组拷贝
无法保证实时性,Vector对于读写操作均加锁同步,可以保证读和写的强一致性。
而CopyOnWriteArrayList由于其实现策略的原因,写和读分别作用在新老不同容器上
?- 在写操作执行过程中,读不会阻塞但读取到的却是老容器的数据。

|- 使用场景 :
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。

CopyOnWriteArrayList
F- ReentrantLock lock = new ReentrantLock() --> 重入锁
F- volatile Object[] array; --> 只能通过 getArray/setArray 访问的数组
M- Object[] getArray() --> 获取数组,非私有方法以便于CopyOnWriteArraySet类的访问
M- setArray(Object[] a) --> 设置数组
M- CopyOnWriteArrayList -- 创建一个空数组
M- CopyOnWriteArrayList(Collection<? extends E> c)
?- 创建一个包含指定集合的数组
B- 如果c的类类型为CopyOnWriteArrayList
|- 直接获取其数组
E- 如果不是
|- 通过 toArray 转数组
|- 如果c.toArray返回的不是 Object[]类型,则通过数组拷贝
|- 设置数组 : setArray(elements);
M- CopyOnWriteArrayList(E[] toCopyIn) : 创建包含给定数组副本的列表
M- size() : 获取数量
M- isEmpty() : 判断列表元素是否为空
M- eq(Object o1, Object o2) : 判断o1 o2是否相等
M- indexOf(Object o, Object[] elements,int index, int fence)
B- 为null , for 循环迭代第一个 null
E- 不为 null ,for 循环 eq
M- lastIndexOf :索引倒叙
M- contains : IndexOf 判断
M- clone : 浅拷贝
|- 重置锁定
|- 返回clone 属性
M- toArray
M- get : 获取原数组中元素
M- set:用指定的元素替换列表中指定位置的元素
|- 获取当前锁并且锁定
|- 获取元素数组
|- 获取老的值
B- 如果老的值和给定值不相等
|- 原数组拷贝 , 将新数组中的索引位置修改为新值
|- 将原数组替换为新数组
E- 否则
|- setArray(elements);
|- 返回老的值
M- add(E e) : 将指定的元素追加到此列表的末尾
|- 获取重入锁 ,锁定
|- 获取原数组
|- 原数组拷贝 并增加一个空位
|- 将指定元素增加到新数组新增的空位中
|- 新数组替换原数组
M- remove :
|- 获取锁并且锁定
|- 获取原数组
|- 获取要删除的元素值 , 获取要移动的值
B- 如果为0,则删除的是最后一个元素
-> setArray(Arrays.copyOf(elements, len - 1));
E- 否则 复制拷贝
|- 新建数组
|- 将原数组中,索引index之前的所有数据,拷贝到新数组中
|- 将元素组,索引index+1 之后的numMoved个元素,复制到新数组,索引index之后
|- 替换原数组
|- 返回老的值 ,最后释放锁
C- COWSubList : 内部视图类

更新记录

  • 20210727 : 修改格局

致谢

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
芋道源码 : http://www.iocoder.cn/JUC/sike/aqs-3/

https://mp.weixin.qq.com/s?__biz=MzIxOTI1NTk5Nw==&mid=2650047475&idx=1&sn=4349ee6ac5e882c536238ed1237e5ba2&chksm=8fde2621b8a9af37df7d0c0a7ef3178d0253abf1e76a682134fce2f2218c93337c7de57835b7&scene=21#wechat_redirect

https://blog.csdn.net/javazejian/article/details/70768369

死磕系列 , 我的多线程导师
http://cmsblogs.com/?cat=151

// JVM 源码 C
https://www.jianshu.com/p/a604f1a9f875

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%