这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战
Semaphore
信号量, 提供了资源数量的并发控制, 存在公平锁和非公平锁。Semaphore是共享锁,当初始资源为1时,退化成排它锁, 内部有Sync,FairSync,NonfairSync
- acquire
1 | java复制代码public void acquire() throws InterruptedException { |
- release
1 | java复制代码public void release() { |
CountDownLatch
基于AQS, 没有公平和非公平的区别
- await
1 | java复制代码public void await() throws InterruptedException { |
- countDown
1 | java复制代码public void countDown() { |
CyclicBarrier
基于ReetrantLock和Condition实现
1 | java复制代码public class CyclicBarrier { |
- await
1 | java复制代码 public int await() throws InterruptedException, BrokenBarrierException { |
CyclicBarrier可以重用的,会响应中断, 如果没有达到parties, 此时收到中断信号,阻塞的线程也会被唤醒,ount被设置成初始值。
Exchanger
用于线程之间交换数据, 使用CAS和park/unpark,内部有两个内部类,Participant和Node
- Node
1 | java复制代码//伪共享和缓存行填充 |
- Participant
1 | java复制代码 static final class Participant extends ThreadLocal<Node> { |
每个对象调用exchange方法交换数据,先创建Node对象。这个对象是对线程的包装, 里面有线程要交换的数据,对方线程交换来的数据,线程本身。一个Node只能交换一个线程的数据,并行的交换数据用Node数组。
- exchange
1 | java复制代码 public V exchange(V x) throws InterruptedException { |
单个交换使用slotExchange, 多个交换使用arenaExchange
1 | java复制代码private final Object slotExchange(Object item, boolean timed, long ns) { |
1 | java复制代码private final Object arenaExchange(Object item, boolean timed, long ns) { |
Phaser
可以替代CyclicBarrier和CountDownLatch
对应CountDownLatch的是awaitAdvance和arrive
对应CyclicBarrier的是awaitAdvance和arriveAndAwaitAdvance
动态调整线程个数
运行期间动态调整线程个数
1 | java复制代码register() |
层次
多个Phaser组成树状结构
1 | java复制代码private final Phaser parent; |
在Phaser内部记录自己的父节点,没有记录子节点
State变量
phaser没有基于AQS
64位的State变量分为四部分: 最高位0表示未同步完成,1表示同步完成, 初始为0。接下来的31位表示轮数, 接下来的16位标识总线程数,最后的16位标识未到达线程数。
- 初始化
1 | java复制代码public Phaser(Phaser parent, int parties) { |
当parties=0时, state被赋予1,当不等于0时,把phase左移32位, 把parties左移16位, parties作为最低的16位,三个值做或操作赋值给state
阻塞使用的是Treiber stack的数据结构, Treiber Stack是一个无锁的栈,单向链表, 出栈入栈都在链表头部,只要有一个head指针。
1 | java复制代码static final class QNode implements ForkJoinPool.ManagedBlocker { |
1 | java复制代码private final AtomicReference<QNode> evenQ; |
使用两个链表,减少并发冲突,当phase为奇数, 阻塞线程在oddQ,否则在evenQ里。
1 | java复制代码private void releaseWaiters(int phase) { |
- arrive
1 | java复制代码public int arrive() { |
1 | java复制代码private int doArrive(int adjust) { |
- awaitAdvance
1 | java复制代码public int awaitAdvance(int phase) { |
- awaitAdvanceInterruptibly
1 | java复制代码public int awaitAdvanceInterruptibly(int phase) |
1 | java复制代码private int internalAwaitAdvance(int phase, QNode node) { |
这里调用了ForkJoinPool.ManagedBlock方法,目的是把node对应的线程阻塞
Atomic
AtomicInteger/AtomicLong
使用CAS达到原子的效果
1 | java复制代码public final int getAndIncrement() { |
AtomicBoolean/AtomicReference
让比较和设置值是原子操作
1 | java复制代码public final boolean compareAndSet(boolean expect, boolean update) { |
在Unsafe只提供了三种类型的CAS操作: int,long,Object
1 | java复制代码public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); |
AtomicStampedReference/AtomicMarkableReference
ABA 问题的解决
通过版本号
1 | java复制代码public boolean compareAndSet(V expectedReference, |
AtomicMarkableReference和AtomicStampedReference类似,只是AtomicMarkableReference的Pair是Boolean类型, AtomicStampedReference是整型的累加
AtomicIntegerFiledUpdater
类似的有AtomicLongFiledUpdater, AtomicReferenceFiledUpdater.
应用场景: 无法修改类,实现对成员变量的原子操作
AtomicIntegerFiledUpdater是抽象类,通过newUpdater获取实例。表示类的某个成员,而不是对象的某个成员。
1 | java复制代码 public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) { |
1 | java复制代码public int getAndIncrement(T obj) { |
限制条件: 成员变量必须是volatile的int类型(不能是Integer包装类)
AtomicIntegerArray
AtomicLongArray, AtomicReferenceArray
使用和原理与AtomicIntegerFiledUpdater一样
LongAdder
JDK8提供了LongAdder, LongAccumulator, DoubleAdder和DoubleAccumulator,都继承了Striped64
原理
把一个Long型分成base和多个cell, 获取值的时候对base和多个cell求sum
1 | java复制代码public long sum() { |
最终一致性
类似ConcurrentHashMap, 只有最终一致性,没有强一致性。
- increment/decrement
1 | java复制代码public void increment() { |
1 | java复制代码final void longAccumulate(long x, LongBinaryOperator fn, |
LongAccumulator可以自定义操作符。
1 | java复制代码@FunctionalInterface |
本文转载自: 掘金