(详细可用)分布式锁实现 Java + redis (一)

(详细可用)分布式锁实现 Java + redis (一)

分布式环境下很多系统需要使用分布式锁 目的不多说了

redis实现分布式锁

1
2
3
4
5
6
7
复制代码官网描述:
https://redis.io/topics/benchmarks
在 Pitfalls and misconceptions 小标题下
截取

Redis is, mostly, a single-threaded server from the POV of commands execution (actually modern versions of Redis use threads for different things)
意思是redis 在执行命令的时候是单线程执行的,所以能保证命令执行的原子性

所以可以使用redis 实现分布式锁

另外:

1
2
3
4
复制代码看了网上许多的基于 redis 的分布式锁实现 , 大多都是用
1.setNx加锁 + DEL命令解锁
2. 使线程休眠一段时间(轮训判断状态)
3. expire 键过期(貌似要解决setnx 命令后,线程出问题使得系统死锁)

优化解决方法 1. setNx 命令 进行加锁

1
2
3
4
复制代码不多说,就是 set if not exist 
如果不存在就set 成功,返回值1
如果已经存在,set 失败 , 返回0
官网: https://redis.io/commands/setnx
  1. 无需使用线程休眠 + 轮训 key 的状态来判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码通过 redis 的 Redis Keyspace Notifications
redis 通过设置键控件状态的监听通知客户端,也就是可以订阅(subscribe channels)
由于是通过 DEL mykey 进行释放锁操作,就可以监听 redis 的键DEL 事件

redis 配置
redis 配置文件下在 EVENT NOTIFICATION 区域
配置 notify-keyspace-events "AKE" 可以监听所有键事件(All Keys Events)
当然也可以具体订阅某个事件

redis 命令
>subscribe __keyevent@0__:del
实现订阅删除事件

可以通过执行:
> PUBLISH __keyevent@0__:del distributelocks
触发订阅事件
或者执行:
>DEL distributelocks (这里删除返回1 即删除成功才会触发)

官网:
https://redis.io/topics/notifications
https://redis.io/commands/del
----------
所以:
可以使用 redis 订阅键空间状态事件,当redis 调用DEL 释放锁, 实现通知其他线程获取锁

当然这个键空间通知跟zk的watch功能不是一样吗?

3.解决死锁

1
2
3
4
5
6
7
8
9
复制代码个人认为:
加个expire 过期时间是解决不了死锁问题的
我的原因是:
当你操作的是数据库数据时,你如果使用了分布式锁,又加了时间
1.如果结束前执行完成,是没有问题的
2.执行一半的情况下,键过期,导致其他(进程下的)线程,获取锁进入
当然这种情况可以拿锁后进行数据查询来判断是否一致

我的解决方式是,可以通过开启定时任务查看分布式锁的键,然后执行对应键(任务)的数据同步操作,或者是解锁操作

上代码

执行资源竞争操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
DistributedLocksService distributedLocksService = applicationContext.getBean(DistributedLocksService.class);
distributedLocksService.robResource("distributedLocksService" , Thread.currentThread());
}
});
t1.start();
ThreadHelper.sleep(400);
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
DistributedLocksService distributedLocksService = applicationContext.getBean(DistributedLocksService.class);
distributedLocksService.robResource("distributedLocksService" , Thread.currentThread());
}
});
t2.start();
ThreadHelper.sleep(400);
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
DistributedLocksService distributedLocksService = applicationContext.getBean(DistributedLocksService.class);
distributedLocksService.robResource("distributedLocksService" , Thread.currentThread());
}
});
t3.start();

