超详细!深入理解什么是 Redis 事务?Redis 事务的命令序列

时间:2020-09-23 11:47:00 来源: 运维派公众号


Redis可以看成NoSQL类型的数据库系统, Redis也提供了事务, 但是和传统的关系型数据库的事务既有相似性, 也存在区别.因为Redis的架构基于操作系统的多路复用的IO接口,主处理流程是一个单线程,因此对于一个完整的命令, 其处理都是原子性的, 但是如果需要将多个命令作为一个不可分割的处理序列, 就需要使用事务.

Redis事务有如下一些特点:

事务中的命令序列执行的时候是原子性的,也就是说,其不会被其他客户端的命令中断. 这和传统的数据库的事务的属性是类似的.

尽管Redis事务中的命令序列是原子执行的, 但是事务中的命令序列执行可以部分成功,这种情况下,Redis事务不会执行回滚操作. 这和传统关系型数据库的事务是有区别的.

尽管Redis有RDB和AOF两种数据持久化机制, 但是其设计目标是高效率的cache系统. Redis事务只保证将其命令序列中的操作结果提交到内存中,不保证持久化到磁盘文件. 更进一步的, Redis事务和RDB持久化机制没有任何关系, 因为RDB机制是对内存数据结构的全量的快照.由于AOF机制是一种增量持久化,所以事务中的命令序列会提交到AOF的缓存中.但是AOF机制将其缓存写入磁盘文件是由其配置的实现策略决定的,和Redis事务没有关系.

Redis事务API

从宏观上来讲, Redis事务开始后, 会缓存后续的操作命令及其操作数据,当事务提交时,原子性的执行缓存的命令序列.

从版本2.2开始,Redis提供了一种乐观的锁机制, 配合这种机制,Redis事务提交时, 变成了事务的条件执行. 具体的说,如果乐观锁失败了,事务提交时, 丢弃事务中的命令序列,如果乐观锁成功了, 事务提交时,才会执行其命令序列.当然,也可以不使用乐观锁机制, 在事务提交时, 无条件执行事务的命令序列.

Redis事务涉及到MULTI, EXEC, DISCARD, WATCH和UNWATCH这五个命令:

事务开始的命令是MULTI, 该命令返回OK提示信息. Redis不支持事务嵌套,执行多次MULTI命令和执行一次是相同的效果.嵌套执行MULTI命令时,Redis只是返回错误提示信息.

EXEC是事务的提交命令,事务中的命令序列将被执行(或者不被执行,比如乐观锁失败等).该命令将返回响应数组,其内容对应事务中的命令执行结果.

WATCH命令是开始执行乐观锁,该命令的参数是key(可以有多个), Redis将执行WATCH命令的客户端对象和key进行关联,如果其他客户端修改了这些key,则执行WATCH命令的客户端将被设置乐观锁失败的标志.该命令必须在事务开始前执行,即在执行MULTI命令前执行WATCH命令,否则执行无效,并返回错误提示信息.

UNWATCH命令将取消当前客户端对象的乐观锁key,该客户端对象的事务提交将变成无条件执行.

DISCARD命令将结束事务,并且会丢弃全部的命令序列.

需要注意的是,EXEC命令和DISCARD命令结束事务时,会调用UNWATCH命令,取消该客户端对象上所有的乐观锁key.

无条件提交

如果不使用乐观锁, 则事务为无条件提交.下面是一个事务执行的例子:

multi

+OK

incrkey1

+QUEUED

setkey2val2

+QUEUED

exec

*2

:1

+OK

当客户端开始事务后, 后续发送的命令将被Redis缓存起来,Redis向客户端返回响应提示字符串QUEUED.当执行EXEC提交事务时,缓存的命令依次被执行,返回命令序列的执行结果.

事务的错误处理

事务提交命令EXEC有可能会失败, 有三种类型的失败场景:

在事务提交之前,客户端执行的命令缓存失败.比如命令的语法错误(命令参数个数错误, 不支持的命令等等).如果发生这种类型的错误,Redis将向客户端返回包含错误提示信息的响应.

事务提交时,之前缓存的命令有可能执行失败.

由于乐观锁失败,事务提交时,将丢弃之前缓存的所有命令序列.

当发生第一种失败的情况下,客户端在执行事务提交命令EXEC时,将丢弃事务中所有的命令序列.下面是一个例子:

