首页 资讯频道 互联频道 智能频道 网络 数据频道 安全频道 服务器频道 存储频道

详解如何实现一个 Redis 分布式锁

2020-08-03 11:50:17 来源 : 马哥Linux运维

在我们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作明显不符合原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的系统如果是单体架构,那我们使用本地锁就可以解决问题。如果是分布式架构,就需要使用分布式锁。

方案

使用 SETNX 和 EXPIRE 命令

SETNXkeyvalue

EXPIREkeyseconds

DELkey

if(setnx("item_1_lock",1)){

expire("item_1_lock",30);

try{

...逻辑

}catch{

...

}finally{

del("item_1_lock");

}

}

这种方法看起来可以解决问题,但是有一定的风险,因为 SETNX 和 EXPIRE 这波操作是非原子性的,如果 SETNX 成功之后,出现错误,导致 EXPIRE 没有执行,导致锁没有设置超时时间形成死锁。

针对这种情况,我们可以使用 lua 脚本来保持操作原子性,保证 SETNX 和 EXPIRE 两个操作要么都成功,要么都不成功。

if(redis.call('setnx',KEYS[1],ARGV[1])<1)

thenreturn0;

end;

redis.call('expire',KEYS[1],tonumber(ARGV[2]));

return1;

通过这样的方法,我们初步解决了竞争锁的原子性问题,虽然其他功能还未实现,但是应该不会造成死锁🤪🤪🤪。

Redis 2.6.12 以上可灵活使用 SET 命令

SETkeyvalueNXEX30

DELkey

if(set("item_1_lock",1,"NX","EX",30)){

try{

...逻辑

}catch{

...

}finally{

del("item_1_lock");

}

}

改进后的方法不需要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。现在我们再仔细琢磨琢磨,如果 A 拿到了锁顺利进入代码块执行逻辑,但是由于各种原因导致超时自动释放锁。在这之后 B 成功拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑完毕再来释放锁,就会把 B 刚获得的锁释放了。就好比用自己家的钥匙开了别家的门,这是不可接受的。

为了解决这个问题我们可以尝试在 SET 的时候设置一个锁标识,然后在 DEL 的时候验证当前锁是否为自己的锁。

Stringvalue=UUID.randomUUID().toString().replaceAll("-","");

if(set("item_1_lock",value,"NX","EX",30)){

try{

...逻辑

}catch{

...

}finally{

...lua脚本保证原子性

}

}

if(redis.call('get',KEYS[1])==ARGV[1])

thenreturnredis.call('del',KEYS[1])

elsereturn0

end

到这里,我们终于解决了竞争锁的原子性问题和误删锁问题。但是锁一般还需要支持可重入、循环等待和超时自动续约等功能点。下面我们学习使用一个非常好用的包来解决这些问题。

入门 Redisson

Redission 的锁,实现了可重入和超时自动续约功能,它都帮我们封装好了,我们只要按照自己的需求调用它的 API 就可以轻松实现上面所提到的几个功能点。详细功能可以查看 Redisson 文档

在项目中安装 Redisson

org.redisson

redisson

3.13.2

 

implementation'org.redisson:redisson:3.13.2'

用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也可以在这里 Redisson 找到你需要的版本。

简单尝试

RedissonClientredissonClient=Redisson.create();

RLocklock=redissonClient.getLock("lock");

booleanres=lock.lock();

if(res){

try{

...逻辑

}finally{

lock.unlock();

}

}

Redisson 将底层逻辑全部做了一个封装 📦,我们无需关心具体实现,几行代码就能使用一把完美的锁。下面我们简单折腾折腾源码 🤔🤔🤔。

加锁

privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{

longthreadId=Thread.currentThread().getId();

Longttl=tryAcquire(leaseTime,unit,threadId);

if(ttl==null){

return;

}

RFuturefuture=subscribe(threadId);

if(interruptibly){

commandExecutor.syncSubscriptionInterrupted(future);

}else{

commandExecutor.syncSubscription(future);

}

try{

while(true){

ttl=tryAcquire(leaseTime,unit,threadId);

if(ttl==null){

break;

}

if(ttl>=0){

try{

future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);

}catch(InterruptedExceptione){

if(interruptibly){

throwe;

}

future.getNow().getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);

}

}else{

if(interruptibly){

future.getNow().getLatch().acquire();

}else{

future.getNow().getLatch().acquireUninterruptibly();

}

}

}

}finally{

unsubscribe(future,threadId);

}

}

获取锁

privateRFuturetryAcquireAsync(longleaseTime,TimeUnitunit,longthreadId){

if(leaseTime!=-1){

returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);

}

RFuturettlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);

ttlRemainingFuture.onComplete((ttlRemaining,e)->{

if(e!=null){

return;

}

if(ttlRemaining==null){

scheduleExpirationRenewal(threadId);

}

});

returnttlRemainingFuture;

}

RFuturetryLockInnerAsync(longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommandcommand){

internalLockLeaseTime=unit.toMillis(leaseTime);

returnevalWriteAsync(getName(),LongCodec.INSTANCE,command,

"if(redis.call('exists',KEYS[1])==0)then"+

"redis.call('hincrby',KEYS[1],ARGV[2],1);"+

"redis.call('pexpire',KEYS[1],ARGV[1]);"+

"returnnil;"+

"end;"+

"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+

"redis.call('hincrby',KEYS[1],ARGV[2],1);"+

"redis.call('pexpire',KEYS[1],ARGV[1]);"+

"returnnil;"+

"end;"+

"returnredis.call('pttl',KEYS[1]);",

Collections.singletonList(getName()),internalLockLeaseTime,getLockName(threadId));

}

删除锁

publicRFutureunlockAsync(longthreadId){

RPromiseresult=newRedissonPromise();

RFuturefuture=unlockInnerAsync(threadId);

future.onComplete((opStatus,e)->{

cancelExpirationRenewal(threadId);

if(e!=null){

result.tryFailure(e);

return;

}

if(opStatus==null){

IllegalMonitorStateExceptioncause=newIllegalMonitorStateException("attempttounlocklock,notlockedbycurrentthreadbynodeid:"

+id+"thread-id:"+threadId);

result.tryFailure(cause);

return;

}

result.trySuccess(null);

});

returnresult;

}

protectedRFutureunlockInnerAsync(longthreadId){

returnevalWriteAsync(getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,

"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+

"returnnil;"+

"end;"+

"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+

"if(counter>0)then"+

"redis.call('pexpire',KEYS[1],ARGV[2]);"+

"return0;"+

"else"+

"redis.call('del',KEYS[1]);"+

"redis.call('publish',KEYS[2],ARGV[1]);"+

"return1;"+

"end;"+

"returnnil;",

Arrays.asList(getName(),getChannelName()),LockPubSub.UNLOCK_MESSAGE,internalLockLeaseTime,getLockName(threadId));

}

总结

使用 Redis 做分布式锁来解决并发问题仍存在一些困难,也有很多需要注意的点,我们应该正确评估系统的体量,不能为了使用某项技术而用。要完全解决并发问题,仍需要在数据库层面做功夫。

最近更新