# zhl-redisson-starter **Repository Path**: l-cloud/zhl-redisson-starter ## Basic Information - **Project Name**: zhl-redisson-starter - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-09-28 - **Last Updated**: 2021-09-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、redisson分布式锁 redis官网推荐用锁 redisson文档说明地址: ```html https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8 ``` ## 一、Redisson实现分布式锁-原理 ### (1)、高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。 #### 1、互斥 在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。 #### 2、防止死锁 在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。 所以分布式非常有必要设置锁的`有效时间`,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。 #### 3、性能 对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。 所以在锁的设计时,需要考虑两点。 1、`锁的颗粒度要尽量小`。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。 2、`锁的范围尽量要小`。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。 #### 4、重入 我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。 针对以上Redisson都能很好的满足,下面就来分析下它。 ### (2)、Redisson原理分析 ![img](https://img2018.cnblogs.com/blog/1090617/201906/1090617-20190618183025891-1248337684.jpg) #### 1、加锁机制 线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。 线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。 #### 2、watch dog自动延期机制 这个比较难理解,找了些许资料感觉也并没有解释的很清楚。这里我自己的理解就是: 在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。 但在实际开发中会有下面一种情况: ```java //设置锁1秒过去 redissonLock.lock("redisson", 1); /** * 业务逻辑需要咨询2秒 */ redissonLock.release("redisson"); /** * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了, * 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。 * 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。 */ ``` 所以这个时候`看门狗`就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。 `注意` 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。 #### 3、为啥要用lua脚本呢? 这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的**原子性**。 #### 4、可重入加锁机制 Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关 1、Redis存储锁的数据类型是 Hash类型 2、Hash数据类型的key值包含了当前线程信息。 下面是redis存储的数据 ![img](https://img2018.cnblogs.com/blog/1090617/201906/1090617-20190618183037704-975536201.png) 这里表面数据类型是Hash类型,Hash类型相当于我们java的 `>` 类型,这里key是指 'redisson' 它的有效期还有9秒,我们再来看里们的key1值为`078e44a3-5f95-4e24-b6aa-80684655a15a:45`它的组成是: guid + 当前线程的ID。后面的value是就和可重入加锁有关。 **举图说明** ![img](https://img2018.cnblogs.com/blog/1090617/201906/1090617-20190618183046827-1994396879.png) 上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。 #### 5、Redis分布式锁的缺点 Redis分布式锁会有个缺陷,就是在Redis哨兵模式下: `客户端1` 对某个`master节点`写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。 这时`客户端2` 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。 这时系统在业务语义上一定会出现问题,**导致各种脏数据的产生**。 `缺陷`在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。 ## 二、RedissonLock Redisson分布式锁的实现是基于RLock接口,RedissonLock实现RLock接口。 ### (1)、RLock接口 #### 1、概念 ```java public interface RLock extends Lock, RExpirable, RLockAsync ``` 很明显RLock是继承Lock锁,所以他有Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。 #### 2、RLock锁API 这里针对上面做个整理,这里列举几个常用的接口说明 ```java public interface RRLock { //----------------------Lock接口方法----------------------- /** * 加锁 锁的有效期默认30秒 */ void lock(); /** * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false . */ boolean tryLock(); /** * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间, * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。 * * @param time 等待时间 * @param unit 时间单位 小时、分、秒、毫秒等 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 解锁 */ void unlock(); /** * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过 * Thread.currentThread().interrupt(); 方法真正中断该线程 */ void lockInterruptibly(); //----------------------RLock接口方法----------------------- /** * 加锁 上面是默认30秒这里可以手动设置锁的有效时间 * * @param leaseTime 锁有效时间 * @param unit 时间单位 小时、分、秒、毫秒等 */ void lock(long leaseTime, TimeUnit unit); /** * 这里比上面多一个参数,多添加一个锁的有效时间 * * @param waitTime 等待时间 * @param leaseTime 锁有效时间 * @param unit 时间单位 小时、分、秒、毫秒等 */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 检验该锁是否被线程使用,如果被使用返回True */ boolean isLocked(); /** * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有) * 这个比上面那个实用 */ boolean isHeldByCurrentThread(); /** * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间 * @param leaseTime 锁有效时间 * @param unit 时间单位 小时、分、秒、毫秒等 */ void lockInterruptibly(long leaseTime, TimeUnit unit); } ``` ### (2)、RedissonLock实现类 ```java public class RedissonLock extends RedissonExpirable implements RLock ``` RedissonLock实现了RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下 1、void lock()方法 ```java @Override public void lock() { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } ``` 发现lock锁里面进去其实用的是`lockInterruptibly`(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。 具体有关lockInterruptibly()方法讲解推荐一个博客。`博客`:[Lock的lockInterruptibly()](https://blog.csdn.net/zengmingen/article/details/53260650) 接下来执行流程,这里理下关键几步 ```java /** * 1、带上默认值调另一个中断锁方法 */ @Override public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); } /** * 2、另一个中断锁的方法 */ void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException /** * 3、这里已经设置了锁的有效时间默认为30秒 (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30) */ RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); /** * 4、最后通过lua脚本访问Redis,保证操作的原子性 */ RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } ``` 那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。 2、tryLock(long waitTime, long leaseTime, TimeUnit unit) 这里只显示一些重要逻辑。 ```java @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); //1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走 if (ttl == null) { return true; } //2、如果超过了尝试获取锁的等待时间,当然返回false 了。 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } // 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 final RFuture subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。 // 只有await返回true,才进入循环尝试获取锁 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } //4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果 //1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { return true; } // 获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } ``` `重点` tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍 #### 3、unlock() 解锁的逻辑很简单。 ```java @Override public void unlock() { // 1.通过 Lua 脚本执行 Redis 命令释放锁 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); // 2.非锁的持有者释放锁时抛出异常 if (opStatus == null) { throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 3.释放锁后取消刷新锁失效时间的调度任务 if (opStatus) { cancelExpirationRenewal(); } } ``` 使用 EVAL 命令执行 Lua 脚本来释放锁: 1. key 不存在,说明锁已释放,直接执行 `publish` 命令发布释放锁消息并返回 `1`。 2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 `nil`。 3. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 `hincrby` 对锁的值**减一**。 4. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 `0`;如果刚才释放的已经是最后一把锁,则执行 `del` 命令删除锁的 key,并发布锁释放消息,返回 `1`。 `注意`这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况 ```java //设置锁1秒过去 redissonLock.lock("redisson", 1); /** * 业务逻辑需要咨询2秒 */ redissonLock.release("redisson"); /** * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了, * 那么这个时候 线程2 进来了。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了) */ ``` # 二、idempotent 幂等处理方案 ### 1.原理 1.请求开始前,根据key查询 查到结果:报错 未查到结果:存入key-value-expireTime key=ip+url+args 2.请求结束后,直接删除key 不管key是否存在,直接删除 是否删除,可配置 3.expireTime过期时间,防止一个请求卡死,会一直阻塞,超过过期时间,自动删除 过期时间要大于业务执行时间,需要大概评估下; 4.此方案直接切的是接口请求层面。 5.过期时间需要大于业务执行时间,否则业务请求1进来还在执行中,前端未做遮罩,或者用户跳转页面后再回来做重复请求2,在业务层面上看,结果依旧是不符合预期的。 6.建议delKey = false。即使业务执行完,也不删除key,强制锁expireTime的时间。预防5的情况发生。 7.实现思路:同一个请求ip和接口,相同参数的请求,在expireTime内多次请求,只允许成功一次。 8.页面做遮罩,数据库层面的唯一索引,先查询再添加,等处理方式应该都处理下。 9.此注解只用于幂等,不用于锁,100个并发这种压测,会出现问题,在这种场景下也没有意义,实际中用户也不会出现1s或者3s内手动发送了50个或者100个重复请求,或者弱网下有100个重复请求; ### 2.使用 - 1. 引入依赖 ```java cn.com.trade365.starter.plugin trade-lock-starter 1.0.0 ``` - 2. 配置 redis 链接相关信息 ```yaml spring: redis: host: 127.0.0.1 port: 6379 ``` 理论是支持 [redisson-spring-boot-starter](https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter) 全部配置 - 3. 接口设置注解 ```java @Idempotent(key = "#demo.username", expireTime = 3, info = "请勿重复查询") @GetMapping("/test") public String test(Demo demo) { return "success"; } ``` ### idempotent 注解 配置详细说明 - 1. 幂等操作的唯一标识,使用spring el表达式 用#来引用方法参数 。 可为空则取 当前 url + args 做表示 ```java String key(); ``` - 2. 有效期 默认:1 有效期要大于程序执行时间,否则请求还是可能会进来 ```java int expireTime() default 1; ``` - 3. 时间单位 默认:s (秒) ```java TimeUnit timeUnit() default TimeUnit.SECONDS; ``` - 4. 幂等失败提示信息,可自定义 ```java String info() default "重复请求,请稍后重试"; ``` - 5. 是否在业务完成后删除key true:删除 false:不删除 ```java boolean delKey() default false; ```