multi

+OK

incrnum1num2

-ERRwrongnumberofargumentsfor'incr'command

setkey1val1

+QUEUED

exec

-EXECABORTTransactiondiscardedbecauseofpreviouserrors.

命令incr num1 num2并没有缓存成功, 因为incr命令只允许有一个参数,是个语法错误的命令.Redis无法成功缓存该命令,向客户端发送错误提示响应.接下来的set key1 val1命令缓存成功.最后执行事务提交的时候,因为发生过命令缓存失败,所以事务中的所有命令序列被丢弃.

如果事务中的所有命令序列都缓存成功,在提交事务的时候,缓存的命令中仍可能执行失败.但Redis不会对事务做任何回滚补救操作.下面是一个这样的例子:

multi

+OK

setkey1val1

+QUEUED

lpopkey1

+QUEUED

incrnum1

+QUEUED

exec

*3

+OK

-WRONGTYPEOperationagainstakeyholdingthewrongkindofvalue

:1

所有的命令序列都缓存成功,但是在提交事务的时候,命令set key1 val1和incr num1执行成功了,Redis保存了其执行结果,但是命令lpop key1执行失败了.

乐观锁机制

Redis事务和乐观锁一起使用时,事务将成为有条件提交.

关于乐观锁,需要注意的是:

WATCH命令必须在MULTI命令之前执行. WATCH命令可以执行多次.

WATCH命令可以指定乐观锁的多个key,如果在事务过程中,任何一个key被其他客户端改变,则当前客户端的乐观锁失败,事务提交时,将丢弃所有命令序列.

多个客户端的WATCH命令可以指定相同的key.

WATCH命令指定乐观锁后,可以接着执行MULTI命令进入事务上下文,也可以在WATCH命令和MULTI命令之间执行其他命令. 具体使用方式取决于场景需求,不在事务中的命令将立即被执行.

如果WATCH命令指定的乐观锁的key,被当前客户端改变,在事务提交时,乐观锁不会失败.

如果WATCH命令指定的乐观锁的key具有超时属性,并且该key在WATCH命令执行后, 在事务提交命令EXEC执行前超时, 则乐观锁不会失败.如果该key被其他客户端对象修改,则乐观锁失败.

一个执行乐观锁机制的事务例子:

rpushlistv1v2v3

:3

watchlist

+OK

multi

+OK

lpoplist

+QUEUED

exec

*1

$2

v1

下面是另一个例子,乐观锁被当前客户端改变, 事务提交成功:

watchnum

+OK

multi

+OK

incrnum

+QUEUED

exec

*1

:2

Redis事务和乐观锁配合使用时, 可以构造实现单个Redis命令不能完成的更复杂的逻辑.

Redis事务的源码实现机制

首先,事务开始的MULTI命令执行的函数为multiCommand, 其实现为(multi.c):

voidmultiCommand(redisClient*c){

if(c->flags&REDIS_MULTI){

addReplyError(c,"MULTIcallscannotbenested");

return;

}

c->flags|=REDIS_MULTI;

addReply(c,shared.ok);

}

该命令只是在当前客户端对象上加上REDIS_MULTI标志, 表示该客户端进入了事务上下文.

客户端进入事务上下文后,后续执行的命令将被缓存. 函数processCommand是Redis处理客户端命令的入口函数, 其实现为(redis.c):

