Redis分布式锁

redisson源码解析

Posted by Brade on July 31, 2021

Redis 分布式锁

本文中采用的 redisson 版本使用的 3.16.1

setnx + expire + lua

setnx + expire 负责加锁。 lua 用来解锁,如果直接用 del 命令直接释放锁,会把其它进程获取到的锁删除, 如果 get + del,也需要分2次执行,也无法保证原子性。 SETNX 是SET if Not eXists 的简写, 当且仅当 key 不存在时将 key 的值设为 value,返回 1 ; 若给定的 key 已经存在,则 SETNX 不做任何动作,返回 0 。 缺点:加锁采用setnx + expire 分两次执行,无法保证原子性。

SET key value [NX] [XX] [EX <seconds>] [PX [millseconds]] + lua

加锁代替了 setnx + expire 需要分2次执行命令操作的方式,保证了原子性。 解锁依旧是用的 lua 保证原子性。 缺点:加锁时只作用在一个Redis节点,如果这个master节点由于某些原因宕机或者主从切换,那么就会出现锁丢失的情况 ```

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end


###  `Redisson` 的 `Redlock`
> 使用 `redlock` 的前提是每个 `master` 节点都独立并且不存在主从复制,
> 即多个单机/多个 `sentinel` /多个 `cluster` 才表示多个节点,
> 而单纯的一个 `sentinel` / `cluster` 不管几台机器都只看做一个节点。
> 加锁的源码如下:

```java

    // 加锁
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, 
        // 如果锁的KEY不存在
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            // 执行hset命令
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            // 执行pexpire命令,设置锁的过期时间(租约时间)
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
         // 如果锁的KEY已经存在,同时value ==1,当前线程持有锁   
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            // 锁重入次数加1,变成2
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            // 设置第2次获取锁的过期时间
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
         // 返回锁的KEY的失效时间毫秒数   
        "return redis.call('pttl', KEYS[1]);", 
        // KEYS[1]:Collections.singletonList(this.getRawName()),锁的key
        // ARGV[1]:new Object[]{unit.toMillis(leaseTime),锁的过期时间(租约时间)
        // ARGV[2]:this.getLockName(threadId),获取锁时set的唯一值
        Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }

释放锁的源码如下 :


    // 释放锁
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
         // 如果锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
         "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
         "end; " +
         // 如果就是当前线程占有分布式锁,那么将重入次数减1
         "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
         // 重入次数如果还大于0,即锁有重入过,只设置过期时间,不能删除   
         "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
         // 重入次数减1后为0,表示分布式锁只获取过1次,删除这个KEY,并发布解锁消息    
         "else redis.call('del', KEYS[1]); " +
         "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
         "end; " +
         "return nil;",
            // 5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2],ARGV[3]
         Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
    }