「这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战」
MySql 实现分布式锁
使用 Mysql
实现分布式锁在实际开发中的应用场景比较少,一般只有在性能要求不是很高,不想要引入别的组件的时候才会使用。它最大的特点就是理解起来比较容易。
它主要有以下三种实现方式:
- 基于表记录实现
- 借助
mysql
的悲观锁实现
基于表记录实现
先创建一张表:
1 | sql复制代码CREATE TABLE `mysql_lock` ( |
申请锁操作就是在表中插入一条对应的记录:
1 | sql复制代码INSERT INTO mysql_lock (resource, description) VALUES (1, '申请资源:1'); |
释放锁操作就是删除插入的那一条表记录即可:
1 | sql复制代码DELETE FROM mysql_lock WHERE resource = 1; |
实现原理说明:
我们在创建表的时候给 resource
加了唯一约束,它是不能重复插入的,这样就实现了互斥性。前面我们讲分布式锁的时候详细说明了分布式锁应该拥有的特性,上面的例子没有实现其他的特性,下面我们来说一下具体的优化方案:
- 对于超时时间:可以写一个定时清理过期资源的程序
- 对于可重入性,独占性:可以添加一个字段来记录线程的编号,如果是同一线程允许再次获取锁,每次删除数据库记录的时候校验线程编号,保证独占性,自己的锁只能自己解开。
- 对于
mysql
的可靠性:可以设置主备或者集群来防止单点故障。 - 这边还有一个小问题,就是每次去获取锁的时候,线程不是阻塞的只去插入向库里面插入一次,失败了也不重试,这个都需要在代码逻辑中自己实现了。
借助 mysql 悲观锁实现
为了提高分布式锁的效率,可以使用查询语句,借助 for update
关键字来给被查询的记录添加行锁中悲观锁,这样别的线程就没有办法对这条记录进行任何操作,从而达到保护共享资源的目的。
使用行锁需要注意的点
mysql
默认是会自动提交事务的,应该手动禁止一下:SET AUTOCOMMIT = 0;
- 行锁是建立在索引的基础上的,如果查询时候不是走的索引的话,行锁会升级为表锁进行全表扫描。
我们继续使用上面那张表来进行说明:
- 申请锁操作:
SELECT * FROM mysql_lock WHERE id = 1 FOR UPDATE;
只要可以查的出来就是申请成功的,没有获取到的会被阻塞,阻塞的超时时可以通过设置mysql
的innodb_lock_wait_timeout
来进行设置。注意WHERE id = 1
这个查询条件是走索引的。 - 释放锁操作:
COMMIT;
事务提交之后别的线程就可以查的这条记录了。
说明:这边简单提供了一下
mysql
实现分布式锁的两种思路,实际开发不建议使用,要使用的话建议使用第二种,像上面说的分布式锁应该实现的特性使用数据库的话,好多需要开发者手动去实现,不太友好。
Zookeeper 实现分布式锁
在使用 zookeeper
实现分布式锁之前我们先来了解一点前置知识:
zk 的节点类型
- 持久化节点:客户端断开连接,节点还在
- 持久化顺序节点:在持久化节点的基础上保证有序性
- 临时节点:客户端断开连接,节点就删除了
- 临时顺序节点:在临时节点的基础上保证有序性
实现 zk 分布式锁的思路
- 利用
zk
同级节点的唯一特性可以实现锁的互斥性 - 利用
zk
临时节点的特性可以避免正在占用锁的线程没释放锁就因为一些不可抗力因素宕机导致其他线程无法获取锁最终死锁的问题。 - 利用
zk
的 节点的watcher
事件可以轻松实现通知其他线程争抢锁的功能。 - 利用
zk
顺序节点的特性,可以实现公平锁,按照申请锁的顺序来唤醒阻塞的线程,防止羊群效应。
羊群效应:当并发量巨大的时候,只有一个线程会获得锁,很多线程就会阻塞,当获得锁的那个线程释放锁之后,其他所有在等待的线程就会一起争抢锁,可能会因为瞬间启动的线程过多而导致服务器挂到的情况,这就是所谓的羊群效应。
自定义 zk 分布式锁
代码示例:
1 | java复制代码package com.aha.lock.zk; |
代码解析:
implements Lock
: 实现Lock
遵循JUC
提供的规范。- 使用
zkClient.createEphemeralSequential()
创建临时顺序节点,避免羊群效应和拥有锁线程意外挂掉造成死锁的问题。 - 阻塞线程这边使用的是
CountDownLatch
。当有线程争抢到锁之后,其他的线程会被CountDownLatch
的await
方法给阻塞,当被阻塞线程的前置节点被删除,就说明当前节点应该被唤醒,因为顺序节点是有序的,所以只唤醒当前节点就可以了。这边唤醒方法使用的是zkClient.subscribeDataChanges(beforeNodePath.get(), zkDataListener);
在检测到前置节点被删除之后使用CountDownLatch
的countDown
方法,当countDown()
变成 0 之后就会唤醒线程。 nodePath
和beforeNodePath
应该是线程私有变量,这样才能保证,每个线程记录自己的nodePath
和beforeNodePath
。- 具体的实现细节可以参考代码中的注释。
步骤 3 问题说明:
- 当
countDown()
变成 0 之后就会唤醒所有的线程,这边应该实现成唤醒自己下一个节点- 当
countDwon()
变成 0 之后需要重新new
这个对象,不然await
方法是没有办法重新阻塞线程的。
测试自定义 zk
分布式锁:
1 | java复制代码package com.aha.lock.zk; |
使用 curator 的 zk 分布式锁
curator 提供的几种分布式锁方案
InterProcessMutex
:分布式可重入排它锁InterProcessSemaphoreMutex
:分布式排它锁InterProcessReadWriteLock
:分布式读写锁
InterProcessMutex 使用实例
配置 curatorFramework
客户端
1 | yaml复制代码zookeeper: |
1 | java复制代码package com.aha.config; |
1 | java复制代码package com.aha.client; |
模拟 50 个线程争抢锁:
1 | java复制代码package com.aha.lock.service; |
1 | java复制代码package com.aha.lock.controller; |
本文转载自: 掘金