Redisson源码解读-公平锁

前言我在上一篇文章聊了Redisson的可重入锁,这次继续来聊聊Redisson的公平锁 。下面是官方原话:

它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程 。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒 。
源码版本:3.17.7
【Redisson源码解读-公平锁】这是我 fork 的分支,添加了自己理解的中文注释:https://github.com/xiaoguyu/redisson
公平锁先上官方例子:
RLock fairLock = redisson.getFairLock("anyLock");// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);...fairLock.unlock();因为在Redisson中,公平锁和普通可重入锁的逻辑大体上一样,我在上一篇文章都介绍了,这里就不再赘述 。下面开始介绍合理逻辑 。
加锁加锁的 lua 脚本在 RedissonFairLock#tryLockInnerAsync方法中
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {long wait = threadWaitTime;if (waitTime > 0) {wait = unit.toMillis(waitTime);}long currentTime = System.currentTimeMillis();if (command == RedisCommands.EVAL_NULL_BOOLEAN) {......}if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,// remove stale threads"while true do " +// list为空,证明没有人排队,退出循环"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +// 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +"if timeout <= tonumber(ARGV[4]) then " +// remove the item from the queue and timeout set// NOTE we do not alter any other timeout"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +// check if the lock can be acquired now// 检查是否可以获取锁 。如果hash和list都不存在,或者线程队列的第一个是当前线程,则可以获取锁"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +// remove this thread from the queue and timeout set// 都获取锁了,当然要从线程队列和时间队列中移除"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +// decrease timeouts for all waiting in the queue// 刷新时间集合中的时间"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +// acquire the lock and set the TTL for the lease// 和公平锁的设置一样,值加1并且设置过期时间"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +// check if the lock is already held, and this is a re-entry// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +// the lock cannot be acquired// check if the thread is already in the queue// 时间集合中有值,证明线程已经在队列中,不需要往后执行逻辑了"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +// the real timeout is the timeout of the prior thread// in the queue, but this is approximately correct, and// avoids having to traverse the queue// 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])// 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the// threadWaitTime"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +// 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +// 否则直接获取锁的存活时间"ttl = redis.call('pttl', KEYS[1]);" +"end;" +// 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +// 如果添加到时间集合成功,则同时添加线程集合"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);}throw new IllegalArgumentException();}

经验总结扩展阅读