intprocessCommand(redisClient*c){

/*TheQUITcommandishandledseparately.Normalcommandprocswill

*gothroughcheckingforreplicationandQUITwillcausetrouble

*whenFORCE_REPLICATIONisenabledandwouldbeimplementedin

*aregularcommandproc.*/

if(!strcasecmp(c->argv[0]->ptr,"quit")){

addReply(c,shared.ok);

c->flags|=REDIS_CLOSE_AFTER_REPLY;

returnREDIS_ERR;

}

/*NowlookupthecommandandcheckASAPabouttrivialerrorconditions

*suchaswrongarity,badcommandnameandsoforth.*/

c->ccmd=c->lastcmd=lookupCommand(c->argv[0]->ptr);

if(!c->cmd){

flagTransaction(c);

addReplyErrorFormat(c,"unknowncommand'%s'",

(char*)c->argv[0]->ptr);

returnREDIS_OK;

}elseif((c->cmd->arity>0&&c->cmd->arity!=c->argc)||

(c->argc<-c->cmd->arity)){

flagTransaction(c);

addReplyErrorFormat(c,"wrongnumberofargumentsfor'%s'command",

c->cmd->name);

returnREDIS_OK;

}

/*Checkiftheuserisauthenticated*/

if(server.requirepass&&!c->authenticated&&c->cmd->proc!=authCommand)

{

flagTransaction(c);

addReply(c,shared.noautherr);

returnREDIS_OK;

}

/*Handlethemaxmemorydirective.

*

*Firstwetrytofreesomememoryifpossible(iftherearevolatile

*keysinthedataset).Iftherearenottheonlythingwecando

*isreturninganerror.*/

if(server.maxmemory){

intretval=freeMemoryIfNeeded();

/*freeMemoryIfNeededmayflushslaveoutputbuffers.Thismayresult

*intoaslave,thatmaybetheactiveclient,tobefreed.*/

if(server.current_client==NULL)returnREDIS_ERR;

/*Itwasimpossibletofreeenoughmemory,andthecommandtheclient

*istryingtoexecuteisdeniedduringOOMconditions?Error.*/

if((c->cmd->flags&REDIS_CMD_DENYOOM)&&retval==REDIS_ERR){

flagTransaction(c);

addReply(c,shared.oomerr);

returnREDIS_OK;

}

}

/*Don'tacceptwritecommandsifthereareproblemspersistingondisk

*andifthisisamasterinstance.*/

if(((server.stop_writes_on_bgsave_err&&

server.saveparamslen>0&&

server.lastbgsave_status==REDIS_ERR)||

server.aof_last_write_status==REDIS_ERR)&&

server.masterhost==NULL&&

(c->cmd->flags&REDIS_CMD_WRITE||

c->cmd->proc==pingCommand))

{

flagTransaction(c);

if(server.aof_last_write_status==REDIS_OK)

addReply(c,shared.bgsaveerr);

else

addReplySds(c,

sdscatprintf(sdsempty(),

"-MISCONFErrorswritingtotheAOFfile:%s\r\n",

strerror(server.aof_last_write_errno)));

returnREDIS_OK;

}

/*Don'tacceptwritecommandsiftherearenotenoughgoodslavesand

*userconfiguredthemin-slaves-to-writeoption.*/

if(server.masterhost==NULL&&

server.repl_min_slaves_to_write&&

server.repl_min_slaves_max_lag&&

c->cmd->flags&REDIS_CMD_WRITE&&

server.repl_good_slaves_count

{

flagTransaction(c);

addReply(c,shared.noreplicaserr);

returnREDIS_OK;

}

/*Don'tacceptwritecommandsifthisisareadonlyslave.But

*acceptwritecommandsifthisisourmaster.*/

if(server.masterhost&&server.repl_slave_ro&&

!(c->flags&REDIS_MASTER)&&

c->cmd->flags&REDIS_CMD_WRITE)

{

addReply(c,shared.roslaveerr);

returnREDIS_OK;

}

/*OnlyallowSUBSCRIBEandUNSUBSCRIBEinthecontextofPub/Sub*/

if(c->flags&REDIS_PUBSUB&&

c->cmd->proc!=pingCommand&&

c->cmd->proc!=subscribeCommand&&

c->cmd->proc!=unsubscribeCommand&&

c->cmd->proc!=psubscribeCommand&&

c->cmd->proc!=punsubscribeCommand){

addReplyError(c,"only(P)SUBSCRIBE/(P)UNSUBSCRIBE/QUITallowedinthiscontext");

returnREDIS_OK;

}

/*OnlyallowINFOandSLAVEOFwhenslave-serve-stale-dataisnoand

*weareaslavewithabrokenlinkwithmaster.*/

if(server.masterhost&&server.repl_state!=REDIS_REPL_CONNECTED&&

server.repl_serve_stale_data==0&&

!(c->cmd->flags&REDIS_CMD_STALE))

{

flagTransaction(c);

addReply(c,shared.masterdownerr);

returnREDIS_OK;

}

/*LoadingDB?Returnanerrorifthecommandhasnotthe

*REDIS_CMD_LOADINGflag.*/

if(server.loading&&!(c->cmd->flags&REDIS_CMD_LOADING)){

addReply(c,shared.loadingerr);

returnREDIS_OK;

}

/*Luascripttooslow?Onlyallowalimitednumberofcommands.*/

if(server.lua_timedout&&

c->cmd->proc!=authCommand&&

c->cmd->proc!=replconfCommand&&

!(c->cmd->proc==shutdownCommand&&

c->argc==2&&

tolower(((char*)c->argv[1]->ptr)[0])=='n')&&

!(c->cmd->proc==scriptCommand&&

c->argc==2&&

tolower(((char*)c->argv[1]->ptr)[0])=='k'))

{

flagTransaction(c);

addReply(c,shared.slowscripterr);

returnREDIS_OK;

}

/*Execthecommand*/

if(c->flags&REDIS_MULTI&&

c->cmd->proc!=execCommand&&c->cmd->proc!=discardCommand&&

c->cmd->proc!=multiCommand&&c->cmd->proc!=watchCommand)

{

queueMultiCommand(c);

addReply(c,shared.queued);

}else{

call(c,REDIS_CALL_FULL);

if(listLength(server.ready_keys))

handleClientsBlockedOnLists();

}

returnREDIS_OK;

}

Line145:151当客户端处于事务上下文时, 如果接收的是非事务命令(MULTI, EXEC, WATCH, DISCARD), 则调用queueMultiCommand将命令缓存起来,然后向客户端发送成功响应.

在函数processCommand中, 在缓存命令之前, 如果检查到客户端发送的命令不存在,或者命令参数个数不正确等情况, 会调用函数flagTransaction标命令缓存失败.也就是说,函数processCommand中, 所有调用函数flagTransaction的条件分支,都是返回失败响应.

缓存命令的函数queueMultiCommand的实现为(multi.c):

/*AddanewcommandintotheMULTIcommandsqueue*/

voidqueueMultiCommand(redisClient*c){

multiCmd*mc;

intj;

c->mstate.commands=zrealloc(c->mstate.commands,

sizeof(multiCmd)*(c->mstate.count+1));

mc=c->mstate.commands+c->mstate.count;

mc->ccmd=c->cmd;

mc->argc=c->argc;

mc->argv=zmalloc(sizeof(robj*)*c->argc);

memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);

for(j=0;jargc;j++)

incrRefCount(mc->argv[j]);

c->mstate.count++;

}

