ZooKeeper分布式锁原理

原理

  • ZooKeeper通过临时节点实现加锁,解锁,重入等操作。
  • 临时节点续期
    • ZooKeeper的节点是通过session心跳来续期的,比如客户端1创建了一个节点, 那么客户端1会和ZooKeeper服务器创建一个Session,通过这个Session的心跳来维持连接。如果ZooKeeper服务器长时间没收到这个Session的心跳,就认为这个Session过期了,也会把对应的节点删除。简单来说就是:当客户端宕机后,临时节点会随之消亡。****
    • 锁类型:公平锁,顺序抢占。来一个请求新建一个节点名称:node_01,node_02,node_03,01抢到锁后,02等待,01释放后,02抢锁,以此类推。
    • 到期处理:删除临时节点

代码

  • 上锁的入口是 acquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}

// org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock
private boolean internalLock(long time, TimeUnit unit) throws Exception {
// 获取当前线程
Thread currentThread = Thread.currentThread();
// 尝试加锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
// 加锁成功的话就放到threadData里
if ( lockPath != null ) {
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}

// 尝试加锁
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
try {
// 创建这个锁
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 这个方法这里先不关心,是多个client抢锁时互斥阻塞等待的代码
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e ) {
//...
}
return null;
}


// 创建这个锁
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
return client
.create()
.creatingParentContainersIfNeeded()
.withProtection()
// 临时节点
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path, lockNodeBytes);
}
  • 再来看看这个LockData里面是什么东西
1
2
3
4
5
6
7
8
arduino复制代码private static class LockData {
// 锁所属的线程
final Thread owningThread;
// 临时顺序节点的路径
final String lockPath;
// 重入次数 默认为1
final AtomicInteger lockCount = new AtomicInteger(1);
}
  • 互斥逻辑
    • 查找到所有临时顺序节点,然后按照编号从小到大排序
    • 判断当前客户端是不是 children 里的第一个,不是的话就代表不能加锁,那就计算出上一个节点编号,然后开启一个 Watcher 监听这个节点(刚计算出来的上一个节点)
    • wait() 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
try {
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
// 获取path下对应临时顺序节点,并按编号从小到大排序。底层采取的java.util.Comparator#compare来排序的
List<String> children = getSortedChildren();
// 获取当前线程创建的临时顺序节点名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 这个方法底层就是判断当前节点编号是不是children里的第一个,是的话就能抢锁,不是的话就计算出上一个节点序号是谁,然后下面监听这个节点。(因为按照编号排序了,所以可以得出上一个节点是谁)
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
// 如果当前客户端就是持有锁的客户端,直接返回true
if (predicateResults.getsTheLock() ) {
haveTheLock = true;
} else {
// 如果没抢到锁,则监听上一个节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
// 监听器,watcher下面分析
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 重点在这了,wait(),等待。也就是说没抢到锁的话就开启监听器然后wait()等待。
wait();
} catch ( KeeperException.NoNodeException e ) {}
}
}
}
}
return haveTheLock;
}
  • 解锁逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码public void release() throws Exception {
// 获取当前线程
Thread currentThread = Thread.currentThread();
// 获取当前线程的锁对象,从ConcurrentHashMap里获取
LockData lockData = threadData.get(currentThread);
// 锁重入次数-1,然后看看是不是大于0,如果大于0那代表有锁重入,直接-1,不删除锁节点,因为没释放完全。
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ) {
return;
}
try {
// 如果锁重入次数为0了,那就释放锁
internals.releaseLock(lockData.lockPath);
}
finally {
// 释放完后从ConcurrentHashMap里移除
threadData.remove(currentThread);
}
}

整体流程

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%