likes
comments
collection
share

从源码实现了解 etcd 事务

作者站长头像
站长
· 阅读数 19

在 etcd 中,事务是一组原子性操作,可以确保多个操作之间的原子性,并且可以确保一组操作在执行期间不会被其他操作中断。

下面是一个最简单的事务示例,txn 表示开启一个事务,在 compares 中,输入事务的执行条件,即 user1 = "bad",如果满足条件,则删除 user1,否则将 user1 设置为 "good"

etcdctl txn --interactive

compares:
value("user1") = "bad"

success requests (get, put, delete):
del user1

failure requests (get, put, delete):
put user1 good

Client 处理

从源码实现了解 etcd 事务

与之前一样,txnCommandFunc() 中也是调用了 mustClientFromCmd() 将命令转为 Client 然后执行。

// file: etcdctl/ctlv3/command/txn_command.go
func txnCommandFunc(cmd *cobra.Command, args []string) {
    ...
    // 用于读取命令行输入
    reader := bufio.NewReader(os.Stdin)

    // 创建一个事务
    txn := mustClientFromCmd(cmd).Txn(context.Background())
    promptInteractive("compares:")
    // If 分支(compares)
    txn.If(readCompares(reader)...)
    // 打印到标准输出
    promptInteractive("success requests (get, put, del):")
    // Then 分支(success)
    txn.Then(readOps(reader)...)
    promptInteractive("failure requests (get, put, del):")
    // Else 分支(failure)
    txn.Else(readOps(reader)...)

    // 提交事务到 Server
    resp, err := txn.Commit()
    ...
}

客户端的 Txn() 方法创建了一个事务 txn 实例,实现了 Txn 接口。它包含了一套事务的完整方法。对于我们的示例来说,txn 结构体有三个主要成员:compssusfas,分别对应命令行中的 comparessuccessfailure

// file: client/v3/txn.go
type Txn interface {
    If(cs ...Cmp) Txn
    Then(ops ...Op) Txn
    Else(ops ...Op) Txn
    Commit() (*TxnResponse, error)
}

type txn struct {
    ...
    cmps []*pb.Compare // compares
    sus []*pb.RequestOp // success requests
    fas []*pb.RequestOp // failure requests
    ...
}

// file: client/v3/kv.go
func (kv *kv) Txn(ctx context.Context) Txn {
    return &txn{
        kv:       kv,
        ctx:      ctx,
        callOpts: kv.callOpts,
    }
}

If 分支中的 readCompares() 用于解析命令行中的 compares,将每一行的输入调用 ParseCompare() 转成一个 clientv3.Cmp 结构列表,ThenElse 分支则使用 readOps() 生成 clientv3.Op 结构列表。三个分支的核心方法就是将转换后的操作对象赋给自己对应的成员数组。

// file: client/v3/txn.go
func (txn *txn) If(cs ...Cmp) Txn {
    ...
    for i := range cs {
        txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
    }
}

func (txn *txn) Then(ops ...Op) Txn {
    ...
    for _, op := range ops {
        txn.sus = append(txn.sus, op.toRequestOp())
    }
}

func (txn *txn) Else(ops ...Op) Txn {
    ...
    for _, op := range ops {
        txn.fas = append(txn.fas, op.toRequestOp())
    }
}

完成分支操作的处理后,会调用 txn.Commit() 发送 gRPC 请求将事务提交到服务端。

func (txn *txn) Commit() (*TxnResponse, error) {
    ...
    r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}

    var resp *pb.TxnResponse
    var err error
    // 调用 KVClient 的 Txn 方法发送 gRPC 请求
    resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
    if err != nil {
        return nil, toErr(txn.ctx, err)
    }
    return (*TxnResponse)(resp), nil
}

Server 处理

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
    ...
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    // 发送 Raft 请求
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.TxnResponse), nil
}

Raft 节点收到的请求之后,会交由运行 etcd 服务时的调度器执行。

// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
    ...
    for {
        select {
        case ap := <-s.r.apply():
            f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
            sched.Schedule(f)
        }
    }
}
// file: server/etcdserver/apply/apply.go
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
    return mvcctxn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

mvcctxn.Txn() 方法内的大致处理逻辑如下:

从源码实现了解 etcd 事务

在进入方法之前,先来理清楚 applierV3backend 下事务相关的一些属性的关系,如 kvkv 是 etcd 的 MVCC 键值存储,在服务端运行创建 EtcdServer 实例时会被创建,之后 EtcdServer 会将它传递给 applierV3backend 来处理存储。