在事务上下文中, 使用multiCmd结构来缓存命令, 该结构定义为(redis.h):

/*ClientMULTI/EXECstate*/

typedefstructmultiCmd{

robj**argv;

intargc;

structredisCommand*cmd;

}multiCmd;

其中argv字段指向命令的参数内存地址,argc为命令参数个数, cmd为命令描述结构, 包括名字和函数指针等.

命令参数的内存空间已经使用动态分配记录于客户端对象的argv字段了, multiCmd结构的argv字段指向客户端对象redisClient的argv即可.

无法缓存命令时, 调用函数flagTransaction,该函数的实现为(multi.c):

/*FlagthetransacationasDIRTY_EXECsothatEXECwillfail.

*Shouldbecalledeverytimethereisanerrorwhilequeueingacommand.*/

voidflagTransaction(redisClient*c){

if(c->flags&REDIS_MULTI)

c->flags|=REDIS_DIRTY_EXEC;

}

该函数在客户端对象中设置REDIS_DIRTY_EXEC标志, 如果设置了这个标志, 事务提交时, 命令序列将被丢弃.

最后,在事务提交时, 函数processCommand中将调用call(c,REDIS_CALL_FULL);, 其实现为(redis.c):

/*Call()isthecoreofRedisexecutionofacommand*/

voidcall(redisClient*c,intflags){

longlongdirty,start,duration;

intcclient_old_flags=c->flags;

/*SentthecommandtoclientsinMONITORmode,onlyifthecommandsare

*notgeneratedfromreadinganAOF.*/

if(listLength(server.monitors)&&

!server.loading&&

!(c->cmd->flags&(REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN)))

{

replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);

}

/*Callthecommand.*/

c->flags&=~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);

redisOpArrayInit(&server.also_propagate);

dirty=server.dirty;

start=ustime();

c->cmd->proc(c);

duration=ustime()-start;

dirty=server.dirty-dirty;

if(dirty<0)dirty=0;

