Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
为什么需要Lock与Condtion
synchronized提供了便捷性的隐式获取锁释放锁机制(基于JVM机制),但同时降低了开发人员对于锁操控的灵活,Lock与Condition在Java代码层次再实现一个管程模型,提供了synchronized缺少的
- 灵活的条件队列
- 响应中断
- 支持超时
- 非阻塞地获取锁
等灵活特性。
互斥与 等待队列:Lock
Lock总体结构
可以看到Lock接口对外提供API,ReentrantLock是Lock接口基础的实现类。
在ReentrantLock中以内部类的形式实现Sync,对外暴露的接口实际上是对Sync内部类方法的包装。因此Sync是ReentrantLock的最底层结构。
实现细节
AQS队列同步器
Sync的父类为AbstractQueuedSynchronizer队列同步器,具有阻塞功能
实现一把具有阻塞和唤醒功能的锁的核心要素为:
- 需要一个state变量,标记锁的状态,对锁变量的操作需要保证线程安全,用CAS实现
- 需要记录当前哪个线程持有锁
- 需要底层支持对一个线程进行阻塞和唤醒
- 需要一个队列记录所有阻塞的线程,这个队列必须是线程安全的无锁队列,需要用CAS实现
state锁变量与exclusiveOwnerThread
可以看到AbstractQueuedSynchronizer
源码中,使用Integer类型,且带有volatile的变量作为state变量。
state可以大于1,此时表示充入。
1 | arduino复制代码/** |
在顶层抽象类
abstract class AbstractOwnableSynchronizer中定义
exclusiveOwnerThread变量记录了当前持有锁的线程。
1 | arduino复制代码 |
LockSupport
JVM底层提供Native函数Unsafe类中park/unpark实现阻塞和唤醒线程。
LockSupport类对park/unpark做了简单封装和拓展。特别地,unpark可以对某个线程进行精确唤醒,而notify无法指定具体线程。
每个使用LockSupport的线程都会与一个许可关联,如果该许可可用,并且可在进程中使用,则调用park()将会立即返回,否则可能阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。但是注意许可不可重入,也就是说只能调用一次park()方法,否则会一直阻塞。 LockSupport定义了一系列以park开头的方法来阻塞当前线程,unpark(Thread thread)方法来唤醒一个被阻塞的线程。如下:
CLH同步队列
(Craig, Landin, and Hagersten) lock queue
源码如下,可以看出CLH为储存线程对象的双向链表。线程如果获取锁失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当锁释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)
waitStatus的不同值表示:
- CANCELLED = 1:因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
- SIGNAL = -1:后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
- CONDITION = -2:节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
- PROPAGATE = -3:表示下一次共享式同步状态获取将会无条件地传播下去
1 | arduino复制代码static final class Node { |
同时对队列的操作接口都是用Unsafe类提供的native函数CAS实现:
addWaiter尝试快速添加
1 | ini复制代码private Node addWaiter(Node mode) { |
如果失败则调用enq,enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。
compareAndSetTail方法的底层是unsafe类:
1 | kotlin复制代码/** |
公平与非公平
条件队列:Condition
Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
实现细节
每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。
Node里面包含了当前线程的引用。Node定义与AQS的CLH同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node静态内部类)。
Condition的队列结构比CLH同步队列的结构简单些,新增过程较为简单只需要将原尾节点的nextWaiter指向新增节点,然后更新lastWaiter即可。
主要接口如下:
await()
首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。
1 | scss复制代码public final void await() throws InterruptedException { |
signal()
1 | java复制代码public final void signal() { |
整个通知的流程如下:
- 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
- 如果线程已经获取了锁,则将唤醒条件队列的首节点
- 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
- 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。
ReentrantLock调用链
可以看出 ReentrantLock中主要接口都是对sync即AbstractQueuedSynchronizer
ReentrantLock中 Sync引用可以通过构造参数选择FairSync或NonfairSync,即公平与否。
1 | csharp复制代码 public void lock() { sync.acquire(1); } |
acquire
加锁主要通过sync.acquire(1)。
1 | scss复制代码public final void acquire(int arg) { |
tryAcquire
其中
tryAcquire函数是虚方法,实际实现在FairSync和NonfairSync中,且各有不同。
主要功能是检查锁变量state是否无锁并CAS尝试加锁,或持有锁的线程是否为当前线程(可重入)
公平和非公平的主要区别在于是否直接尝试加锁,公平加锁先判断自己是否是队列中的第一个节点。
Lock和tryAcquire中的加锁策略也是FairSync和NonfairSync唯二的不同
非公平Lock会尝试直接CAS抢锁(甚至不想进入等待队列),失败再进入acquire
1 | java复制代码protected boolean tryAcquire(int arg) { |
acquireQueued和shouldParkAfterFailedAcquire
tryAcquire失败后会调用 acquireQueued函数则是对AQS队列进行维护,死循环判断是否为队列中的头节点,再进行加锁。
如果不符合上述条件,线程并不会立马进行阻塞,而是检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞:
- 如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
- 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
- 如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false
如果返回true会调用parkAndCheckInterrupt()阻塞并检查是否应该中断。
Lock包中提供lockInterruptibly->acquireInterruptibly->doAcquireInterruptibly->throw new InterruptedException() 该调用链响应中断,实现与原调用链类似,只有doAcquireInterruptibly与acquireQueue不同。
1 | java复制代码final boolean acquireQueued(final Node node, int arg) { |
release
1 | java复制代码public final boolean release(int arg) { |
tryRelease
1 | java复制代码protected final boolean tryRelease(int var1) { |
ReentrantReadWriteLock
本文转载自: 掘金