redis实现分布式锁

发布时间:2022-04-01 16:55:18 作者:yexindonglai@163.com 阅读(882)

什么是分布式锁

分布式锁的作用是为了在分布式环境下保证数据的一致性,是解决服务间通讯安全的一种手段,指的是将并行调用转为串行调用,防止多个服务在修改一个变量时产生的数据不一致问题;如下图

实现分布式锁的方式

就目前而言比较主流的分布式锁实现方式有2种

  1. redis
  2. zookeeper

既然,redis和zk都能实现分布式锁,那我们到底用哪个比较好呢,其实这要看你的实际情况,

  • 如果你的业务中并发量比较大,推荐你使用redis, 只是如果主从架构中主节点宕机的话有可能会出现数据不一致的情况,只是这个几率很小,这个问题可以用red Lock(红锁)来解决,但是红锁的方案目前来说争议比较大,会损失性能,并且也不能百分百解决问题,所以这种方案很少人使用;
  • 如果你的业务中必须要保证数据的一致性,推荐你使用zookeeper,因为zk天生就是用来做分布式锁的,但是确定也很明显,zk的性能远远达不到redis的高并发量

在本章中我们将讲解如何用redis实现分布式锁

第一种方案

1、加锁

今天我们主要谈使用redis来实现分布式锁,通常这也是面试经常问到的问题之一,要实现分布式锁,就要用到redis的set nx命令,我们先来看看他的用法

  1. # SETNX :SET if Not Exists (如果不存在,则 SET)
  2. SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
  • EX seconds:设置指定的过期时间(以秒为单位)
  • PX milliseconds:设置指定的过期时间(以毫秒为单位)
  • NX:仅在key不存在时设置键,如果key已存在,则不可以设置
  • XX:仅在key存在时设置键,如果key不存在,则不可以设置

加锁命令如下

  1. set lockKey val NX

2、解锁

解锁非常简单,只需要将key删除即可

  1. del lockKey

3、第一种方案出现的问题

到这里一个分布式锁就已经完成了,加锁解锁的流程如下图

但是这种方式有以下几个问题:
1、第一个问题:执行业务逻辑的过程中抛异常了怎么办?
如下图中红色部分,执行业务逻辑部分抛异常了,抛出异常肯定不会执行解锁的逻辑,就会导致死锁;

这个问题其实也很好解决。因为java是支持try catch异常捕捉的,我们只需要将加锁业务逻辑的操作放到 异常块里面即可解决问题,就像这样

  1. try{
  2. // 上锁
  3. redis.lock();
  4. 业务逻辑部分。。。。
  5. } catch (Exception e) {
  6. e.printStackTrace();
  7. } finally {
  8. // 解锁
  9. redis.unlock();
  10. }

2、第二个问题:这也是一个很严重的问题,如果客户端宕机了怎么办?
最要命的是刚加完锁就宕机了,后面的业务逻辑和解锁都没执行,那么这种情况下,异常捕捉就起不了作用了;直接导致死锁,宕机情况流程如下

这个问题就需要用到第二种方案来解决;

第二种方案

1、设置超时机制

针对以上第二个问题,可以给key设置一个超时时间,加锁后到一定时间后若还未解锁,就会自动解锁;这里将超时时间设置为30秒,命令如下

  1. set lockKey val EX 30 NX

设置超时时间后,就算服务器宕机的情况下,超过30秒即会自动解锁;

2、超市机制产生的问题

比如有以下的执行场景

  1. 第1个线程加锁成功,开始执行业务逻辑
  2. 第1个线程执行的业务逻辑超过了30秒,此时触发了超时机制,自动解锁了
  3. 因为第1个线程超时解锁了,所以第2个线程抢到锁资源,加锁成功
  4. 当第2个线程执行到第8秒的时候,第1个线程的业务逻辑执行完了,然后执行了第1个线程的解锁方法,因为第1个线程的锁已经没了,所以解的第二个线程的锁
  5. 当第2个线程创建的锁被第1个线程解了,数据自然会出现问题

此时这个锁资源就乱套了,整体流程如下:

这个问题要怎么解决呢?很简单,给每个线程上锁时给lockKey设置一个uuid即可,并且,在解锁时先判断这个uuid是否当前线程的uuid,如果uuid匹配上才进行解锁,如果匹配不上,就代表有问题了,流程如下

虽然加上了uuid,保证了当前线程加的锁只能自己解锁,但是根据上图的红色部分,万一出现uuid匹配不上的问题怎么办呢? 那么接下来就要用到第三种方案来解决这个问题了

第三种方案

1、加上自动续时功能