/*WhenEVALiscalledloadingtheAOFwedon'twantcommandscalled

*fromLuatogointotheslowlogortopopulatestatistics.*/

if(server.loading&&c->flags&REDIS_LUA_CLIENT)

flags&=~(REDIS_CALL_SLOWLOG|REDIS_CALL_STATS);

/*IfthecallerisLua,wewanttoforcetheEVALcallertopropagate

*thescriptifthecommandflagorclientflagareforcingthe

*propagation.*/

if(c->flags&REDIS_LUA_CLIENT&&server.lua_caller){

if(c->flags&REDIS_FORCE_REPL)

server.lua_caller->flags|=REDIS_FORCE_REPL;

if(c->flags&REDIS_FORCE_AOF)

server.lua_caller->flags|=REDIS_FORCE_AOF;

}

/*LogthecommandintotheSlowlogifneeded,andpopulatethe

*per-commandstatisticsthatweshowinINFOcommandstats.*/

if(flags&REDIS_CALL_SLOWLOG&&c->cmd->proc!=execCommand){

char*latency_event=(c->cmd->flags&REDIS_CMD_FAST)?

"fast-command":"command";

latencyAddSampleIfNeeded(latency_event,duration/1000);

slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

}

if(flags&REDIS_CALL_STATS){

c->cmd->microseconds+=duration;

c->cmd->calls++;

}

/*PropagatethecommandintotheAOFandreplicationlink*/

if(flags&REDIS_CALL_PROPAGATE){

intflags=REDIS_PROPAGATE_NONE;

if(c->flags&REDIS_FORCE_REPL)flags|=REDIS_PROPAGATE_REPL;

if(c->flags&REDIS_FORCE_AOF)flags|=REDIS_PROPAGATE_AOF;

if(dirty)

flags|=(REDIS_PROPAGATE_REPL|REDIS_PROPAGATE_AOF);

if(flags!=REDIS_PROPAGATE_NONE)

propagate(c->cmd,c->db->id,c->argv,c->argc,flags);

}

/*RestoretheoldFORCE_AOF/REPLflags,sincecallcanbeexecuted

*recursively.*/

c->flags&=~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);

c->flags|=client_old_flags&(REDIS_FORCE_AOF|REDIS_FORCE_REPL);

/*HandlethealsoPropagate()APItohandlecommandsthatwanttopropagate

*multipleseparatedcommands.*/

if(server.also_propagate.numops){

intj;

redisOp*rop;

for(j=0;j

rop=&server.also_propagate.ops[j];

propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,rop->target);

}

redisOpArrayFree(&server.also_propagate);

}

server.stat_numcommands++;

}

在函数call中通过执行c->cmd->proc(c);调用具体的命令函数.事务提交命令EXEC对应的执行函数为execCommand, 其实现为(multi.c):

voidexecCommand(redisClient*c){

intj;

robj**orig_argv;

intorig_argc;

structredisCommand*orig_cmd;

intmust_propagate=0;/*NeedtopropagateMULTI/EXECtoAOF/slaves?*/

if(!(c->flags&REDIS_MULTI)){

addReplyError(c,"EXECwithoutMULTI");

return;

}

/*CheckifweneedtoaborttheEXECbecause:

*1)SomeWATCHedkeywastouched.

*2)Therewasapreviouserrorwhilequeueingcommands.

*AfailedEXECinthefirstcasereturnsamultibulknilobject

*(technicallyitisnotanerrorbutaspecialbehavior),while

*inthesecondanEXECABORTerrorisreturned.*/

if(c->flags&(REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)){

addReply(c,c->flags&REDIS_DIRTY_EXEC?shared.execaborterr:

shared.nullmultibulk);

discardTransaction(c);

gotohandle_monitor;

}

/*Execallthequeuedcommands*/

unwatchAllKeys(c);/*UnwatchASAPotherwisewe'llwasteCPUcycles*/

orig_argv=c->argv;

orig_argc=c->argc;

orig_cmd=c->cmd;

addReplyMultiBulkLen(c,c->mstate.count);

for(j=0;jmstate.count;j++){

c->argc=c->mstate.commands[j].argc;

c->argv=c->mstate.commands[j].argv;

c->ccmd=c->mstate.commands[j].cmd;

/*PropagateaMULTIrequestonceweencounterthefirstwriteop.

*Thiswaywe'lldelivertheMULTI/..../EXECblockasawholeand

*boththeAOFandthereplicationlinkwillhavethesameconsistency

*andatomicityguarantees.*/

if(!must_propagate&&!(c->cmd->flags&REDIS_CMD_READONLY)){

execCommandPropagateMulti(c);

must_propagate=1;

}

call(c,REDIS_CALL_FULL);

/*Commandsmayalterargc/argv,restoremstate.*/

c->mstate.commands[j].argc=c->argc;

c->mstate.commands[j].argv=c->argv;

c->mstate.commands[j].cmd=c->cmd;

}

c->argv=orig_argv;

c->argc=orig_argc;

c->cmd=orig_cmd;

discardTransaction(c);

/*MakesuretheEXECcommandwillbepropagatedaswellifMULTI

*wasalreadypropagated.*/

if(must_propagate)server.dirty++;

handle_monitor:

/*SendEXECtoclientswaitingdatafromMONITOR.Wedoithere

*sincethenaturalorderofcommandsexecutionisactually:

*MUTLI,EXEC,...commandsinsidetransaction...

*InsteadEXECisflaggedasREDIS_CMD_SKIP_MONITORinthecommand

*table,andwedoitherewithcorrectordering.*/

if(listLength(server.monitors)&&!server.loading)

replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);

}

