条件与刚需 一个靠谱的分布式锁需要具备的条件和刚需
Lua脚本 在我们redis中,判断分布式锁的K-V中,K是业务相关的锁名称,可以随意,而V是跟线程有关的ID,因为我们del 删除K的时候要判断是不是当前线程加的,不能删除别的线程加的锁(不能误删),所以我们在删除K的时候会判断V是不是当前线程自己的,而在高并发多线程的情况下,判断和删除不是原子性的,因为我们采用Lua脚本确保两个操作的原子性。
Lua案例 在我们的Redis官网中,解决上述判断锁和删除锁的Lua脚本是这样的
1 2 3 4 5 if redis.call ('get' ,KEYS[1 ]) == ARGV[1 ] then return redis.call ('del' ,KEYS[1 ])else return 0 end
传入参数是KEYS[]和ARGV[] ,并且数组下标是以1开始的
在我们redis中,调用脚本使用EVAL命令
案例:使用mset写入多个值,我们的Lua脚本是这样写的
但是这样写有个弊端:k1,v1,k2,v2写死了,不利于复用
改下如下:
分析:这里的KEYS[]是Key数组,call是调用的意思,ARGV[]是value数组,后面的2是numKeys的意思,代表是keys的数量,这里有两个key所以是2,k1,k2是key,连续传入,lua1,lua2是参数
Lua条件判断 格式:
1 2 3 4 5 6 7 if 布尔条件 then 业务代码elseif 布尔条件 then 业务代码else 业务代码end
除了最后一个判断,每个条件判断后都需要加上then,并且有if最后一定得有end结尾,其他和Java差不多,条件判断内括号可加可不加
解决上述误删所调用的Lua脚本编写
可重入 我们上面实现的分布式没有考虑可重入问题,容易导致死锁问题,
我们原来的分布式锁长这样: keyName UUID+ThreadID,改进之后需要记录重入次数,所以改进之后的结果长这样
1 2 3 4 5 keyName UUID+ThreadID 0 keyName UUID+ThreadID 1 keyName UUID+ThreadID 2 keyName UUID+ThreadID 1 keyName UUID+ThreadID 0
重入一次+1,当为0时表示已经可以释放锁了
那么就需要采用hash结构
改为hash结构后lock/unlock的Lua脚本编写 Lock
加锁的Lua脚本,对标我们的lock方法(v1版)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if redis.call ('exists' ,'key' ) == 0 then redis.call ('hset' ,'key' ,'uuid:threadID' ,1 ) redis.call ('expire' ,'key' ,30 ) return 1 elseif redis.call ('hexists' ,'key' ,'uuid:threadID' ) == 1 then redis.call ('hincrby' ,'key' ,'uuid:threadID' ,1 ) redis.call ('expire' ,30 ) return 1 else return 0 end
但是上述的代码前两个逻辑几乎一致,所以是否可以合并简洁?经过测试,
hincrby可以代替hset的作用,也能创建key,所以v1版本可以改进
1 2 3 4 5 6 7 if redis.call ('exists' ,'key' ) == 0 or redis.call ('hexists' ,'key' ,'uuid:threadID' ) == 1 then redis.call ('hincrby' ,'key' ,'uuid:threadID' ,1 ) redis.call ('expire' ,30 ) return 1 else return 0 end
1 2 3 4 5 6 7 if redis.call ('exists' ,KEY [1]) == 0 or redis.call ('hexists' ,KEY [1],ARGV[1]) == 1 then redis.call ('hincrby' ,KEY [1],ARGV[1],1) redis.call ('expire' ,ARGV[2]) return 1 else return 0end
unlock 流程:首先判断是否有锁,没有直接返回nil,如果有且是自己的,调用incrby -1直到为0时删除锁
1 2 3 4 5 6 7 8 9 10 if redis.call ('hexists' ,'key' ,'uuid:threadID' ) == 0 then return nil elseif redis.call ('hincrby' ,'key' ,'uuid:threadID' ,-1 ) == 0 then return redis.call ('del' ,key)else return 0 end
1 2 3 4 5 6 7 8 if redis.call ('hexists' ,KEYS[1],ARGV[1]) == 0 then return nil //1.2 ,存在的话重入次数-1,当为0时del key elseif redis.call ('hincrby' ,KEYS[1],ARGV[1],-1) == 0 then return redis.call ('del' ,key )else return 0end
分布式锁在Java中的应用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public class RedisDistributedLock implements Lock { private StringRedisTemplate redisTemplate; private String lockName; private String field; private int EXPIRE; public RedisDistributedLock (StringRedisTemplate redisTemplate, String lockName) { this .redisTemplate = redisTemplate; this .lockName = lockName; this .field = "UUID" +Thread.currentThread().getId(); this .EXPIRE = 25 ; } @Override public void lock () { tryLock(); } @Override public boolean tryLock () { try { tryLock(-1 ,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { if (time == -1 ){ String script = "if redis.call('exists',KEY[1]) == 0 or redis.call('hexists',KEY[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEY[1],ARGV[1],1)" + "redis.call('expire',ARGV[2])" + "return 1" + "else " + "treturn 0" + "end" ; while (!redisTemplate.execute(new DefaultRedisScript <>(script,Boolean.class), Arrays.asList(lockName),field,EXPIRE)){ Thread.sleep(60 ); } return true ; } return false ; } @Override public void unlock () { String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then " + "return nil" + "elseif redis.call('incrby',KEYS[1],ARGV[1],-1) == 0 then" + "return redis.call('del',key)" + "else" + "return 0" + "end" ; Long flag = redisTemplate.execute(new DefaultRedisScript <>(script,Long.class), Arrays.asList(lockName),field); if (flag == null ){ throw new RuntimeException ("锁不存在" ); } } @Override public Condition newCondition () { return null ; } @Override public void lockInterruptibly () throws InterruptedException { } }
看门狗机制实现 首先先实现续期的Lua脚本
1 2 3 4 5 if redis.call ('HEXISTS' ,KEYS[1 ],ARGV[1 ]) == 1 then return redis.call ('expire' ,KEYS[1 ],ARGV[2 ])else return 0 end
思路,每次加锁成功后,开一个定时器每过10S再次调用重置时间函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void resetExpire ( ) { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then" + "return redis.call('expire',KEYS[1],ARGV[2])" + "else" + "return 0" + "end" ; new Timer ().schedule (new TimerTask () { @Override public void run ( ) { if (redisTemplate.execute (new DefaultRedisScript <>(script,Boolean .class ), Arrays .asList (lockName),field,EXPIRE )){ resetExpire (); } } },(this .EXPIRE *1000 )/3 ); }