Redis
分布式锁
本文中采用的
redisson
版本使用的3.16.1
。
setnx
+ expire
+ lua
setnx
+expire
负责加锁。lua
用来解锁,如果直接用del
命令直接释放锁,会把其它进程获取到的锁删除, 如果get
+del
,也需要分2次执行,也无法保证原子性。SETNX
是SET if Not eXists 的简写, 当且仅当key
不存在时将key
的值设为value
,返回1
; 若给定的key
已经存在,则SETNX
不做任何动作,返回0
。 缺点:加锁采用setnx
+expire
分两次执行,无法保证原子性。
SET key value [NX] [XX] [EX <seconds>] [PX [millseconds]]
+ lua
加锁代替了
setnx
+expire
需要分2次执行命令操作的方式,保证了原子性。 解锁依旧是用的lua
保证原子性。 缺点:加锁时只作用在一个Redis节点,如果这个master节点由于某些原因宕机或者主从切换,那么就会出现锁丢失的情况 ```
- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000
- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
### `Redisson` 的 `Redlock`
> 使用 `redlock` 的前提是每个 `master` 节点都独立并且不存在主从复制,
> 即多个单机/多个 `sentinel` /多个 `cluster` 才表示多个节点,
> 而单纯的一个 `sentinel` / `cluster` 不管几台机器都只看做一个节点。
> 加锁的源码如下:
```java
// 加锁
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
// 如果锁的KEY不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 执行hset命令
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 执行pexpire命令,设置锁的过期时间(租约时间)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果锁的KEY已经存在,同时value ==1,当前线程持有锁
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 锁重入次数加1,变成2
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置第2次获取锁的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// KEYS[1]:Collections.singletonList(this.getRawName()),锁的key
// ARGV[1]:new Object[]{unit.toMillis(leaseTime),锁的过期时间(租约时间)
// ARGV[2]:this.getLockName(threadId),获取锁时set的唯一值
Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
释放锁的源码如下 :
// 释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数如果还大于0,即锁有重入过,只设置过期时间,不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 重入次数减1后为0,表示分布式锁只获取过1次,删除这个KEY,并发布解锁消息
"else redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
// 5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2],ARGV[3]
Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}