// file: server/etcdserver/server.go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
    ...
    srv = &EtcdServer{...}
    srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
}

MVCC 存储的接口之间的关系如下:

// file: server/storage/mvcc/kv.go
type WatchableKV interface {
    KV
    Watchable
}

type KV interface {
    ReadView
    WriteView
    Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
    Write(trace *traceutil.Trace) TxnWrite
    HashStorage() HashStorage
    Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
    Commit()
    Restore(b backend.Backend) error
    Close() error
}

type Watchable interface {
    NewWatchStream() WatchStream
}

type ReadView interface {
    FirstRev() int64
    Rev() int64
    Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}

type WriteView interface {
    DeleteRange(key, end []byte) (n, rev int64)
    Put(key, value []byte, lease lease.LeaseID) (rev int64)
}

mvcc.New() 创建调用到了 newWatchableStore() 创建了一个 watchableStore 结构体实例,它实现了 WatchableKV 接口。newWatchableStore() 又创建了一个 store 结构体实例,它实现了 KV 接口,以及 ReadViewWriteView 接口。store 实现了 KV 接口。

// file: server/storage/mvcc/watchable_store.go
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
    return newWatchableStore(lg, b, le, cfg)
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
    ...
    s := &watchableStore{
        store:    NewStore(lg, b, le, cfg),
        ...
    }
    s.store.ReadView = &readView{s}
    s.store.WriteView = &writeView{s}
}
// file: server/storage/mvcc/kvstore.go
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
    ...
    s := &store{
        b:       b,
        kvindex: newTreeIndex(lg),
        currentRev:     1,
        compactMainRev: -1,
        ...
    }
    ...
    // BoltDB Bucket 相关
    tx := s.b.BatchTx()
    tx.LockOutsideApply()
    tx.UnsafeCreateBucket(schema.Key)
    schema.UnsafeCreateMetaBucket(tx)
    tx.Unlock()
    ...
}

// file: server/storage/mvcc/index.go
func newTreeIndex(lg *zap.Logger) index {
    return &treeIndex{
        // 创建 B+ 树
        tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
          return aki.Less(bki)
        }),
        lg: lg,
    }
}

接下来,回到 mvcctxn.Txn() 方法,首先会判断该事务是否是为写事务,如果 SuccessFailure 操作集合中全部都是读操作,则会认为是一个只读事务。如果是一个写事务,会先创建一个读事务来获取 Compare 分支的结果来决定事务应该执行 Success 分支还是 Failure 分支,即 ThenElse,之后对分支中的请求进行检查,完成读相关的操作之后释放读事务,创建一个写事务开始执行最终分支中的操作,最后释放事务并返回结果。

// file: server/etcdserver/txn/txn.go
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
    // 是否为写事务
    isWrite := !IsTxnReadonly(rt)

    // 创建读事务,使用 TxnWrite 包装
    var txnWrite mvcc.TxnWrite
    txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))

    // 执行 Compares 分支,判断事物的执行路径(Success 或 Failure)
    var txnPath []bool
    trace.StepWithFunction(
        func() {
            txnPath = compareToPath(txnWrite, rt)
        },
        "compare",
    )

    // 检查请求
    if isWrite {
        if _, err := checkRequests(txnWrite, rt, txnPath,
            func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
            txnWrite.End()
            return nil, nil, err
        }
    }
    if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
        txnWrite.End()
        return nil, nil, err
    }

    // 创建事务返回值句柄
    txnResp, _ := newTxnResp(rt, txnPath)

    // 释放读事务并创建写事务
    if isWrite {
        txnWrite.End()
        txnWrite = kv.Write(trace)
    }

    // 执行最终分支
    _, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
    ...

    // 变更事务的 Revision
    rev := txnWrite.Rev()
    if len(txnWrite.Changes()) != 0 {
        rev++
    }

    // 结束事务
    txnWrite.End()
    ...
}

mvcc.NewReadOnlyTxnWrite(kv.Read()) 用来创建一个只读事务,重点的代码在 kv.Read() 中,从前面的接口关系我们知道,kv.Read() 实际调用的是 storeRead() 方法,它为 store 本身以及部分资源都加上了读锁,其它事务的操作不会被阻塞。读锁创建之后,使用 mvcc.NewReadOnlyTxnWrite() ,表示一个只读事务。