LINE8:11检查EXEC命令和MULTI命令是否配对使用, 单独执行EXEC命令是没有意义的.

LINE19:24检查客户端对象是否具有REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC标志, 如果存在,则调用函数discardTransaction丢弃命令序列, 向客户端返回失败响应.

如果没有检查到任何错误,则先执行unwatchAllKeys(c);取消该客户端上所有的乐观锁key.

LINE32:52依次执行缓存的命令序列,这里有两点需要注意的是:

事务可能需要同步到AOF缓存或者replica备份节点中.如果事务中的命令序列都是读操作, 则没有必要向AOF和replica进行同步.如果事务的命令序列中包含写命令,则MULTI, EXEC和相关的写命令会向AOF和replica进行同步.根据LINE41:44的条件判断,执行execCommandPropagateMulti(c);保证MULTI命令同步, LINE59检查EXEC命令是否需要同步, 即MULTI命令和EXEC命令必须保证配对同步.EXEC命令的同步执行在函数的call中LINE62propagate(c->cmd,c->db->id,c->argv,c->argc,flags);, 具体的写入命令由各自的执行函数负责同步.

这里执行命令序列时, 通过执行call(c,REDIS_CALL_FULL);所以call函数是递归调用.

所以,综上所述, Redis事务其本质就是,以不可中断的方式依次执行缓存的命令序列,将结果保存到内存cache中.

事务提交时, 丢弃命令序列会调用函数discardTransaction, 其实现为(multi.c):

voiddiscardTransaction(redisClient*c){

freeClientMultiState(c);

initClientMultiState(c);

c->flags&=~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);

unwatchAllKeys(c);

}

该函数调用freeClientMultiState释放multiCmd对象内存.调用initClientMultiState复位客户端对象的缓存命令管理结构.调用unwatchAllKeys取消该客户端的乐观锁.

WATCH命令执行乐观锁, 其对应的执行函数为watchCommand, 其实现为(multi.c):

voidwatchCommand(redisClient*c){

intj;

if(c->flags&REDIS_MULTI){

addReplyError(c,"WATCHinsideMULTIisnotallowed");

return;

}

for(j=1;jargc;j++)

watchForKey(c,c->argv[j]);

addReply(c,shared.ok);

}

进而调用函数watchForKey, 其实现为(multi.c):

/*Watchforthespecifiedkey*/

voidwatchForKey(redisClient*c,robj*key){

list*clients=NULL;

listIterli;

listNode*ln;

watchedKey*wk;

/*Checkifwearealreadywatchingforthiskey*/

listRewind(c->watched_keys,&li);

while((ln=listNext(&li))){

wk=listNodeValue(ln);

if(wk->db==c->db&&equalStringObjects(key,wk->key))

return;/*Keyalreadywatched*/

}

/*ThiskeyisnotalreadywatchedinthisDB.Let'saddit*/

clients=dictFetchValue(c->db->watched_keys,key);

if(!clients){

clients=listCreate();

dictAdd(c->db->watched_keys,key,clients);

incrRefCount(key);

}

listAddNodeTail(clients,c);

/*Addthenewkeytothelistofkeyswatchedbythisclient*/

wk=zmalloc(sizeof(*wk));

wk->keykey=key;

wk->db=c->db;

incrRefCount(key);

listAddNodeTail(c->watched_keys,wk);

}

