Redis高并发分布式锁详解( 二 )


2.采用SETNX的方式加分布式锁的情况:
代码示例
public String deductStock() {String lockKey = "lock:product_101";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");System.out.println("扣减成功 , 剩余库存:" + realStock);} else {System.out.println("扣减失败 , 库存不足");}} finally {stringRedisTemplate.delete(lockKey);}return "end";}发现说明
1)这种方式明显保证了在分布式情况下只有一个线程能够执行业务代码 。但是我们不可能对于用户买商品的时候返回错误提示 , 如果不断自旋的话又容易让CPU飙升 。肯定要考虑休眠与唤醒 , 但可以在上层方法里面处理 。
2)同时很明显存在个问题 , 如果我在扣减库存时候服务器宕机了 , 库存扣减还没设置【且没执行finally代码 , 那么我这个商品的锁就不会被释放 , 除非手动清除】 。
那么肯定需要设置超时时间 。如
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);会发现补一个超时时间的话依旧无法避免之前的问题 , 故加锁和设置超时时间需要保持原子性 。
3)采用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基于设置了超时时间 , 那么我们如何考量超时时间呢 , 业务执行多久我们根本不可得知 。故容易出现时间到期了 , 业务还没执行完 。这就容易出现A持有锁执行任务 , 还没完成就超时了 , B持有锁执行任务 , A执行完 , 释放锁【此时会释放B的锁】的情况 。所以释放锁必须要持有锁本人才能执行 。
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}所以clientId需要是分布式ID , 然后释放锁改为判断clientId符合才能去释放 。
3.改进之后的情况:
代码示例
public String deductStock() {String lockKey = "lock:product_101";String clientId = UUID.randomUUID().toString();Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");System.out.println("扣减成功 , 剩余库存:" + realStock); } else { System.out.println("扣减失败 , 库存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }发现说明
1)即时加了判断 , 我们会发现依旧会存在问题【因为判断与释放锁操作不是原子性的】 , 如果在判断里面加上休眠进行试验
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {Thread.sleep(20000);stringRedisTemplate.delete(lockKey);}我们会发现根本问题依旧没有解决 , 只是减少了发生的情况 。究其原因 , 本质上还是锁超时导致的 。解决这个问题就要引入一个完美的解决方案叫做锁续命 。
2)锁续命(watchDog):假设主线程抢到锁开始执行业务逻辑 , 开启一个分线程 , 在分线程里边做一个定时任务 , 比如说设置的锁超时时间是30s , 那么我们的定时任务时间就设置为10s , 定时任务设置的时间一定要比锁超时时间小 , 每10s定时任务先去判断主线程有没有结束 , 没有结束的话说明主线程就还在 , 还在进行业务逻辑操作 , 这个时候我们执行一条expire命令 , 将主线程锁的超时时间重新设置为30s , 这样的话只要主线程还没结束 , 主线程就会被分线程定时任务去做续命逻辑 , 维持在30s , 判断主线程结束 , 就不再执行续命逻辑 。

经验总结扩展阅读