Redis分布式锁进阶

条件与刚需

一个靠谱的分布式锁需要具备的条件和刚需

  • 独占性
  • 高可用
  • 防死锁
  • 不乱抢
  • 重入性

Lua脚本

在我们redis中,判断分布式锁的K-V中,K是业务相关的锁名称,可以随意,而V是跟线程有关的ID,因为我们del 删除K的时候要判断是不是当前线程加的,不能删除别的线程加的锁(不能误删),所以我们在删除K的时候会判断V是不是当前线程自己的,而在高并发多线程的情况下,判断和删除不是原子性的,因为我们采用Lua脚本确保两个操作的原子性。

Lua案例

在我们的Redis官网中,解决上述判断锁和删除锁的Lua脚本是这样的

1
2
3
4
5
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end

传入参数是KEYS[]和ARGV[],并且数组下标是以1开始的

在我们redis中,调用脚本使用EVAL命令

案例:使用mset写入多个值,我们的Lua脚本是这样写的

但是这样写有个弊端:k1,v1,k2,v2写死了,不利于复用

改下如下:

分析:这里的KEYS[]是Key数组,call是调用的意思,ARGV[]是value数组,后面的2是numKeys的意思,代表是keys的数量,这里有两个key所以是2,k1,k2是key,连续传入,lua1,lua2是参数

Lua条件判断

格式:

1
2
3
4
5
6
7
if 布尔条件 then
业务代码
elseif 布尔条件 then
业务代码
else
业务代码
end

除了最后一个判断,每个条件判断后都需要加上then,并且有if最后一定得有end结尾,其他和Java差不多,条件判断内括号可加可不加

解决上述误删所调用的Lua脚本编写

可重入

我们上面实现的分布式没有考虑可重入问题,容易导致死锁问题,

我们原来的分布式锁长这样: keyName UUID+ThreadID,改进之后需要记录重入次数,所以改进之后的结果长这样

1
2
3
4
5
keyName UUID+ThreadID 0
keyName UUID+ThreadID 1
keyName UUID+ThreadID 2
keyName UUID+ThreadID 1
keyName UUID+ThreadID 0

重入一次+1,当为0时表示已经可以释放锁了

那么就需要采用hash结构

改为hash结构后lock/unlock的Lua脚本编写

Lock

  • 加锁的Lua脚本,对标我们的lock方法(v1版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//1.首先判断我们的Key是否存在
if redis.call('exists','key') == 0 then //不存在可以直接lock
redis.call('hset','key','uuid:threadID',1)
redis.call('expire','key',30) //加上过期时间
return 1
//2.key已经存在,判断是不是我们当前线程自己加的锁
//2.1 是我们自己的锁的话,重入次数+1
elseif redis.call('hexists','key','uuid:threadID') == 1 then
redis.call('hincrby','key','uuid:threadID',1)
redis.call('expire',30)
return 1
else
return 0
end

但是上述的代码前两个逻辑几乎一致,所以是否可以合并简洁?经过测试,

hincrby可以代替hset的作用,也能创建key,所以v1版本可以改进

  • v2
1
2
3
4
5
6
7
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadID') == 1 then 
redis.call('hincrby','key','uuid:threadID',1)
redis.call('expire',30)
return 1
else
return 0
end
  • v3,换上我们的参数
1
2
3
4
5
6
7
if redis.call('exists',KEY[1]) == 0 or redis.call('hexists',KEY[1],ARGV[1]) == 1 then 
redis.call('hincrby',KEY[1],ARGV[1],1)
redis.call('expire',ARGV[2])
return 1
else
return 0
end

unlock

流程:首先判断是否有锁,没有直接返回nil,如果有且是自己的,调用incrby -1直到为0时删除锁

  • v1
1
2
3
4
5
6
7
8
9
10
//1.首先判断是否有锁
//1.1,不存在 直接返回nil
if redis.call('hexists','key','uuid:threadID') == 0 then
return nil
//1.2 ,存在的话重入次数-1,当为0时del key
elseif redis.call('hincrby','key','uuid:threadID',-1) == 0 then
return redis.call('del',key)
else //有锁,但是还没减为0
return 0
end
  • v2,将我们的参数替换上去
1
2
3
4
5
6
7
8
if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then 
return nil
//1.2 ,存在的话重入次数-1,当为0时del key
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then
return redis.call('del',key)
else
return 0
end

分布式锁在Java中的应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class RedisDistributedLock implements Lock {

private StringRedisTemplate redisTemplate;
private String lockName; //keys[1]
private String field; //ARGV[1]
private int EXPIRE; //ARGV[2]

public RedisDistributedLock(StringRedisTemplate redisTemplate, String lockName) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.field = "UUID"+Thread.currentThread().getId();
this.EXPIRE = 25;
}

@Override
public void lock() {
tryLock();
}

@Override
public boolean tryLock() {
try {
tryLock(-1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time == -1){
String script =
"if redis.call('exists',KEY[1]) == 0 or redis.call('hexists',KEY[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEY[1],ARGV[1],1)" +
"redis.call('expire',ARGV[2])" +
"return 1" +
"else " +
"treturn 0" +
"end";
while(!redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),field,EXPIRE)){
Thread.sleep(60);
}
return true;
}
return false;
}
@Override
public void unlock() {
String script =
"if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then " +
"return nil" +
"elseif redis.call('incrby',KEYS[1],ARGV[1],-1) == 0 then" +
"return redis.call('del',key)" +
"else" +
"return 0" +
"end";
Long flag = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(lockName),field);

if (flag == null){
throw new RuntimeException("锁不存在");
}
}

@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {

}
}

看门狗机制实现

首先先实现续期的Lua脚本

1
2
3
4
5
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then
return redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end

思路,每次加锁成功后,开一个定时器每过10S再次调用重置时间函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void resetExpire() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then" +
"return redis.call('expire',KEYS[1],ARGV[2])" +
"else" +
"return 0" +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if(redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),field,EXPIRE)){
resetExpire();
}
}
},(this.EXPIRE*1000)/3);

}

Redis分布式锁进阶
http://example.com/2023/10/23/Redis分布式锁进阶/
Author
Posted on
October 23, 2023
Licensed under