Redisson源码解读-分布式锁( 三 )

Semaphore#tryAcquire表示阻塞并等待唤醒 。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe 。订阅方法的逻辑也不少,咱们直接讲其最终调用的处理方法
// LockPubSub#onMessageprotected void onMessage(RedissonLockEntry value, Long message) {// 普通的解锁走的是这个if (message.equals(UNLOCK_MESSAGE)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 这里就是唤醒信号量的地方value.getLatch().release();// 这个是读写锁用的} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}value.getLatch().release() 也就是Semaphore#release,会唤醒Semaphore#tryAcquire阻塞的线程
解锁上面我们聊了加锁,本小节来聊下解锁 。调用路径如下
// RedissonLock#unlock// RedissonBaseLock#unlockAsync(long threadId)public RFuture<Void> unlockAsync(long threadId) {// 调用lua解锁RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {// 取消看门狗cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}解锁的逻辑不复杂,调用lua脚本解锁以及取消看门狗 。看门狗晚点说,先说下lua解锁
// RedissonLock#unlockInnerAsyncprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}老规矩,分析下这段lua:

  1. 如果锁不存在,返回null
  2. 锁的值减1,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间
  3. 如果锁的值小于等于0,这说明可以真正解锁了,删除锁并通过发布订阅机制发布解锁消息
从 lua 中可以看到,解锁时会发布消息到 channel 中,加锁方法RedissonLock#tryLock中有相对应的订阅操作 。
看门狗试想一个场景:程序执行需要10秒,程序执行完成才去解锁,而锁的存活时间只有5秒,也就是程序执行到一半的时候锁就可以被其他程序获取了,这显然不合适 。那么怎么解决呢?
  1. 方式一:锁永远存在,直到解锁 。不设置存活时间 。
    这种方法的弊端在于,如果程序没解锁就挂了,锁就成了死锁
  2. 方式二:依然设置锁存活时间,但是监控程序的执行,如果程序还没有执行完成,则定期给锁续期 。
【Redisson源码解读-分布式锁】方式二就是Redisson的看门狗机制 。看门狗只有在没有显示指定锁的持有时间(leaseTime)时才会生效 。
// RedissonLock#tryAcquireAsync// RedissonBaseLock#scheduleExpirationRenewalprotected void scheduleExpirationRenewal(long threadId) {// 创建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 看门狗的具体逻辑renewExpiration();} finally {// 如果线程被中断了,就取消看门狗if (Thread.currentThread().isInterrupted()) {// 取消看门狗cancelExpirationRenewal(threadId);}}}}

经验总结扩展阅读