基于 Redis 实现可重入分布式锁

分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

SETNX key value

Set key to hold string value if key does not exist. In that case, it is equal to SET. When key already holds a value, no operation is performed. SETNX is short for “SET if Not eXists”.
Return value
Integer reply, specifically:

  • 1 if the key was set
  • 0 if the key was not set

Examples

1
2
3
4
5
6
7
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>

以上是官方文档,意思简单说就是:如果 key 不存在,则 set 成功,如果 key 已经存在,则 set 失败。根据 SETNX 的这个特性,我们可以使用它实现分布式锁。

必须考虑的问题

处理死锁

当一个客户端获取锁成功之后,假如它崩溃了导致它再也无法和 Redis 节点通信,那么它就会一直持有这个锁,导致其它客户端永远无法获得锁了,因此锁必须要有一个自动释放的时间。
通常我们会把获取锁的操作分成两个 Redis 命令:

1
2
3
4
redis> setnx LOCK 7978ff8a-170c-4422-ab17-6a5d846acd92
(integer) 1
redis> expire LOCK 30
(integer) 1

如果客户端在执行完 setnx LOCK 7978ff8a-170c-4422-ab17-6a5d846acd92 命令后由于某种原因,客户端宕机了,那么这时这把锁并没有过期时间,导致其它客户端永远无法获得锁了。
因此对于锁的过期时间设置不能分为两步操作,Spring Boot 的 StringRedisTemplate 并没提供原子性操作,一条命令设置 key、value、expire,Redis 官方提供的 Jedis 客户端中的 JedisCommands 接口就可以实现这个操作,如下:

1
2
3
4
5
6
7
private boolean setNX(String key, String value, long expire) {
String result = stringRedisTemplate.execute((RedisCallback<String>) connection -> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, value, "NX", "PX", expire);
});
return "OK".equals(result);
}

锁被其他线程释放

如果不加任何处理即简单使用 SETNX 实现 Redis 分布式锁,就会遇到一个问题:如果线程 C1 获得锁,但由于业务处理时间过长,锁在线程 C1 还未处理完业务之前已经过期了,这时线程 C2 获得锁,在线程 C2 处理业务期间线程 C1 完成业务执行释放锁操作,但这时线程 C2 仍在处理业务线程 C1 释放了线程 C2 的锁,导致线程 C2 业务处理实际上没有锁提供保护机制;同理线程 C2 可能释放线程 C3 的锁,从而导致严重的问题。

因此每个线程释放锁的时候只能释放自己的锁,即锁必须要有一个拥有者的标记,并且也需要保证释放锁的原子性操作。

锁拥有着的标志我们可以用 UUID 在实现,将其在获取锁的时候作为 value 值 set 到 Redis中。释放锁的时候先判断锁对应的 UUID 是否与线程中的 UUID 相同,相同时才做删除操作。

从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值,通过 Lua 脚本来达到释放锁的原子操作,脚本如下:

1
2
3
4
5
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end

Spring Boot 可以通过 StringRedisTemplate 执行以下代码执行 Lua 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean deleteKey(List<String> keys, List<String> args) {
Object result = stringRedisTemplate.execute((RedisCallback<Object>) connection -> {
Object nativeConnection = connection.getNativeConnection();
// 单机模式
if (nativeConnection instanceof Jedis) {
return ((Jedis) nativeConnection).eval(LUA_UNLOCK_SCRIPT, keys, args);
}
// 集群模式
else if (nativeConnection instanceof JedisCluster) {
return ((JedisCluster) nativeConnection).eval(LUA_UNLOCK_SCRIPT, keys, args);
}
return 0L;
});
return result != null && Long.parseLong(result.toString()) > 0;
}

可重入锁

可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。
在这里,我们可以利用在 Redis 保存的 value 值进行判断,获得锁后我们将 UUID 存入 ThreadLocal 中,同一线程再次尝试获取锁的时候将 ThreadLocal 中的 UUID 与 Redis 的 value 比较,如果相同则表示这把锁所以该线程,即实现可重入锁。

分布式锁实现

定义 Redis 分布式锁接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Redis 分布式锁接口
*
* @author Leaves
* @version 1.0.0
* @date 2018/8/8
*/
public interface IRedisDistributedLock {

boolean lock(String key);

boolean lock(String key, long waitMillis);

boolean lock(String key, long waitMillis, long sleepMillis);

boolean lock(String key, long expire, long waitMillis, long sleepMillis);

boolean lock(String key, long expire, long waitMillis, long sleepMillis, int retries);

boolean release(String key);
}

