Redis高并发分布式锁详解( 六 )

6)Redisson类#unlock方法
//RedissonLock类#unlock方法public void unlock() {Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException(...);}if (opStatus) {//移除看门狗的定时任务cancelExpirationRenewal();}}//RedissonLock类#unlockInnerAsync方法protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//如果不存在锁"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +//当前线程并没有持有锁 , 则返回nil"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//前线程持有锁 , 则对value-1 , 拿到-1之后的vlaue"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//value>0 , 以毫秒为单位返回剩下的过期时间 。(保证可重入)"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +//value<=0 , 则对key进行删除操作 , return 1 (方法返回 true) 。然后进行redis-pub指令 , 用于唤醒其他正在休眠的线程 。"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",//参数顺序KEYS[1](锁的名称) , KEYS[2](发布订阅的Channel名:redisson_lock__channel+锁名) , ARGV[1](发布的消息) , ARGV[2](锁超时时间) , ARGV[3](uuid + ":" + threadId)Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}7)Redisson类#tryLock方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= (System.currentTimeMillis() - time);time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}Redis与Zookeeper分布式锁的区别1.从单机角度上来说 , 两者差别不大 , 都是项目引入的外部组件 , redis相对于zookeeper来说 , 项目中使用的更多 , 常用性角度redis更加 。
2.但是一般我们都会做集群(容错率更高):
【1】从分布式的CAP角度分析:
redis满足AP , 在Master节点上写成功了会优先返回给客户端 , 之后在同步给从节点

经验总结扩展阅读