关于乐观锁的key, 既保存于其客户端对象的watched_keys链表中, 也保存于全局数据库对象的watched_keys哈希表中.

LINE10:14检查客户端对象的链表中是否已经存在该key, 如果已经存在, 则直接返回.LINE16在全局数据库中返回该key对应的客户端对象链表, 如果链表不存在, 说明其他客户端没有使用该key作为乐观锁, 如果链表存在, 说明其他客户端已经使用该key作为乐观锁. LINE22将当前客户端对象记录于该key对应的链表中. LINE28将该key记录于当前客户端的key链表中.

当前客户端执行乐观锁以后, 其他客户端的写入命令可能修改该key值.所有具有写操作属性的命令都会执行函数signalModifiedKey, 其实现为(db.c):

voidsignalModifiedKey(redisDb*db,robj*key){

touchWatchedKey(db,key);

}

函数touchWatchedKey的实现为(multi.c):

/*"Touch"akey,sothatifthiskeyisbeingWATCHedbysomeclientthe

*nextEXECwillfail.*/

voidtouchWatchedKey(redisDb*db,robj*key){

list*clients;

listIterli;

listNode*ln;

if(dictSize(db->watched_keys)==0)return;

clients=dictFetchValue(db->watched_keys,key);

if(!clients)return;

/*MarkalltheclientswatchingthiskeyasREDIS_DIRTY_CAS*/

/*Checkifwearealreadywatchingforthiskey*/

listRewind(clients,&li);

while((ln=listNext(&li))){

redisClient*c=listNodeValue(ln);

c->flags|=REDIS_DIRTY_CAS;

}

}

语句if (dictSize(db->watched_keys) == 0) return;检查全局数据库中的哈希表watched_keys是否为空, 如果为空,说明没有任何客户端执行WATCH命令, 直接返回.如果该哈希表不为空, 取回该key对应的客户端链表结构,并把该链表中的每个客户端对象设置REDIS_DIRTY_CAS标志. 前面在EXEC的执行命令中,进行过条件判断, 如果客户端对象具有这个标志, 则丢弃事务中的命令序列.

在执行EXEC, DISCARD, UNWATCH命令以及在客户端结束连接的时候,都会取消乐观锁, 最终都会执行函数unwatchAllKeys, 其实现为(multi.c):

/*Unwatchallthekeyswatchedbythisclient.TocleantheEXECdirty

*flagisuptothecaller.*/

voidunwatchAllKeys(redisClient*c){

listIterli;

listNode*ln;

if(listLength(c->watched_keys)==0)return;

listRewind(c->watched_keys,&li);

while((ln=listNext(&li))){

list*clients;

watchedKey*wk;

/*Lookupthewatchedkey->clientslistandremovetheclient

*fromthelist*/

wk=listNodeValue(ln);

clients=dictFetchValue(wk->db->watched_keys,wk->key);

redisAssertWithInfo(c,NULL,clients!=NULL);

listDelNode(clients,listSearchKey(clients,c));

/*Killtheentryatallifthiswastheonlyclient*/

if(listLength(clients)==0)

dictDelete(wk->db->watched_keys,wk->key);

/*Removethiswatchedkeyfromtheclient->watchedlist*/

listDelNode(c->watched_keys,ln);

decrRefCount(wk->key);

zfree(wk);

}

}

语句if (listLength(c->watched_keys) == 0) return;判断如果当前客户端对象的watched_keys链表为空,说明当前客户端没有执行WATCH命令,直接返回.如果该链表非空, 则依次遍历该链表中的key, 并从该链表中删除key, 同时,获得全局数据库中的哈希表watched_keys中该key对应的客户端链表, 删除当前客户端对象.

关键词:Redis事务

关于我们 加入我们 广告服务 网站地图

All Rights Reserved, Copyright 2004-2020 www.ctocio.com.cn

如有意见请与我们联系 邮箱:5 53 13 8 [email protected]

豫ICP备20005723号    IT专家网 版权所有