一、基于 Redis 分布式锁优化
基于 setnx
实现的分布式锁存在下面问题:
- 不可重入: 同一个线程无法多次获取同一把锁(例如方法A执行前需要获取锁,然后调用方法B,方法B中也需要获取锁)
- 不可重试: 获取锁只尝试一次就返回 false,没有重试机制
- 超时释放: 锁超时释放虽然可以避免死锁,但是如果业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性(读写分离): 如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时了,如果从并未同步主中的锁数据,会出现问题
二、Redisson 介绍
Redisson 是一个在Redis的基础上实现的 Java 驻内存数据网格 (In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
三、Redisson 入门
3.1、引入依赖
也有redisson-spring-boot-starter
,但是会替换官方的 redis 配置,推荐使用如下配置
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
3.2、配置 Redisson 客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加 redis 地址,这里添加了单点地址,也可以使用 config.useClusterServers() 添加集群地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("root123");
// 创建客户端
return Redisson.create(config);
}
}
3.3、使用 Redisson 的分布式锁
@SpringBootTest
public class RedissonTest {
@Autowired
private RedissonClient redissonClient;
@Test
public void testRedisson() throws InterruptedException {
// 获取锁(可重入锁),指定锁名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位,不加第一个参数默认获取锁失败直接返回
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断锁是否获取成功
if (isLock) {
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}
}
四、Redisson 可重入锁原理
对于之前通过 lua 脚本实现的分布式锁,可以发现是无法实现可重入锁的,可重入锁原理就是,当前线程在获取到锁之后,如果需要再次获取锁,会有一个计数器,记住当前线程总共拿了多少次锁,显然,之前 lua 脚本实现的分布式锁无法实现此效果,因为 value 是 string 类型,只存了了线程标识,想要实现可重入锁,将 value 类型改为 hash 类型即可。每次拿到锁,vlaue 都加 1,并且释放锁的时候,不是直接删除,而是 value 减 1,当 value 减少到 0 时,释放锁即删除 lock。
最终的流程大致如下,并且必须保证操作的原子性,所以这里使用 lua 脚本实现
获取锁的 Lua 脚本如下
local key = KEYS[1]; -- 锁的 key
local threadId = ARGV[1]; -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁自动释放时间
-- 判断是否存在
if (redis.call('exists', key) == 0) then
-- 不存在,获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end ;
-- 锁已经存在
if (redis.call('hexists', key, threadId) == 1) then
-- 存在,获取锁,重入次数 + 1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取的锁的不是自己,获取锁失败
释放锁的 Lua 脚本如下
local key = KEYS[1]; -- 锁的 key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end ;
-- 是自己的锁,则重入 -1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数已经为 0
if (count > 0) then
-- 大于 0 说明不能释放锁,重置有效期然后返回
redis.call("EXPIRE", key, releaseTime);
return nil;
else
-- 等于 0 说明可以释放锁,直接删除
redis.call('DEL', key);
end;
可重入锁代码测试
@SpringBootTest
@Slf4j
public class RedissonTest {
@Autowired
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
/**
* 可重入锁测试
*/
@Test
void method1() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败... 1");
return;
}
try {
log.info("获取锁成功... 1");
method2();
log.info("开始执行业务... 1");
} finally {
log.warn("准备释放锁... 1");
lock.unlock();
}
}
private void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败... 2");
return;
}
try {
log.info("获取锁成功... 2");
log.info("开始执行业务... 2");
} finally {
log.warn("准备释放锁... 2");
lock.unlock();
}
}
}
1.当获取锁时,成功,redis 中多了锁标识
2.当调用的方法中也获取锁时,count 加 1
3.当调用的方法释放锁时,count 减 1
4.当 method1释放锁之后,数据被删除
可以看到 redisson 源码实现方式就是通过 Lua 脚本实现
五、Redisson 分布式锁原理
- 可重入:利用 hash 结构记录线程 id 和重入次数
- 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用 watchDog,每隔一段时间(releaseTime/3),重置超时时间