likes
comments
collection
share

举一反 N,解读 etcd watch 源码实现

作者站长头像
站长
· 阅读数 20

etcd 内置了 Watcher 机制,允许应用程序监视 etcd 中存储的键值对的更改,并在发生更改时接收通知。这种监视模式是实现分布式系统中的自动化和协同操作的重要组成部分。通过 Watcher 机制,应用程序可以实现很多功能,如实时通知、动态配置、负载均衡等。

一个最基础的 watch 命令如下,client1 开启一个 watch,监听一个名为 stock 的 key,client2 对 store 进行对 key put 操作,client1 会收到 key 上产生的 put 事件:

# client1
etcdctl watch stock
# ...等待事件到来

# client2
etcdctl put stock 100

# client1
etcdctl watch stock
# 收到 PUT 事件
PUT
stock
100

下面围绕这个基础例子看看 Watch 是如何实现的。

Client 处理

在分析客户端实现之前,首先需要的是,客户端与服务端通信使用了 gRPC 的双向流模式 ,这种模式支持 gRPC 客户端与服务端的双向读写,一个伪代码例子如下:

// 调用 gRPC 方法获取一个双向的流
stream, err := client.GetAStream(context.Background())
waitc := make(chan struct{})
go func() {
    for {
        // 从流中读取数据
        in, err := stream.Recv()
        if err == io.EOF {
          close(waitc)
          return
        }
        log.Printf("Got message %s", in)
    }
}()
for _, note := range notes {
	// 写数据到流中
    if err := stream.Send(note); err != nil {
        log.Fatalf("Failed to send a note: %v", err)
    }
}
stream.CloseSend()
<-waitc

watch 命令的处理函数中,调用了 getWatchChan() -> c.Watch() 函数来获取一个接收 Watch 结果的 channel,然后读取 channel 中的结果,并将 watch 产生的事件输出。

// file: etcdctl/ctlv3/command/watch_command.go
func watchCommandFunc(cmd *cobra.Command, args []string) {
    // 创建客户端
    c := mustClientFromCmd(cmd)
    // 获取 Watch 结果的 Channel
    wc, err := getWatchChan(c, watchArgs)

    // 读取 channel 并输出结果
    printWatchCh(c, wc, execArgs)
    ...
}

func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
    ...
    return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
}

Watch() 在一个给定的 key 上进行 watch 操作,返回一个 WatchChan 用于接收事件通知。在一个主循环中,先获取一个锁,然后检查当前是否已经有一个双向流与给定的 ctx 相关联。如果没有,就调用 newWatcherGrpcStream() 函数创建一个新的流,并将其与 ctx 相关联,随后将请求提交并等待接收 WatchChan 结果的 channel,收到结果后函数返回。在 etcdctl 中,返回后的结果也就被上面的printWatchCh() 读取并输出结果。

// file: client/v3/watch.go
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // watch 请求
    wr := &watchRequest{...}
    ...
    for {
        w.mu.Lock()
        ...
        wgs := w.streams[ctxKey]
        if wgs == nil {
            // 为 watcher 创建一个 gRPC 双向流
            wgs = w.newWatcherGrpcStream(ctx)
            w.streams[ctxKey] = wgs
        }
        // 用于接收已就绪的请求
        reqc := wgs.reqc
        w.mu.Unlock()

        select {
        // 提交请求
        case reqc <- wr:
            ok = true
        ...
        }

        if ok {
            select {
            // 接收 Watch 结果 channel
            case ret := <-wr.retc:
                return ret
            ...
            }
            break
        }
    }
}

接下来看看 gRPC 双向流是如何被创建的。wgs.run() 是一个管理 watcher client 的 goroutine,它调用了w.newWatchClient()开启与 WatchServer 的 gRPC 双向流,然后在一个循环中等待就绪的 watch 请求与结果,这里 <-w.reqc 接收到的请求就是上面主 goroutine 的 Watch()case reqc <- wr:提交过来的。

// file: client/v3/watch.go
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
    ctx, cancel := context.WithCancel(&valCtx{inctx})
    wgs := &watchGrpcStream{
        owner:      w,
        remote:     w.remote, // gRPC 的 WatchClient,对应 WatchServer,创建 Client 时赋值
        callOpts:   w.callOpts,
        ctx:        ctx,
        ctxKey:     streamKeyFromCtx(inctx),
        cancel:     cancel,
        substreams: make(map[int64]*watcherStream),
        respc:      make(chan *pb.WatchResponse),
        reqc:       make(chan watchStreamRequest),
        donec:      make(chan struct{}),
        errc:       make(chan error, 1),
        closingc:   make(chan *watcherStream),
        resumec:    make(chan struct{}),
        lg:         w.lg,
    }
    go wgs.run()
    return wgs
}