// file: server/storage/mvcc/kvstore_txn.go
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
    // store 加上读锁
    s.mu.RLock()
    // 为当前 Revision 加上读锁防止被其它事务修改
    s.revMu.RLock()

    // 这里可以认为是创建一个 BoltDB 后端存储的读事务
    var tx backend.ReadTx
    ...
    tx = s.b.ReadTx()
    tx.RLock()

    // 获取当前 Revision 并释放读锁
    firstRev, rev := s.compactMainRev, s.currentRev
    s.revMu.RUnlock()
    ...
}

读事务创建完之后,调用 compareToPath() 执行 Compare 分支中的操作,其中又调用了 applyCompares(),如果 Compare 结果的分支中有嵌套的事务,那么会递归地调用本方法继续执行 Compare

// file: server/etcdserver/txn/txn.go
func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
    txnPath := make([]bool, 1)
    ops := rt.Success
    if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
        ops = rt.Failure
    }

    // 执行嵌套事务 Compare
    for _, op := range ops {
      tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
      if !ok || tv.RequestTxn == nil {
        continue
      }
      txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
    }
}

applyCompares() 执行 Compare 分支中的每一个操作,如果有其中一个不满足条件则返回 false,进入 Failure 分支,否则进入 Success 分支。

// file: server/etcdserver/txn/txn.go
func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
    for _, c := range cmps {
        if !applyCompare(rv, c) {
          	return false
        }
    }
    return true
}

applyCompare() 执行 Compare 操作,它首先调用 rv.Range() 查找 Compare 中给定的 key 值,然后与给定的值进行比较,最终返回一个布尔值结果。

// file: server/etcdserver/txn/txn.go
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
  	// 查找键值对
  	rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
  	...
  
  	// 比较值
    if len(rr.KVs) == 0 {
        if c.Target == pb.Compare_VALUE {
          	return false
        }
        return compareKV(c, mvccpb.KeyValue{})
    }
    for _, kv := range rr.KVs {
        if !compareKV(c, kv) {
          	return false
        }
    }
    return true
}

rv.Range() 最终会进入到 storeTxnRead.Range() 方法中,storeTxnRead 就是之前创建的只读事务 mvcc.TxnWrite 的接口实现。tr.rangeKey() 会将 key 和 revision 作为 store 中 B+ 树的索引,从树中查找对应的 key 的元数据,如果树中存在,则从磁盘中(BoltDB)中找出 key 的值。(还记得 Put 操作吗?Put 就是往 B+ 树和磁盘中存储键值对)。

// file: server/storage/mvcc/kvstore_txn.go
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
    ...
    // 索引查找
    revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
    if len(revpairs) == 0 {
        // 没有值则直接返回
        return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
    }
    ...
  
  	// 存在键值对则从磁盘中取出实际的值
    kvs := make([]mvccpb.KeyValue, limit)
    for i, revpair := range revpairs[:len(kvs)] {
        ...
        _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
        ...
        kvs[i].Unmarshal(vs[0])
    }
}

applyCompare()rv.Range() 执行完之后,开始调用 compareKV() 进行比较,如果 rv.Range() 没有查出键值对且 Compare 操作为值类型比较的话,会直接返回 false,applyCompare(),我们的示例操作即是如此,但这里还是浅看一下 compareKV() 是如何比较的。

// file: server/etcdserver/txn/txn.go
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
    var result int
    rev := int64(0)
    switch c.Target {
    case pb.Compare_VALUE:
        var v []byte
        // 从 Compare.TargetUnion 中取出给定的值进行比较
        if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
            v = tv.Value
        }
        // 字节比较
        result = bytes.Compare(ckv.Value, v)
        ...
    }

    switch c.Result {
    case pb.Compare_EQUAL:
        return result == 0
    ...
    }
}

回到 mvcc.Txn() 中,我们已经完成了 Compare 分支的工作,然后会检查请求的 key、版本号等等,这里不再细看。之后将读事务释放,创建一个写事务,最后执行 Compare 结果分支中的操作,执行完后累加事务的 Revision 并释放写事务。如果写事务或其中的嵌套事务执行操作时出现错误,那么会直接将服务 panic。也就是说,etcd 并不支持内部自动回滚事务,如果需要回滚,需要在程序错误处理中手动编写回滚逻辑。

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
    ...
    var txnPath []bool
    trace.StepWithFunction(
        func() {
          txnPath = compareToPath(txnWrite, rt)
        },
        "compare",
    )

    // 检查请求...

    ...
    if isWrite {
        // 释放读事务
        txnWrite.End()
        // 创建写事务
        txnWrite = kv.Write(trace)
    }

    ...
    // 执行最终分支操作
    _, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
    if err != nil {
        if isWrite {
            // 事务中的写操作出现异常,直接 panic
            txnWrite.End()
            lg.Panic("unexpected error during txn with writes", zap.Error(err))
        } else {
            // 非写事务仅输出错误日志
            lg.Error("unexpected error during readonly txn", zap.Error(err))
        }
    }

    // 更新 Revision 并释放写锁
    rev := txnWrite.Rev()
    if len(txnWrite.Changes()) != 0 {
      	rev++
    }
    txnWrite.End()
}

