func(db*DB)Update(fnfunc(txn*Txn)error)error{ifdb.IsClosed(){// 如果已经关闭, 直接返回错误returnErrDBClosed}ifdb.opt.managedTxns{// 如果处于自动模式, 直接panic, 因为 Update 方法只能在自动模式下使用panic("Update can only be used with managedDB=false.")}txn:=db.NewTransaction(true)// 新建事务defertxn.Discard()iferr:=fn(txn);err!=nil{// 执行用户逻辑returnerr}returntxn.Commit()// 提交事务}
typeTxnstruct{readTsuint64// 定义了事务能"看到"的数据版本范围, 确保事务读取的是一致的快照commitTsuint64sizeint64countint64db*DBreads[]uint64// contains fingerprints of keys read.conflictKeysmap[uint64]struct{}// 保存写入的key的指纹, 用来做冲突检测.readsLocksync.Mutex// guards the reads slice. See addReadKey.pendingWritesmap[string]*Entry// 缓存待提交的数据.duplicateWrites[]*Entry// Used in managed mode to store duplicate entries.numIteratorsatomic.Int32discardedbool// 事务是否被丢弃doneReadboolupdatebool// 区分读写事务还是只读事务, 控制写入权限}
func(txn*Txn)Commit()error{// 关闭冲突检测时, conflictKeys为空, 还要检测这个pendingWrites, 索性直接检测 pendingWrites 是否为空iflen(txn.pendingWrites)==0{// pendingWrites 为空, 说明当前事务没有进行任何写操作, 可能是读事务或者没有操作, 直接释放资源就可以.txn.Discard()returnnil}// 检查是否被丢弃, 或者在自动模式下是否正确设置了时间戳iferr:=txn.commitPrecheck();err!=nil{returnerr}defertxn.Discard()txnCb,err:=txn.commitAndSend()iferr!=nil{returnerr}// If batchSet failed, LSM would not have been updated. So, no need to rollback anything.// TODO: What if some of the txns successfully make it to value log, but others fail.// Nothing gets updated to LSM, until a restart happens.returntxnCb()}
func(txn*Txn)commitAndSend()(func()error,error){orc:=txn.db.orcorc.writeChLock.Lock()// 加锁, 确保事务获取提交时间戳的顺序与被推送到写入通道中的顺序一样deferorc.writeChLock.Unlock()commitTs,conflict:=orc.newCommitTs(txn)// 检查之前看过的数据是否仍然有效ifconflict{returnnil,ErrConflict}keepTogether:=truesetVersion:=func(e*Entry){ife.version==0{e.version=commitTs}else{keepTogether=false}}// 给所有待写入的数据分配版本号for_,e:=rangetxn.pendingWrites{setVersion(e)}// The duplicateWrites slice will be non-empty only if there are duplicate// entries with different versions.for_,e:=rangetxn.duplicateWrites{setVersion(e)}entries:=make([]*Entry,0,len(txn.pendingWrites)+len(txn.duplicateWrites)+1)processEntry:=func(e*Entry){// Suffix the keys with commit ts, so the key versions are sorted in// descending order of commit timestamp.e.Key=y.KeyWithTs(e.Key,e.version)// Add bitTxn only if these entries are part of a transaction. We// support SetEntryAt(..) in managed mode which means a single// transaction can have entries with different timestamps. If entries// in a single transaction have different timestamps, we don't add the// transaction markers.ifkeepTogether{e.meta|=bitTxn}entries=append(entries,e)}// The following debug information is what led to determining the cause of// bank txn violation bug, and it took a whole bunch of effort to narrow it// down to here. So, keep this around for at least a couple of months.// var b strings.Builder// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",// txn.readTs, commitTs, txn.reads, txn.conflictKeys)for_,e:=rangetxn.pendingWrites{processEntry(e)}for_,e:=rangetxn.duplicateWrites{processEntry(e)}ifkeepTogether{// CommitTs should not be zero if we're inserting transaction markers.y.AssertTrue(commitTs!=0)e:=&Entry{Key:y.KeyWithTs(txnKey,commitTs),Value:[]byte(strconv.FormatUint(commitTs,10)),meta:bitFinTxn,}entries=append(entries,e)}// 从一个requestPool中获取一个请求, 将entries放入req中, 最后将req发送到通道 writeCh 中req,err:=txn.db.sendToWriteCh(entries)iferr!=nil{orc.doneCommit(commitTs)returnnil,err}ret:=func()error{err:=req.Wait()// Wait before marking commitTs as done.// We can't defer doneCommit above, because it is being called from a// callback here.orc.doneCommit(commitTs)returnerr}returnret,nil}