func (w *watchGrpcStream) run() {
    ...
    // 开启与 WatchServer 的 gRPC 双向流
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
        return
    }
    
    ...
    for {
        select {
        // 接收就绪的 watch 请求
        case req := <-w.reqc:
        ...
        // 接收 watch 结果事件
        case pbresp := <-w.respc:
        ...
        ...
        }
    }
}

先来看w.newWatchClient()创建双向流的过程,对于我们的示例来说,最主要的流程就是调用 w.openWatchClient() 调用了 gRPC 服务,然后开启一个 goroutine 接收服务端的流,调用 w.serveWatchClient() 不断接收服务端发送过来的结果流,接收到结果后会推送到 w.respc channel 中,随后会被 run() goroutine 接收到并处理。

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
    ...
    // 调用 gRPC 服务端
    wc, err := w.openWatchClient()

    ...
    // 接收服务端推过来的流
    go w.serveWatchClient(wc)
}

func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
    for {
        ...
        // 调用 WatchServer 的 Watch 方法,与服务端建立双向流连接
        if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
            break
        }
    }
    return ws, nil
}

func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
    for {
        // 从服务端接收流,即 Watch 响应结果
        resp, err := wc.Recv()
        if err != nil {
            select {
            case w.errc <- err:
            case <-w.donec:
            }
            return
        }
        select {
        // 将结果推送到 w.respc channel,由 run goroutine 中接收
        case w.respc <- resp:
        case <-w.donec:
            return
        }
    }
}

到现在,我们 run() goroutine 中的请求 channel 和响应 channel 的数据来源都知道了,接下来分别看看它们是如何处理请求和响应的。请求的处理就是将请求结构体生成一个 protobuf 并通过 gRPC stream 的 Send() 方法发给服务端。根据 proto 的定义,响应的结果类型为 WatchResponse,收到响应结果后,调用w.dispatchEvent() -> w.unicastResponse()将结果发送给对应的 watcher stream 的 recvc channel,最后再由 w.serveSubstream() goroutine 推送到 c.Watch() 返回的 WatchChan 中。

func (w *watchGrpcStream) run() {
    ...
    for {
        select {
        // 接收就绪的 watch 请求
        case req := <-w.reqc:
            switch wreq := req.(type) {
            case *watchRequest:
                ...
                ws := &watcherStream{
                    initReq: *wreq,
                    id:      InvalidWatchID,
                    outc:    outc,
                    // unbuffered so resumes won't cause repeat events
                    recvc: make(chan *WatchResponse),
                }

                ...
                // 获取 watch 响应结果并进行进一步传递处理
                go w.serveSubstream(ws, w.resumec)
                w.resuming = append(w.resuming, ws)
                if len(w.resuming) == 1 {
                    // 将 watch 请求通过 gRPC 流发送到服务端
                    // ws.initReq.toPB() 将 watchRequest 转成 protobuf 形式
                    if err := wc.Send(ws.initReq.toPB()); err != nil {
                        w.lg.Debug("error when sending request", zap.Error(err))
                    }
                }
                ...
            // 接收 watch 结果事件
            case pbresp := <-w.respc:
                switch {
                case pbresp.Created:
                    ...
                    if len(w.resuming) != 0 {
                        if ws := w.resuming[0]; ws != nil {
                            w.dispatchEvent(pbresp)
                            w.resuming[0] = nil
                        }
                    }
                }
            ...
        }
    }
}
    
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
    // Watch 时产生的事件
    events := make([]*Event, len(pbresp.Events))
    for i, ev := range pbresp.Events {
        events[i] = (*Event)(ev)
    }
    wr := &WatchResponse{
        Header:          *pbresp.Header,
        Events:          events,
        CompactRevision: pbresp.CompactRevision,
        Created:         pbresp.Created,
        Canceled:        pbresp.Canceled,
        cancelReason:    pbresp.CancelReason,
    }
    ...
    return w.unicastResponse(wr, pbresp.WatchId)
}
    
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
    ws, ok := w.substreams[watchId]
    if !ok {
        return false
    }
    select {
    // 将结果发送到 watcher stream 的 recvc channel
    case ws.recvc <- wr:
    case <-ws.donec:
        return false
    }
    return true
}

最终的 WatchChan 就是包含了监听的 key 上的所有事件的 channel,我们可以根据业务逻辑来对不同的事件进行处理,如实时通知、动态配置更新等。在 etcdctl 的处理中,即命令行的处理中,只是使用 printWatchCh() 将事件输出到命令行。

