Semaphore#tryAcquire
表示阻塞并等待唤醒 。那么信号量什么时候被唤醒呢?在订阅方法中RedissonLock#subscribe
。订阅方法的逻辑也不少,咱们直接讲其最终调用的处理方法
// LockPubSub#onMessageprotected void onMessage(RedissonLockEntry value, Long message) {// 普通的解锁走的是这个if (message.equals(UNLOCK_MESSAGE)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 这里就是唤醒信号量的地方value.getLatch().release();// 这个是读写锁用的} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}
value.getLatch().release() 也就是Semaphore#release
,会唤醒Semaphore#tryAcquire
阻塞的线程
解锁上面我们聊了加锁,本小节来聊下解锁 。调用路径如下
// RedissonLock#unlock// RedissonBaseLock#unlockAsync(long threadId)public RFuture<Void> unlockAsync(long threadId) {// 调用lua解锁RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {// 取消看门狗cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}
解锁的逻辑不复杂,调用lua脚本解锁以及取消看门狗 。看门狗晚点说,先说下lua解锁
// RedissonLock#unlockInnerAsyncprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
老规矩,分析下这段lua:
- 如果锁不存在,返回null
- 锁的值减1,如果锁的值大于0(也就是可重入锁仍然有加锁次数),则重新设置过期时间
- 如果锁的值小于等于0,这说明可以真正解锁了,删除锁并通过发布订阅机制发布解锁消息
RedissonLock#tryLock
中有相对应的订阅操作 。看门狗试想一个场景:程序执行需要10秒,程序执行完成才去解锁,而锁的存活时间只有5秒,也就是程序执行到一半的时候锁就可以被其他程序获取了,这显然不合适 。那么怎么解决呢?
- 方式一:锁永远存在,直到解锁 。不设置存活时间 。
这种方法的弊端在于,如果程序没解锁就挂了,锁就成了死锁
- 方式二:依然设置锁存活时间,但是监控程序的执行,如果程序还没有执行完成,则定期给锁续期 。
// RedissonLock#tryAcquireAsync// RedissonBaseLock#scheduleExpirationRenewalprotected void scheduleExpirationRenewal(long threadId) {// 创建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法会从map中再拿出来用ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 看门狗的具体逻辑renewExpiration();} finally {// 如果线程被中断了,就取消看门狗if (Thread.currentThread().isInterrupted()) {// 取消看门狗cancelExpirationRenewal(threadId);}}}}
经验总结扩展阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 源码级深度理解 Java SPI
- Dubbo-聊聊通信模块设计
- 【lwip】10-ICMP协议&源码分析
- 【lwip】09-IPv4协议&超全源码实现分析
- 京东云开发者|经典同态加密算法Paillier解读 - 原理、实现和应用
- 从BeanFactory源码看Bean的生命周期
- 详解AQS中的condition源码原理
- 实例解读丨关于GaussDB ETCD服务异常
- 【lwip】08-ARP协议一图笔记及源码实现
- 【lwip】07-链路层收发以太网数据帧源码分析