# redisson **Repository Path**: MyCodeStudio/redisson ## Basic Information - **Project Name**: redisson - **Description**: redisson笔记 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 3 - **Created**: 2022-05-25 - **Last Updated**: 2022-06-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 分布式锁 ### redisson #### 1. 可重入锁-非公平锁 ##### 1.1 加锁逻辑 reids cluster默认有16384个槽位,客户端通过对加锁key使用crc16算法进行hash拿到对应的slot槽位,再通过槽位拿到对应cluster node节点的地址,接着执行lua脚本。如果加锁成功,启动后台线程,定时对锁进行续约(看门狗) ```lua // 当前无锁,加锁并返回 if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 当前有锁,持有锁的是自己,设置自增值和过期时间,并返回 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 当前有锁,持有锁的不是自己,返回锁过期时间 return redis.call('pttl', KEYS[1]); ``` ##### 1.2 可重入锁如何进行续约的 redis中存在该锁,就延长有效期为30秒,续约是看门狗来做的,前面的加锁逻辑执行成功之后,如果线程拿到了锁,会启动一个后台线程(看门狗),每隔10秒对锁进行一次续约,每次续约将锁的有效时间设置为30秒 ```lua if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1 end return 0 ``` ##### 1.3 可重入锁的互斥阻塞 如果客户端A加锁成功,此时客户端B过来加锁,肯定是加锁失败的。lua脚本执行返回锁的过期时间,然后客户端B订阅锁信道,阻塞自己直至接受锁释放通知,开始循环获取锁,直至成功 ##### 1.4 可重入锁的释放锁 客户端释放锁,如果锁被重入过,将hash值-1,重新设置过期时间。如果锁未被重入且成功被释放,移除看门狗(续期定时任务) ```lua // 锁不存在, 发布消息到指定通道中 if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; // 锁存在, 持有锁的不是自己 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; // 持有锁的是自己, hash值-1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // 锁被重入过, 重置过期时间为30s if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else // 锁被释放, 删除key, 发送消息到指定通道中 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil; ``` ##### 1.5 可重入锁的获取锁超时与自动释放 获取锁超时:执行加锁代码,成功返回;失败之后判断是否获取锁超时(当前时间-获取锁时的时间),如果超时就退出。否则将自己注册到锁信道上,再判断是否获取锁超时。进入循环尝试获取锁。每次循环都会判断是否超时,达到最大超时时间之后还没获取到锁就会退出。 自动释放:和重入锁一样的逻辑,只不过执行lua脚本时,过期参数变成了我们设置的过期时间。执行完成之后,可重入锁是会调用看门狗进行自动续约的。而自动释放模式下,不会调用看门狗对锁进行自动续约,从而达到锁到期自动释放的目的。 ##### 1.6 总结 获取锁:在redis中设置hash值,生命周期为30s 维持加锁:后台看门狗每隔10s重置锁的生命周期为30s 锁重入:锁可以重入,重入的时候设置锁值自增1 互斥阻塞:锁被别人获取时,会一直阻塞直至加锁成功 锁释放:hash锁值-1,如果完全释放掉,锁就被释放 宕机自动释放锁:持有锁客户端挂掉后,看门狗也会挂掉,锁最多被持有30s就会过期 #### 2. 可重入锁-公平锁 ##### 2.1 加锁逻辑 客户端A加锁。进入while循环中,从排队队列中获取队头元素,但是首次进入队列是空的,返回false,退出while循环。此时进入加锁逻辑,anyLock锁不存在且redisson_lock_queue:{anyLock}队列不存在,设置anyLock的kv并设置过期时间 客户端B加锁,此时锁被客户端A持有着,进入while循环中,获取排队队头元素,队头元素不存在,返回false,退出while循环。进入加锁逻辑,anyLock锁存在且持有锁的不是自己。进入排队逻辑,当前排队对头元素不存在,获取锁当前过期时间ttl,设置分数set的key为值为uuid-thread2,分数(也就是过期时间)为当前时间+5000+ttl,并把当前线程uuid-thread2入队成为队头 客户端C加锁。进入到while循环中,获取此时排队头元素uuid-thread2的分数(过期时间),退出while循环。进入加锁逻辑,anyLock锁存在且持有锁的不是自己。进入排队逻辑,排队队列队头元素存在且不是自己,设置ttl为队头元素分数与当前时间的差值,设置分数set的key为uuid-thread3,分数(也就是过期时间)为当前时间+5000+ttl,并把当前线程uuid-thread3入队,跟在uuid-thread2的后方 队头元素拿到锁之后,会把自己从队列中移除 ```lua while true do local firstThreadId2 = redis.call('lindex', KEYS[2], 0); if firstThreadId2 == false then break; end; local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2)); if timeout <= tonumber(ARGV[4]) then redis.call('zrem', KEYS[3], firstThreadId2); redis.call('lpop', KEYS[2]); else break; end; end; if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then redis.call('lpop', KEYS[2]); redis.call('zrem', KEYS[3], ARGV[2]); redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; local firstThreadId = redis.call('lindex', KEYS[2], 0); local ttl; if firstThreadId ~= false and firstThreadId ~= ARGV[2] then ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]); else ttl = redis.call('pttl', KEYS[1]); end; local timeout = ttl tonumber(ARGV[3]); if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then redis.call('rpush', KEYS[2], ARGV[2]); end; return ttl; ``` keys --> anyLock, redisson_lock_queue:{anyLock}, redisson_lock_timeout:{anyLock} vals --> internalLockLeaseTime, uuid-threadid, currentTime + threadWaitTime, currentTime ##### 2.2 重入加锁 锁存在且持有锁的是自己,设置hash-key的val自增1,并设置过期时间为30s hexists anyLock uuid-thread1 == 1 // 成立 hincrby anyLock uuid-thread1 1 pexpire anyLock 30000 ##### 2.3 排队分数刷新 我得出的结论是排队队列中,每个元素与队头元素的分数差值都为5000,且在重新加锁的时候刷新自己的分数。 客户端B再次加锁,锁存在且持有锁的不是自己。刷新排队分数为ttl(anyLock过期时间) + currentTime + 5000。这里有个小细节,zadd的时候set中存在key,只会刷新分数 客户端C再次加锁,锁存在且持有锁的不是自己。刷新排队分数为ttl(队头元素分数-当前时间) + currentTime + 5000。这里换算一下就是队头元素分数 + 5000 ##### 2.4 ~~队列重排~~ 新版本官方认为是bug,进行了修复 ##### 2.5 释放锁 释放锁逻辑,进入while循环,如果排队队列不存在元素或者队头元素是有效的(剔除无效队头元素),退出while循环。进入释放锁逻辑,使hash的val自增-1,拿到值如果为负数,说明锁已经释放成功了(锁可能被重入过),删除锁对应的key。此时如果队列中存在排队元素,发送消息到对应信道中唤醒排队元素。释放锁成功后,移除看门狗 ```lua while true do local firstThreadId2 = redis.call('lindex', KEYS[2], 0); if firstThreadId2 == false then break; end; local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2)); if timeout <= tonumber(ARGV[4]) then // 剔除掉队列中超时元素 redis.call('zrem', KEYS[3], firstThreadId2); redis.call('lpop', KEYS[2]); else break; end; end; // 容错处理 if (redis.call('exists', KEYS[1]) == 0) then local nextThreadId = redis.call('lindex', KEYS[2], 0); if nextThreadId ~= false then redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); end; return 1; end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then // 重入锁释放锁,刷新过期时间 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; end; redis.call('del', KEYS[1]); local nextThreadId = redis.call('lindex', KEYS[2], 0); if nextThreadId ~= false then redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); end; return 1; ``` #### 3. 联锁 将多个RLock对象关联成为一个联锁,联锁加锁成功即包含的子锁(RLock)都加锁成功。 加锁逻辑,每把子锁的加锁等待时间为1500ms,根据子锁数量拿到一个基础等待时间,进入while循环去尝试给全部子锁加锁,直至成功。尝试给全部子锁加锁时,依次调用每个子锁的尝试加锁方法,在基础等待时间内全部加锁成功才算成功,否则为失败,失败时会释放所有加锁成功的子锁 解锁逻辑,对联锁中的所有子锁依次进行解锁,同步等待解锁执行完成 ``` public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 基础等待时间 = 每把锁等待时间1500ms * 锁数量 long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { waitTime = baseWaitTime; unit = TimeUnit.MILLISECONDS; } else { …… } // 循环直至加锁成功 while (true) { if (tryLock(waitTime, leaseTime, unit)) { return; } } } ``` #### 4. 红锁 红锁,超半数节点加锁成功即为加锁成功 加锁逻辑,红锁继承自联锁,重写了一些方法。加锁逻辑和联锁大致相同,只不过更改了子锁的加锁等待时间为每把锁1500ms。子锁在基础等待时间内全部和超半数加锁成功视为红锁加锁成功,每把子锁加锁失败后会增加失败计数,加锁失败数超过指定数量或超过基础等待时间视为红锁加锁失败,失败时会释放所有加锁成功的子锁 这里超过基础等待时间会退到外层循环重新执行`tryLock`,而子锁失败数超过最大失败数后,会重置`iterator`重新新进迭代加锁 解锁逻辑和联锁相同 ``` RedissonMultiLock public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { …… // 加锁失败后的处理 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { // 成功加锁节点超半数,视为成功 break; } if (failedLocksLimit == 0) { // 释放加锁成功的节点 unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator // 重置迭代器 while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } …… } RedissonRedLock // 失败加锁限制 protected int failedLocksLimit() { return locks.size() - minLocksAmount(locks); } // 最少加锁成功数 protected int minLocksAmount(final List locks) { return locks.size()/2 + 1; } // 最大加锁等待时间 protected long calcLockWaitTime(long remainTime) { return Math.max(remainTime / locks.size(), 1); } ``` #### 5. 读写锁 分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态 ##### 5.1 读锁加锁 读锁加锁,客户端A首次加读锁,当前没有人加锁,获取的mode肯定是false,开始设置读模式。 hset anyRWLock mode read hset anyRWLock thread01 1 set {anyRWLock}:thread01:rwlock_timeout:1 1 pexpire {anyRWLock}:thread01:rwlock_timeout:1 30000 pexpire anyRWLock 30000 加锁成功之后,返回null,开启看门狗进行定时续约 ```lua // 读锁加锁 // 获取模式 -> read、write local mode = redis.call('hget', KEYS[1], 'mode'); // 没有设置过读写模式 if (mode == false) then // 设置为读模式 redis.call('hset', KEYS[1], 'mode', 'read'); redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('set', KEYS[2] .. ':1', 1); redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 读模式或(写模式且持有锁的人是自己) if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); local key = KEYS[2] .. ':' .. ind; redis.call('set', key, 1); redis.call('pexpire', key, ARGV[1]); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]); keys -> anyRWLock, {anyRWLock}:b8378711-798f-4b0c-bb3b-f06402f0c130:1:rwlock_timeout vals -> 30000, b8378711-798f-4b0c-bb3b-f06402f0c130:1, b8378711-798f-4b0c-bb3b-f06402f0c130:1:write ``` ##### 5.2 读锁的看门狗进行续约 重置anyRWLock的过期时间为30s,获取hash结构中每个key的val,val代表着读锁的加锁数量。hash结构所有的key对应的val总和,代表着在服务器中字符键值对的数量,重置他们的过期时间为30s ```lua // 读锁看门狗进行续约 local counter = redis.call('hget', KEYS[1], ARGV[2]); if (counter ~= false) then // 重置anyRWLock过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); if (redis.call('hlen', KEYS[1]) > 1) then local keys = redis.call('hkeys', KEYS[1]); for n, key in ipairs(keys) do counter = tonumber(redis.call('hget', KEYS[1], key)); // val为数值 if type(counter) == 'number' then for i=counter, 1, -1 do // 重置字符键对的过期时间 redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); end; end; end; end; return 1; end; return 0; keys -> anyRWLock, {anyRWLock} vals -> 30000, ca05eacf-a132-4af1-ad2b-83b87688a71c:1 ``` ##### 5.3 写锁加锁 写锁加锁,客户端B首次加锁,当前没有人加锁,获取的mode肯定是false,开始设置写模式 hset anyRWLock mode write hset anyRWLock thread02:write 1 pexpire anyRWLock 30000 加锁成功之后,返回null,开启看门狗进行定时续约 ```lua local mode = redis.call('hget', KEYS[1], 'mode'); // 没有人持有锁 if (mode == false) then redis.call('hset', KEYS[1], 'mode', 'write'); redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 有人持有锁 if (mode == 'write') then if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then // 锁重入,重置过期时间 redis.call('hincrby', KEYS[1], ARGV[2], 1); local currentExpire = redis.call('pttl', KEYS[1]); redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); return nil; end; end; return redis.call('pttl', KEYS[1]); keys -> anyRWLock vals -> 30000, cb0013a1-b766-41b8-9dfa-4f8672e173a4:1:write ``` ##### 5.4 读锁与读锁非互斥 客户端A首先加读锁成功,随后客户端B进行读锁加锁,判断当前锁模式为读锁,进行加锁 hincrby anyRWLock thread02 1 set {anyRWLock}:thread02:rwlock_timeout:1 1 pexpire {anyRWLock}:thread02:rwlock_timeout:1 3000 pexpire anyLock 30000 加锁成功之后,返回null,开启看门狗进行定时续约。 读锁之间不互斥,每个客户端的看门狗都会刷新anyRWLock的过期时间和自己负责的几个字符键值对的过期时间 ##### 5.5 读锁与写锁互斥 先加读锁,后加写锁,写锁加锁时判断有人持有锁且是写锁,返回读锁过期时间进行阻塞等待 先加写锁,后加读锁,读锁加锁时判断有人持有锁且是写锁,但持有锁的不是自己,返回写锁过期时间进行阻塞等待 ##### 5.6 写锁与写锁互斥 先加写锁,然后其他客户端加写锁,加锁时判断有人持有锁且是写锁,但持有锁的不是自己,返回写锁过期时间进行阻塞等待 ##### 5.7 读写锁重入 单个线程之间 读锁+读锁,同一线程加读锁再加读锁是可以的 读锁+写锁,同一线程加读锁,就不能再加写锁了 写锁+读锁,同一线程先加写锁,再去加读锁可以**重入**,但不允许其他线程加读锁 写锁+写锁,同一线程加写锁再加写锁是可以的 ##### 5.8 释放读锁 ```lua local mode = redis.call('hget', KEYS[1], 'mode'); if (mode == false) then // 锁未被持有,通知等待加锁线程唤醒 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); if (lockExists == 0) then // 持有锁的线程非当前线程,返回null return nil; end; // 有锁重入情况 local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); if (counter == 0) then // 重入锁已解除,释放 redis.call('hdel', KEYS[1], ARGV[2]); end; redis.call('del', KEYS[3] .. ':' .. (counter+1)); if (redis.call('hlen', KEYS[1]) > 1) then local maxRemainTime = -3; local keys = redis.call('hkeys', KEYS[1]); for n, key in ipairs(keys) do counter = tonumber(redis.call('hget', KEYS[1], key)); if type(counter) == 'number' then for i=counter, 1, -1 do local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); maxRemainTime = math.max(remainTime, maxRemainTime); end; end; end; if maxRemainTime > 0 then // 重置过期时间 redis.call('pexpire', KEYS[1], maxRemainTime); return 0; end; if mode == 'write' then return 0; end; end; redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; ``` ##### 5.9 释放写锁 ```lua local mode = redis.call('hget', KEYS[1], 'mode'); if (mode == false) then // 锁未被持有,唤醒加锁等待线程 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; if (mode == 'write') then local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); // 写锁不存在 if (lockExists == 0) then return nil; else // 写锁存在,递减1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then // 锁被重入过,重置过期时间为30s redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else // 锁完全释放,删除hash中的对应kv redis.call('hdel', KEYS[1], ARGV[3]); if (redis.call('hlen', KEYS[1]) == 1) then // 只有写锁,删除kv并发布消息唤醒阻塞线程 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); else // has unlocked read-locks // 含有未解锁的读锁,设置为读模式 redis.call('hset', KEYS[1], 'mode', 'read'); end; return 1; end; end; end; return nil; ``` #### 6. 信号量 ##### 6.1 设置信号量数量 信号量设置,设置字符类型的kv为信号量 ```lua local value = redis.call('get', KEYS[1]); // 存在直接退出,不存在则设置 if (value == false or value == 0) then redis.call('set', KEYS[1], ARGV[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return 0; keys -> semaphore, redisson_sc:{semaphore} vals -> 3 ``` ##### 6.2 获取信号量 获取信号量, 成功获取到,退出;否则进入while循环获取信号量,直至成功 ```lua local value = redis.call('get', KEYS[1]); // value存在且val可以被获取 if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then // 递减1 local val = redis.call('decrby', KEYS[1], ARGV[1]); return 1; end; return 0; keys -> semaphore vals -> 1 ``` ##### 6.3 释放信号量 释放信号量,字符kv,semaphore自增1,发布val到信道redisson_sc:{semaphore}中。监听信道的线程收到消息后就会被唤醒,去获取信号量 ```lua // 自增1 local value = redis.call('incrby', KEYS[1], ARGV[1]); // 发布可用信号量值到信道中 redis.call('publish', KEYS[2], value); keys -> semaphore, redisson_sc:{semaphore} vals -> 3 ``` #### 7. 闭锁 ##### 7.1 设置闭锁 设置闭锁数量,当三个锁都被获取后,才会一起向下执行,否则都会被阻塞 ```lua if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', KEYS[2], ARGV[1]); return 1 else return 0 end keys -> anyCountDownLatch, redisson_countdownlatch__channel__{anyCountDownLatch} vals -> 1, 3 ``` ##### 7.2 计数递减 countDown被调用,anyCountDownLatch的val会递减1,递减之后val>0,直接退出;递减之后val<=0删除anyCountDownLatch;递减之后val=0,发布消息到信道redisson_countdownlatch__channel__{anyCountDownLatch}中 ```lua local v = redis.call('decr', KEYS[1]); if v <= 0 then redis.call('del', KEYS[1]) end; if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end; keys -> anyCountDownLatch, redisson_countdownlatch__channel__{anyCountDownLatch} vals -> 0 ``` ##### 7.3 同步等待 进入while循环中,阻塞等待直至被唤醒,当anyCountDownLatch的val<=0时,退出循环继续执行用户代码