type WatchChan <-chan WatchResponse

type WatchResponse struct {
    Header pb.ResponseHeader
    // key 上发生的事件列表
    Events []*Event

    // CompactRevision is the minimum revision the watcher may receive.
    CompactRevision int64

    // Canceled is used to indicate watch failure.
    // If the watch failed and the stream was about to close, before the channel is closed,
    // the channel sends a final response that has Canceled set to true with a non-nil Err().
    Canceled bool

    // 表示是否是一个创建 watcher 的请求
    Created bool

    closeErr error

    // cancelReason is a reason of canceling watch
    cancelReason string
}

在客户端的处理中,我们知道了一个 watch 请求就是与服务端创建了一个 gRPC 双向流,客户端向服务端发送一些请求(如创建 watcher),并从服务端接收响应,然后客户端对响应结果进行相应的处理(如 etcdctl 打印事件)。当然客户端还有着许多其他逻辑的处理,但目前对于我们最简单的示例来说已经足够了,对其他逻辑的具体实现流程可以以这个流程作为参考来阅读。

Server 处理

在 etcd 服务端的启动时会注册 proto 文件中定义的一些 gRPC 服务,这些服务为客户端提供了相应的 gRPC 方法。

// file: server/etcdserver/api/v3rpc/grpc.go
grpcServer := grpc.NewServer(append(opts, gopts...)...)

// 实现 KVServer,提供了 Put()、Txn() 等方法
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
// 实现 WatchServer,提供了 Watch() 方法
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))

客户端使用 WatchClient 调用到 Watch 方法,与服务端建立一个 gRPC 双向流。服务端 Watch() 方法的主要逻辑就是开启一个 recv loop goroutine 用于接收客户端的请求、一个 send loop goroutine 用于向客户端发送相应。

// file: server/etcdserver/api/v3rpc/watch.go
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
    // 创建一个服务端 watch stream 实例
    sws := serverWatchStream{
        lg: ws.lg,

        clusterID: ws.clusterID,
        memberID:  ws.memberID,

        maxRequestBytes: ws.maxRequestBytes,

        sg:        ws.sg,
        // mvcc watchableStore
        watchable: ws.watchable,
        ag:        ws.ag,

        gRPCStream:  stream,
        watchStream: ws.watchable.NewWatchStream(),
        // chan for sending control response like watcher created and canceled.
        ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

        progress: make(map[mvcc.WatchID]bool),
        prevKV:   make(map[mvcc.WatchID]bool),
        fragment: make(map[mvcc.WatchID]bool),

        closec: make(chan struct{}),
    }
    
    // send loop goroutine
    go func() {
        sws.sendLoop()
    }()
    
    // recv loop goroutine
    go func() {
        if rerr := sws.recvLoop(); rerr != nil {
            ...
        }
    }()
    
    ...
}

serverWatchStream 实例中 watch 相关的关键成员 watchablewatchStreamwatchable 是 etcd 核心的 mvcc 存储 mvcc.WatchableKV,它的实现是 watchableStore,在初始化 etcd 服务实例时会被创建。watchStream 则用于监听在 mvcc 存储中的 key 上产生的事件。

// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) NewWatchStream() WatchStream {
    return &watchStream{
        watchable: s,
        // 用于接收事件结果
        ch:        make(chan WatchResponse, chanBufLen),
        cancels:   make(map[WatchID]cancelFunc),
        // 用于记录存储中存在的 watcher
        watchers:  make(map[WatchID]*watcher),
    }
}

当客户端调用到 Watch() 时,recv loop 会先收到一个创建 watcher 的请求,调用 watchStream.Watch() 方法创建 watcher 并将响应结果推送到 sws.ctrlStream channel。

func (sws *serverWatchStream) recvLoop() error {
    for {
        // 阻塞 goroutine 从客户端流接收请求
        req, err := sws.gRPCStream.Recv()
        ...
        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest:
            ...
            // 创建 watcher
            id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)

            // 响应结果
            wr := &pb.WatchResponse{
                Header:   sws.newResponseHeader(wsrev),
                WatchId:  int64(id),
                Created:  true,
                Canceled: err != nil,
            }
            select {
            case sws.ctrlStream <- wr:
            case <-sws.closec:
                return nil
            }
        }
    }
}

watchStream.Watch() 为 wacher 生成一个最新的 ID,并调用 mvcc 存储的 watch()创建 watcher 实例、更新 revision 等,然后将 watcherid 的对应关系加入 watchers 映射。