读事务的释放就是将之前在 store 和后端存储上加的读锁释放。写事务会给 store 加上读锁,在执行写事务时其它事务还是能够读取 store 中的资源,然后获取后端存储(BoltDB)的事务(bucket)并为其加上写锁,写事务执行期间无法访问存储中的值,保证数据的正确性。

// file: server/storage/mvcc/kvstore_txn.go
func (tr *storeTxnRead) End() {
    tr.tx.RUnlock() 
    tr.s.mu.RUnlock()
}

func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    // store 加读锁
    s.mu.RLock()
    tx := s.b.BatchTx()
    // 后端存储加写锁
    tx.LockInsideApply()
    tw := &storeTxnWrite{
        storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
        tx:           tx,
        beginRev:     s.currentRev,
        changes:      make([]mvccpb.KeyValue, 0, 4),
    }
    // newMetricsTxnWrite 包装事务,目的是为了测量和统计一些数据
    return newMetricsTxnWrite(tw)
}
// file: server/etcdserver/txn/txn.go
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
    // 获取最终执行的分支
    reqs := rt.Success
    if !txnPath[0] {
        reqs = rt.Failure
    }

    // 执行分支中的所有操作
    for i, req := range reqs {
        respi := tresp.Responses[i].Response
        switch tv := req.Request.(type) {
        ...
        case *pb.RequestOp_RequestPut:
          // 进入 mvcc.Put 方法
          resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
          if err != nil {
                return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
          }
          respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
          ...
        }
    }
}

成功执行完成后会释放写事务,更新 store 的 revision 并将之前加的读锁和写锁释放。

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
    ...
    txnWrite.End()
}

func (tw *storeTxnWrite) End() {
    // 修改 store revision
    if len(tw.changes) != 0 {
        tw.s.revMu.Lock()
        tw.s.currentRev++
    }
    // 释放后端存储写锁
    tw.tx.Unlock()
    if len(tw.changes) != 0 {
        tw.s.revMu.Unlock()
    }
    // 释放 store 读锁
    tw.s.mu.RUnlock()
}

事务执行完成后,最终将事务结果返回到 EtcdServerTxn 方法中,然后将结果响应给 gRPC 客户端。

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
    ...
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.TxnResponse), nil
}

总结

本文以一个最简单的示例了解事务的源码实现,在客户端的处理中,会将用户不同分支的输入生成为对应分支的的操作类型,然后跟其它请求一样,向服务端发送一个 gRPC 请求,同样的,服务端收到请求后会发送 raft 请求给到其它节点,最后调用 mvcc 存储来处理事务。在分析事务之前,我们还梳理了 mvcc 存储中一些核心的成员关系,如 store、B+ 树、revision、锁等。

示例中的写事务的处理流程如下:

  1. 创建一个读事务,即 mvcc 的读锁。
  2. 获取 If 分支中的结果,得出一个最终的执行路径(ThenElse),然后检查请求的有效性。
  3. 释放读事务,创建一个写事务,即 mvcc 的写锁。
  4. 执行 If 结果分支中的操作,在我们的示例中结果是一个 Put 操作,将当前写事务传给 Put 方法,最后将键值对存入 B+ 树与 bucket,更新事务的 revision,释放写事务。

每个 raft 节点(etcd 服务)都有自己的 mvcc 存储(store),当一个事务请求发送给其它 raft 节点时它们也会各自将自己的存储锁住,保证了节点间的数据一致性。需要注意的是,如果事务的结果分支中存在多个写操作,其中一个写操作出现错误时会导致 panic,可能需要在程序中手动编写回滚键值对的代码。

除此之外,还有对只读事务的一些处理,感兴趣的同学可以自行翻阅一下。

转载自:https://juejin.cn/post/7253969759191466043
评论
请登录