RedisDistributedLockImpl 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import com.lm.util.ApplicationContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
* Redis 分布式锁
*
* @author Leaves
* @version 1.0.0
* @date 2018/8/8
*/
@Component
public class RedisDistributedLockImpl implements IRedisDistributedLock {

private final Logger logger = LoggerFactory.getLogger(RedisDistributedLockImpl.class);

private StringRedisTemplate stringRedisTemplate = ApplicationContextUtil.getBean(StringRedisTemplate.class);

/**
* 锁前缀
*/
private static final String ROOT_KEY = "LOCK_";

/**
* 过期时间,ms
*/
private static final long EXPIRE = 15000L;

/**
* 最长等待时间,ms
*/
private static final long WAIT_MILLIS = 10000L;

/**
* 重试等待时间,ms
*/
private static final long SLEEP_MILLIS = 500L;

/**
* 最多重试次数
*/
private static final int RETRIES = Integer.MAX_VALUE;

/**
* 使用 ThreadLocal 存储 key 的 value 值,防止同步问题
*/
private ThreadLocal<String> threadLocal = new ThreadLocal<>();

/**
* 原子操作释放锁 Lua 脚本
*/
private static final String LUA_UNLOCK_SCRIPT = "if redis.call(\"get\", KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call(\"del\", KEYS[1]) " +
"else " +
"return 0 " +
"end";

@Override
public boolean lock(String key) {
return setLock(key, EXPIRE, WAIT_MILLIS, SLEEP_MILLIS, RETRIES);
}

@Override
public boolean lock(String key, long waitMillis) {
return setLock(key, EXPIRE, waitMillis, SLEEP_MILLIS, RETRIES);
}

@Override
public boolean lock(String key, long waitMillis, long sleepMillis) {
return setLock(key, EXPIRE, waitMillis, SLEEP_MILLIS, RETRIES);
}

@Override
public boolean lock(String key, long expire, long waitMillis, long sleepMillis) {
return setLock(key, expire, waitMillis, sleepMillis, RETRIES);
}

@Override
public boolean lock(String key, long expire, long waitMillis, long sleepMillis, int retries) {
return setLock(key, expire, waitMillis, sleepMillis, retries);
}

/**
* 获取 Redis 锁
*
* @param key 锁名称
* @param expire 锁过期时间
* @param retries 最多重试次数
* @param sleepMillis 重试等待时间
* @param waitMillis 最长等待时间
* @return
*/
private boolean setLock(String key, long expire, long waitMillis, long sleepMillis, int retries) {
//检查 key 是否为空
if (key == null || "".equals(key)) {
return false;
}

try {
long startTime = System.currentTimeMillis();
key = ROOT_KEY + key;

//可重入锁判断
String v = threadLocal.get();
if (v != null && isReentrantLock(key, v)) {
return true;
}

//获取锁
String value = UUID.randomUUID().toString();
while (!this.setNX(key, value, expire)) {
//超过最大重试次数后获取锁失败
if (retries-- < 1) {
return false;
}

//等待下一次尝试
Thread.sleep(sleepMillis);

//超过最长等待时间后获取锁失败
if (System.currentTimeMillis() - startTime > waitMillis) {
return false;
}
}

threadLocal.set(value);
return true;

} catch (Exception e) {
logger.error("redis lock get: {}", e.getMessage());
return false;
}
}

/**
* SET if Not eXists
*/
private boolean setNX(String key, String value, long expire) {
String result = stringRedisTemplate.execute((RedisCallback<String>) connection -> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, value, "NX", "PX", expire);
});

return "OK".equals(result);
}

/**
* 可重入锁判断
*/
private boolean isReentrantLock(String key, String v) {
ValueOperations kvValueOperations = stringRedisTemplate.opsForValue();
String value = (String) kvValueOperations.get(key);
if (value == null) {
return false;
}

return v.equals(value);
}

/**
* 释放锁
*/
@Override
public boolean release(String key) {
if (key == null || "".equals(key)) {
return false;
}

List<String> keys = new ArrayList<>(1);
keys.add(ROOT_KEY + key);
List<String> args = new ArrayList<>(1);
args.add(threadLocal.get());
threadLocal.remove();

try {
return deleteKey(keys, args);
} catch (Exception e) {
logger.error("redis lock release: {}", e.getMessage());
}
return false;
}

/**
* 删除 redis key
* <p>集群模式和单机模式执行脚本方法一样,但没有共同的接口
* <p>使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而 redis 锁自动过期失效的时候误删其他线程的锁
*/
private boolean deleteKey(List<String> keys, List<String> args) {
Object result = stringRedisTemplate.execute((RedisCallback<Object>) connection -> {
Object nativeConnection = connection.getNativeConnection();

// 单机模式
if (nativeConnection instanceof Jedis) {
return ((Jedis) nativeConnection).eval(LUA_UNLOCK_SCRIPT, keys, args);
}

// 集群模式
else if (nativeConnection instanceof JedisCluster) {
return ((JedisCluster) nativeConnection).eval(LUA_UNLOCK_SCRIPT, keys, args);
}

return 0L;
});

return result != null && Long.parseLong(result.toString()) > 0;
}
}

后记

可以基于 AOP 定义 Redis 分布式锁注解,在方法级别上使用更加方便。

参考与感谢

Redis 官方文档:SETNX key value

If these articles are helpful to you, you can donate comment here.