// file: server/storage/mvcc/watcher.go
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
    ...
    ws.mu.Lock()
    defer ws.mu.Unlock()

    // 生成 watch ID
    if id == clientv3.AutoWatchID {
        for ws.watchers[ws.nextID] != nil {
            ws.nextID++
        }
        id = ws.nextID
        ws.nextID++
    } else if _, ok := ws.watchers[id]; ok {
        return -1, ErrWatcherDuplicateID
    }

    // 创建 watcher、更新 revision 等
    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

    ws.watchers[id] = w
    return id, nil
}

ws.watchable.watch() 创建了实际的 watcher 实例,然后加上一些相关的锁保证并发安全,然后根据 watcher 的同步情况将 watcher 加入到 watchableStorewatcherGroup 中,watcherGroup 是一个用于管理 watchers 的集合。

// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    // 创建 watcher 实例
    wa := &watcher{
        key:    key,
        end:    end,
        minRev: startRev,
        id:     id,
        ch:     ch,
        fcs:    fcs,
    }
    
    s.mu.Lock() // 存储锁
    s.revMu.RLock() // revision 读锁
    // 根据当前 watcher 的 revision 和当前 mvcc 存储的 revision 判断 wacher 是否已经与最新的存储同步
    // 对于创建 watcher 的请求来说,默认是已同步的
    synced := startRev > s.store.currentRev || startRev == 0
    if synced {
        wa.minRev = s.store.currentRev + 1
        if startRev > wa.minRev {
            wa.minRev = startRev
        }
        // 加入已同步的 watcherGroup
        s.synced.add(wa)
    } else {
        slowWatcherGauge.Inc()
        // 加入未同步的 watcherGroup
        s.unsynced.add(wa)
    }
    s.revMu.RUnlock()
    s.mu.Unlock()
    
    return wa, func() { s.cancelWatcher(wa) }
}

s.synced.add()s.unsynced.add()watcher 加入到 WatcherGroup 中,简单来说,就是将 watcher 即监听的 key 的对应关系存入相关的映射表。如果是 watch 一个范围(range)的话,还会加入到 Interval tree(区间树)中。

// file: server/storage/mvcc/watcher_group.go
func (wg *watcherGroup) add(wa *watcher) {
    // 加入 watchers map,确保不会存在相同的 watcher
    wg.watchers.add(wa)
    if wa.end == nil {
        // 加入 keyWatchers map,存储 watcher 和需要 watch 的 key 的对应关系
        wg.keyWatchers.add(wa)
        return
    }

    // interval already registered?
    ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
    if iv := wg.ranges.Find(ivl); iv != nil {
        iv.Val.(watcherSet).add(wa)
        return
    }

    // not registered, put in interval tree
    ws := make(watcherSet)
    ws.add(wa)
    wg.ranges.Insert(ivl, ws)
}

最后,watcher 的 ID 会被返回到 recvLoop() 中, 然后将创建 watcher 请求的响应结果推送到 sws.ctrlStream channel,ctrlStream 会在 sendLoop() 中进行处理。

在分析 sendLoop() 之前,我们可以先思考一下:一个正在 watch 的客户端是如何知道另一个客户端发生了 Put 操作?前面我们已经知道了,Watch 请求的本质就是在 etcd 的 mvcc 存储了一个 watcher 以及创建了与 key 的映射,而 gRPC 双向流只是 watch 的客户端与服务端通信的一种方式。根据这些,我们自然可以猜到,当进行一个客户端进行 Put 操作时,也能读取到 mvcc 存储中保存的 watchers,只需要根据 key 找到对应的 watcher,并以某种形式将 Put 事件通知到这个 watcher,然后将结果发送到双向流中,即实现了 watch 的效果。

接下来进入到 sendLoop(),对于我们的示例来说,最主要的逻辑就是将响应结果发送到 gRPC 流中。前面创建 watcher 请求的结果会被推送到 sws.ctrlStream channel,创建的结果会被发送到 gRPC 流,创建 watcher 是不携带任何事件(Event)的,客户端会收到一个 Created 字段为 true 的 WatchResponsewatchStream.Chan() 通常是接收 key 上产生的事件的响应结果,主要逻辑也是最终将结果发送出去。