DistributedLocksService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码
/**
* Created by zhuangjiesen on 2017/12/8.
*/
public class DistributedLocksService {
private DistributedLock distributedLock;
/*
*
* 抢占资源
* 涉及竞争
*
* */
public void robResource(String key , Object params ) {
Thread thread = (Thread)params;
try {
System.out.println("1. ------准备获取锁-------- : " + thread.getId());
distributedLock.lock(key);
// dosomething ...
System.out.println("2. ------成功获取锁-------- : " + thread.getId());
} finally {
distributedLock.unLock();
System.out.println("3. ------成功释放锁-------- : " + thread.getId());
}

}
public DistributedLock getDistributedLock() {
return distributedLock;
}
public void setDistributedLock(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
}

DistributedLock.java 抽象分布式锁接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码
/**
*
* 分布式锁接口
* Created by zhuangjiesen on 2017/12/8.
*/
public interface DistributedLock {
public final static String DEFAULT_LOCK_KEY = "DEFAULT_LOCK_KEY_OF_DISTRIBUTEDLOCK";
public void lock(String key) ;
public boolean tryLock(String key) ;
public void unLock() ;
public void unLock(String key) ;
/*
* 用来执行分布式锁的一些问题,比如lock 之后,服务挂了,重启之后其他线程都拿不到锁了
* 为了解决这个情况,可以使用一个定时任务,检测锁状态
* */
public void scheduleTask();
}

redis 实现分布式锁 JedisDistributedLock.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
复制代码

/**
*
*
* redis 实现分布式锁
* Created by zhuangjiesen on 2017/12/8.
*/
public class JedisDistributedLock implements DistributedLock , ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
private ApplicationContext applicationContext;
private static ThreadLocal<String> lockKeys = new ThreadLocal<>();
private static ConcurrentHashMap<String , Object> disLocks = new ConcurrentHashMap<>();
private JedisPool jedisPool;

@Override
public void lock(String key) {
Jedis jedis = jedisPool.getResource();
Long count = jedis.setnx( key , "1");
if (count != null && count.longValue() > 0) {
System.out.println("--lock--1. 获取到分布式锁.......");
lockKeys.set(key);
} else {
System.out.println("--lock--2. not .......未获取到分布式锁.......");
Object lock = null;
if ((lock = disLocks.get(key)) == null) {
// double check
synchronized (disLocks) {
if ((lock = disLocks.get(key)) == null) {
lock = new Object();
disLocks.put(key , lock);
}
}
}
synchronized (lock) {
try {
//线程休眠
lock.wait();
System.out.println("-------------------------");
lock(key);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

@Override
public boolean tryLock(String key) {
Jedis jedis = jedisPool.getResource();
Long count = jedis.setnx( key , "1");
if (count != null && count.longValue() > 0) {
return true;
}
return false;
}

@Override
public void unLock() {
try {
String key = lockKeys.get();
unLock(key);
} finally {
lockKeys.remove();
}
}

@Override
public void unLock(String key) {
Jedis jedis = jedisPool.getResource();
Long del = jedis.del(key);
Object lock = null;
if (del != null && del.longValue() > 0 && (lock = disLocks.remove(key)) != null) {
//解锁
synchronized (lock) {
lock.notifyAll();
}
}
System.out.println("--unLock--3. key : " + key + " del : " + del);
}

@Override
public void scheduleTask() {
//执行轮训死锁问题
}

@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}


/*
* spring 容器加载完毕调用
*
* */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
jedisPool = applicationContext.getBean(JedisPool.class);
Jedis jedis = jedisPool.getResource();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
Object lock = null;
System.out.println(" 1. channel : " + channel);
System.out.println(" 2. message : " + message);
if ("__keyevent@0__:del".equals(channel) && (lock = disLocks.remove(message)) != null) {
//解锁
synchronized (lock) {
lock.notifyAll();
}
}
}
} , "__keyevent@0__:del");
}
});
t1.start();
}
}
}

线程模拟执行时间 ThreadHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码
public class ThreadHelper {
/**
* 休眠模拟 rpc执行时间
* @param timeßß
*/
public static void sleep(long timeßß){
try {

Thread.currentThread().sleep(timeßß);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}

jedis 配置

spring-jedis.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码    <context:property-placeholder location="classpath:jedis.properties" />


<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig" >
<property name="maxTotal" value="50" />
<property name="minIdle" value="50" />
</bean>

<bean id="jedisPool" class="redis.clients.jedis.JedisPool" >
<constructor-arg index="0" ref="jedisPoolConfig" />
<constructor-arg index="1" value="${standardalone.host}" />
<constructor-arg index="2" value="${standardalone.port}" />
<constructor-arg index="3" value="${standardalone.timeout}" />
<constructor-arg index="4" value="${standardalone.password}" />
</bean>

jedis.properties

1
2
3
4
复制代码standardalone.host=192.168.130.130
standardalone.port=6379
standardalone.timeout=500
standardalone.password=redis

applicationContext.xml 中配置jedis 和 分布式锁的service

1
2
3
4
5
复制代码
<bean id="distributedLocksService" class="com.java.service.DistributedLocksService" >
<property name="distributedLock" ref="distributedLock"></property>
</bean>
<bean id="distributedLock" class="com.java.core.lock.impl.JedisDistributedLock" ></bean>

附上项目源码:
github.com/zhuangjiese…

目录:

1
2
3
4
5
6
7
复制代码处理类:
com.java.service.DistributedLocksService
分布式锁实现目录:
com.java.core.lock

main方法 DistributedLocksApp.java下
com.java.main.DistributedLocksApp.java

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%