针对以上问题,可以使用自动续时功能,什么叫自动续时呢?其实就是在加锁成功后启动一个后台线程(看门狗),这个线程只做一件事,每隔一段时间都去判断一下这个锁资源是否还存在,如果还存在,重置这个锁的时间,让它恢复到设定的超时时间;这个就叫做自动续时功能,这样讲可能还不太好理解,举个栗子吧

比如将锁的超时时间设为30秒,在加锁成功后,启动一个后台线程(看门狗),每隔10秒钟(取超时时间的三分之一, 30/3=10)去判断一下当前线程的锁是否还存在,如果还在,将超时时间在设为30秒,如果不存在,就表示这个锁已经被释放了,就不用管它了. 整体流程如下图

2、如果一直自动续时怎么办?不就变成死锁了吗?

有些细心的同学会考虑到,如果业务逻辑执行过程中发生异常怎么办? 加锁后宕机了怎么办?是否也会造成死锁的问题,我们一个个看这个问题

2.1、业务逻辑执行过程中发生异常

如果业务逻辑发生了异常,就会执行到finally块的内容,也就是解锁的逻辑,解锁后自然不会有问题;

2.2、redis客户端宕机了怎么办?会死锁吗?

首先我们要知道,自动续时的后台线程是在客户端上运行的,如果客户端宕机了,自动续时的后台线程肯定也会停止,自然就不会再续时了,而且因为设置了超时机制,就算客户端宕机了,服务端超时后会自动解锁;死锁的问题肯定不存在了;流程如下

2.3 redis服务端宕机了怎么办?

增加集群和哨兵机制即可解决问题

redisson

以上的第三种方案就是redisson的底层原理,通过增加一个看门狗的线程来实时监控锁的状态,那么这里又有个问题了,redisson的底层是如何保证原子性的呢? 通过解答以下的2个问题我们就可以知道redisson是如何保证原子性的了

redisson 底层是如何上锁的?

redisson 上锁是通过hset nx命令来保证原子性的,但是仅仅这样还不够,因为redisson除了上锁之外还需要执行其他的逻辑,所以 redisson 使用了lua脚本来上锁,redis在执行lua脚本时会暂停其他的操作,保证了原子性,redisson上锁的lua脚本如下

  1. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2. this.internalLockLeaseTime = unit.toMillis(leaseTime);
  3. return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
  4. "if (redis.call('exists', KEYS[1]) == 0) then " + // 判断key是否存在
  5. " redis.call('hset', KEYS[1], ARGV[2], 1); " + // 若key不存在,使用hset设置key
  6. " redis.call('pexpire', KEYS[1], ARGV[1]);" + // 给key设置一个过期时间
  7. " return nil; " +
  8. "end; " +
  9. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 重入锁的判断逻辑
  10. " redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  11. " redis.call('pexpire', KEYS[1], ARGV[1]); " +
  12. " return nil; " +
  13. "end; " +
  14. "return redis.call('pttl', KEYS[1]);",
  15. Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
  16. }

上锁逻辑也很简单,总共就三步

  1. 判断lockKey是否存在,若存在表示已经被其他线程上锁,一直自旋判断,直到lockKey不存在为止
  2. 若lockKey不存在,使用hset设置 lockKey 的值,并设置过期时间,默认过期时间30秒;可在源码org.redisson.config.Config.class的构造函数中看到,构造函数源码如下:
    1. public Config() {
    2. this.transportMode = TransportMode.NIO;
    3. this.lockWatchdogTimeout = 30000L;
    4. this.keepPubSubOrder = true;
    5. this.addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
    6. }
  3. 判断是否重入锁

redisson 底层是如何续期的?

redisson 使用了一个看门狗的后台线程来监控锁的状态,在续期的过程中也是通过lua脚本来保证原子性的;源码如下

  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2. return this.commandExecutor.evalWriteAsync(this.getName(),
  3. LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  4. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  5. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  6. "return 1; end; " +
  7. "return 0;",
  8. Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
  9. }

这的lua脚本其实就做了2件事

  1. 判断lockKey是否存在,已存在就表示锁还在,不存在就表示锁已被释放
  2. 若key存在,重新将过期时间设为30秒

实际上redisson 底层大部分的操作都是通过lua脚本来进行的,也没有用到set nx的功能,lua脚本更加的可靠,因为它可以同时执行多条命令,并且保证原子性

总结

通过流程图也可以看到,若要使用redis实现分布式锁,会存在一些性能问题,因为在等待锁的过程中会一直自旋,周而复始的去判断是否有锁资源,如果要用高性能的分布式锁,还是需要用zookeeper,因为zookeeper天生就是用来做分布式锁的;

关键字Redis