// file: server/etcdserver/api/v3rpc/watch.go
func (sws *serverWatchStream) sendLoop() {
    ...
    for {
        select {
        // sws.watchStream.Chan() 即 watcher 实例的 ch
        case wresp, ok := <-sws.watchStream.Chan():
            ...
            // 处理事件
            evs := wresp.Events
            events := make([]*mvccpb.Event, len(evs))
            sws.mu.RLock()
            needPrevKV := sws.prevKV[wresp.WatchID]
            sws.mu.RUnlock()
            for i := range evs {
                events[i] = &evs[i]
                if needPrevKV && !IsCreateEvent(evs[i]) {
                    opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
                    r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt)
                    if err == nil && len(r.KVs) != 0 {
                        events[i].PrevKv = &(r.KVs[0])
                    }
                }
            }

            // 生成响应结果并发送给客户端
            wr := &pb.WatchResponse{
                Header:          sws.newResponseHeader(wresp.Revision),
                WatchId:         int64(wresp.WatchID),
                Events:          events,
                CompactRevision: wresp.CompactRevision,
                Canceled:        canceled,
            }
            var serr error
            if !fragmented && !ok {
                serr = sws.gRPCStream.Send(wr)
            }
        case c, ok := <-sws.ctrlStream:
            if err := sws.gRPCStream.Send(c); err != nil {...}
            ...
        }
    }
}

我们已经知道,sendLoop()sws.ctrlStream 的数据源是创建 watcher 时产生的。sws.watchStream.Chan() 的数据源却是由各个操作(如 Put)产生的。以一个 Put 操作举例,在 Put 的事务释放时,最终会执行到 watchableStoreTxnWrite.End(),然后会调用 tw.s.notify()

// file: server/storage/mvcc/watchable_store_txn.go
func (tw *watchableStoreTxnWrite) End() {
    changes := tw.Changes()
    if len(changes) == 0 {
        tw.TxnWrite.End()
        return
    }

    rev := tw.Rev() + 1
    // 收集 key 产生的事件
    evs := make([]mvccpb.Event, len(changes))
    for i, change := range changes {
        evs[i].Kv = &changes[i]
        if change.CreateRevision == 0 {
            evs[i].Type = mvccpb.DELETE
            evs[i].Kv.ModRevision = rev
        } else {
            evs[i].Type = mvccpb.PUT
        }
    }
    ...
    tw.s.notify(rev, evs)
}

notify() 将 key 上发生的事件通知到监听在这个 key 上的 watchernewWatcherBatch() 根据产生了事件的 key 从 watchers 集合里找出正在监听的 watcher,前面我们知道,每次创建一个 watcher 都会加入到集合中。找到 watcher 后会调用 w.send()WatchResponse 发送到 watcherch

// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
    victim := make(watcherBatch)
    // 找出监听在 key 上的 watcher
    for w, eb := range newWatcherBatch(&s.synced, evs) {
        ... 
        // 生成 WatchResponse 并发送到 watcher 的 ch
        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
            pendingEventsGauge.Add(float64(len(eb.evs)))
        }
    }
}

func (w *watcher) send(wr WatchResponse) bool {
    ...
    select {
    // 响应结果发送到 watcher 的 ch
    case w.ch <- wr:
        return true
    default:
        return false
    }
}

完成通知后,sendLoop() 会接收到这个响应结果,即从 sws.watchStream.Chan() 中读到了刚才携带了事件的WatchResponse,然后将其发送给客户端,客户端收到后,对结果进行相应的处理,在我们的示例中,最终会将事件输出到终端。

func (sws *serverWatchStream) sendLoop() {
    ...
    for {
        select {
        // watcher 实例的 ch
        case wresp, ok := <-sws.watchStream.Chan():
            // 发送给客户端
        }
    }
}

总结

相较于 Put、Delete 操作等同步的一元 gRPC 方法,Watch 则是更复杂一些的异步 gRPC 双向流,其中使用了大量的 channel 来处理不同的逻辑,总结下来的流程大致如下:

  1. 客户端发送 Watch 请求,与服务端建立 gRPC 双向流。
  2. 客户端阻塞等待 Watch 的响应结果 WatchResponse
  3. 服务端收到请求后开启一个 recv loop 和一个 send loop,分别用来读取客户端请求和发送相应。
  4. 服务端创建 watcher 并加入到 mvcc 存储的 watchers 集合中管理。
  5. 其它操作(如 Put)完成后将 key 上发生事件通知到对应的 watcher
  6. 服务端的 send loop 将响应结果发送给客户端。
  7. 客户端对结果进行业务逻辑处理。

Watch 的这种机制,就是一种发布-订阅模式,它在存储 store 上封装了一层 watchableStore 用来实现 watch 功能。通过 etcd 的 watch 实现,我们似乎也能类比到 Redis 的 pub/sub 机制,它们都允许客户端订阅特定的频道或键,并在这些频道或键发生变化时接收通知。当然 watch 还有许多有趣的功能,基于这一篇分析,相信我们也能更顺利的阅读我们感兴趣的其它部分。

转载自:https://juejin.cn/post/7255588784589635640
评论
请登录