前言Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid) 。Redisson有一样功能是可重入的分布式锁 。本文来讨论一下这个功能的特点以及源码分析 。
前置知识在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工 。
分布式锁的思考首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备哪些功能?
- 互斥(这是一个锁最基本的功能)
- 锁失效机制(也就是可以设置锁定时长,防止死锁)
- 高性能、高可用
- 阻塞、非阻塞
- 可重入、公平锁
- 。。。
Redis订阅/发布机制Redisson中用到了Redis的订阅/发布机制,下面简单介绍下 。
简单来说就是如果client2 、 client5 和 client1 订阅了 channel1,当有消息发布到 channel1 的时候,client2 、 client5 和 client1 都会收到这个消息 。
文章插图
图片来自 菜鸟教程-Redis发布订阅
Redisson源码版本:3.17.7
下面以Redisson官方的可重入同步锁例子为入口,解读下源码 。
RLock lock = redisson.getLock("anyLock");// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) {try {...} finally {lock.unlock();}}
加锁我用时序图来表示加锁和订阅的过程 。时序图中括号后面的c1、c2代表client1,client2文章插图
当线程2获取了锁但还没释放锁时,如果线程1去获取锁,会阻塞等待,直到线程2解锁,通过Redis的发布订阅机制唤醒线程1,再次去获取锁 。
加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),对应着就是
RedissonLock#tryLock
/** * 获取锁 * @param waitTime尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完) * @param unit 时间单位 * @return 获取锁成功返回true,失败返回false */@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();// 当前时间long threadId = Thread.currentThread().getId();// 当前线程id// 尝试加锁,加锁成功返回null,失败返回锁的剩余超时时间Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 获取锁成功if (ttl == null) {return true;}// time小于0代表此时已经超过获取锁的等待时间,直接返回falsetime -= System.currentTimeMillis() - current;if (time <= 0) {// 没看懂这个方法,里面里面空运行,有知道的大神还请不吝赐教acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 出现异常,取消订阅if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}try {// 判断是否超时(超过了waitTime)time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {// 再次获取锁,成功则返回long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 阻塞等待信号量唤醒或者超时,接收到订阅时唤醒// 使用的是Semaphore#tryAcquire()currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因为是同步操作,所以无论加锁成功或失败,都取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}//return get(tryLockAsync(waitTime, leaseTime, unit));}
经验总结扩展阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 源码级深度理解 Java SPI
- Dubbo-聊聊通信模块设计
- 【lwip】10-ICMP协议&源码分析
- 【lwip】09-IPv4协议&超全源码实现分析
- 京东云开发者|经典同态加密算法Paillier解读 - 原理、实现和应用
- 从BeanFactory源码看Bean的生命周期
- 详解AQS中的condition源码原理
- 实例解读丨关于GaussDB ETCD服务异常
- 【lwip】08-ARP协议一图笔记及源码实现
- 【lwip】07-链路层收发以太网数据帧源码分析