举一反 N,解读 etcd watch 源码实现
etcd 内置了 Watcher 机制,允许应用程序监视 etcd 中存储的键值对的更改,并在发生更改时接收通知。这种监视模式是实现分布式系统中的自动化和协同操作的重要组成部分。通过 Watcher 机制,应用程序可以实现很多功能,如实时通知、动态配置、负载均衡等。
一个最基础的 watch 命令如下,client1 开启一个 watch,监听一个名为 stock 的 key,client2 对 store 进行对 key put 操作,client1 会收到 key 上产生的 put 事件:
# client1
etcdctl watch stock
# ...等待事件到来
# client2
etcdctl put stock 100
# client1
etcdctl watch stock
# 收到 PUT 事件
PUT
stock
100
下面围绕这个基础例子看看 Watch 是如何实现的。
Client 处理
在分析客户端实现之前,首先需要的是,客户端与服务端通信使用了 gRPC 的双向流模式 ,这种模式支持 gRPC 客户端与服务端的双向读写,一个伪代码例子如下:
// 调用 gRPC 方法获取一个双向的流
stream, err := client.GetAStream(context.Background())
waitc := make(chan struct{})
go func() {
for {
// 从流中读取数据
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
log.Printf("Got message %s", in)
}
}()
for _, note := range notes {
// 写数据到流中
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
在 watch
命令的处理函数中,调用了 getWatchChan()
-> c.Watch()
函数来获取一个接收 Watch 结果的 channel,然后读取 channel 中的结果,并将 watch 产生的事件输出。
// file: etcdctl/ctlv3/command/watch_command.go
func watchCommandFunc(cmd *cobra.Command, args []string) {
// 创建客户端
c := mustClientFromCmd(cmd)
// 获取 Watch 结果的 Channel
wc, err := getWatchChan(c, watchArgs)
// 读取 channel 并输出结果
printWatchCh(c, wc, execArgs)
...
}
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
...
return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
}
Watch()
在一个给定的 key 上进行 watch 操作,返回一个 WatchChan
用于接收事件通知。在一个主循环中,先获取一个锁,然后检查当前是否已经有一个双向流与给定的 ctx
相关联。如果没有,就调用 newWatcherGrpcStream()
函数创建一个新的流,并将其与 ctx
相关联,随后将请求提交并等待接收 WatchChan 结果的 channel,收到结果后函数返回。在 etcdctl 中,返回后的结果也就被上面的printWatchCh()
读取并输出结果。
// file: client/v3/watch.go
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// watch 请求
wr := &watchRequest{...}
...
for {
w.mu.Lock()
...
wgs := w.streams[ctxKey]
if wgs == nil {
// 为 watcher 创建一个 gRPC 双向流
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
// 用于接收已就绪的请求
reqc := wgs.reqc
w.mu.Unlock()
select {
// 提交请求
case reqc <- wr:
ok = true
...
}
if ok {
select {
// 接收 Watch 结果 channel
case ret := <-wr.retc:
return ret
...
}
break
}
}
}
接下来看看 gRPC 双向流是如何被创建的。wgs.run()
是一个管理 watcher client 的 goroutine,它调用了w.newWatchClient()
开启与 WatchServer 的 gRPC 双向流,然后在一个循环中等待就绪的 watch 请求与结果,这里 <-w.reqc
接收到的请求就是上面主 goroutine 的 Watch()
中case reqc <- wr:
提交过来的。
// file: client/v3/watch.go
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote, // gRPC 的 WatchClient,对应 WatchServer,创建 Client 时赋值
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
lg: w.lg,
}
go wgs.run()
return wgs
}
func (w *watchGrpcStream) run() {
...
// 开启与 WatchServer 的 gRPC 双向流
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
...
for {
select {
// 接收就绪的 watch 请求
case req := <-w.reqc:
...
// 接收 watch 结果事件
case pbresp := <-w.respc:
...
...
}
}
}
先来看w.newWatchClient()
创建双向流的过程,对于我们的示例来说,最主要的流程就是调用 w.openWatchClient()
调用了 gRPC 服务,然后开启一个 goroutine 接收服务端的流,调用 w.serveWatchClient()
不断接收服务端发送过来的结果流,接收到结果后会推送到 w.respc
channel 中,随后会被 run()
goroutine 接收到并处理。
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
...
// 调用 gRPC 服务端
wc, err := w.openWatchClient()
...
// 接收服务端推过来的流
go w.serveWatchClient(wc)
}
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
...
// 调用 WatchServer 的 Watch 方法,与服务端建立双向流连接
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
break
}
}
return ws, nil
}
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
for {
// 从服务端接收流,即 Watch 响应结果
resp, err := wc.Recv()
if err != nil {
select {
case w.errc <- err:
case <-w.donec:
}
return
}
select {
// 将结果推送到 w.respc channel,由 run goroutine 中接收
case w.respc <- resp:
case <-w.donec:
return
}
}
}
到现在,我们 run()
goroutine 中的请求 channel 和响应 channel 的数据来源都知道了,接下来分别看看它们是如何处理请求和响应的。请求的处理就是将请求结构体生成一个 protobuf 并通过 gRPC stream 的 Send()
方法发给服务端。根据 proto 的定义,响应的结果类型为 WatchResponse
,收到响应结果后,调用w.dispatchEvent()
-> w.unicastResponse()
将结果发送给对应的 watcher stream 的 recvc
channel,最后再由 w.serveSubstream()
goroutine 推送到 c.Watch()
返回的 WatchChan
中。
func (w *watchGrpcStream) run() {
...
for {
select {
// 接收就绪的 watch 请求
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
...
ws := &watcherStream{
initReq: *wreq,
id: InvalidWatchID,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
...
// 获取 watch 响应结果并进行进一步传递处理
go w.serveSubstream(ws, w.resumec)
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// 将 watch 请求通过 gRPC 流发送到服务端
// ws.initReq.toPB() 将 watchRequest 转成 protobuf 形式
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
...
// 接收 watch 结果事件
case pbresp := <-w.respc:
switch {
case pbresp.Created:
...
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
}
}
...
}
}
}
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
// Watch 时产生的事件
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
Created: pbresp.Created,
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
...
return w.unicastResponse(wr, pbresp.WatchId)
}
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
select {
// 将结果发送到 watcher stream 的 recvc channel
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
最终的 WatchChan
就是包含了监听的 key 上的所有事件的 channel,我们可以根据业务逻辑来对不同的事件进行处理,如实时通知、动态配置更新等。在 etcdctl 的处理中,即命令行的处理中,只是使用 printWatchCh()
将事件输出到命令行。
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
// key 上发生的事件列表
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// 表示是否是一个创建 watcher 的请求
Created bool
closeErr error
// cancelReason is a reason of canceling watch
cancelReason string
}
在客户端的处理中,我们知道了一个 watch 请求就是与服务端创建了一个 gRPC 双向流,客户端向服务端发送一些请求(如创建 watcher),并从服务端接收响应,然后客户端对响应结果进行相应的处理(如 etcdctl 打印事件)。当然客户端还有着许多其他逻辑的处理,但目前对于我们最简单的示例来说已经足够了,对其他逻辑的具体实现流程可以以这个流程作为参考来阅读。
Server 处理
在 etcd 服务端的启动时会注册 proto 文件中定义的一些 gRPC 服务,这些服务为客户端提供了相应的 gRPC 方法。
// file: server/etcdserver/api/v3rpc/grpc.go
grpcServer := grpc.NewServer(append(opts, gopts...)...)
// 实现 KVServer,提供了 Put()、Txn() 等方法
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
// 实现 WatchServer,提供了 Watch() 方法
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
客户端使用 WatchClient
调用到 Watch
方法,与服务端建立一个 gRPC 双向流。服务端 Watch()
方法的主要逻辑就是开启一个 recv loop goroutine 用于接收客户端的请求、一个 send loop goroutine 用于向客户端发送相应。
// file: server/etcdserver/api/v3rpc/watch.go
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
// 创建一个服务端 watch stream 实例
sws := serverWatchStream{
lg: ws.lg,
clusterID: ws.clusterID,
memberID: ws.memberID,
maxRequestBytes: ws.maxRequestBytes,
sg: ws.sg,
// mvcc watchableStore
watchable: ws.watchable,
ag: ws.ag,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
// send loop goroutine
go func() {
sws.sendLoop()
}()
// recv loop goroutine
go func() {
if rerr := sws.recvLoop(); rerr != nil {
...
}
}()
...
}
serverWatchStream
实例中 watch 相关的关键成员 watchable
和 watchStream
,watchable
是 etcd 核心的 mvcc 存储 mvcc.WatchableKV
,它的实现是 watchableStore
,在初始化 etcd 服务实例时会被创建。watchStream
则用于监听在 mvcc 存储中的 key 上产生的事件。
// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) NewWatchStream() WatchStream {
return &watchStream{
watchable: s,
// 用于接收事件结果
ch: make(chan WatchResponse, chanBufLen),
cancels: make(map[WatchID]cancelFunc),
// 用于记录存储中存在的 watcher
watchers: make(map[WatchID]*watcher),
}
}
当客户端调用到 Watch()
时,recv loop 会先收到一个创建 watcher 的请求,调用 watchStream.Watch()
方法创建 watcher 并将响应结果推送到 sws.ctrlStream
channel。
func (sws *serverWatchStream) recvLoop() error {
for {
// 阻塞 goroutine 从客户端流接收请求
req, err := sws.gRPCStream.Recv()
...
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
...
// 创建 watcher
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
// 响应结果
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: err != nil,
}
select {
case sws.ctrlStream <- wr:
case <-sws.closec:
return nil
}
}
}
}
watchStream.Watch()
为 wacher 生成一个最新的 ID,并调用 mvcc 存储的 watch()
创建 watcher
实例、更新 revision 等,然后将 watcher
与 id
的对应关系加入 watchers
映射。
// file: server/storage/mvcc/watcher.go
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
...
ws.mu.Lock()
defer ws.mu.Unlock()
// 生成 watch ID
if id == clientv3.AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}
// 创建 watcher、更新 revision 等
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.watchers[id] = w
return id, nil
}
ws.watchable.watch()
创建了实际的 watcher
实例,然后加上一些相关的锁保证并发安全,然后根据 watcher
的同步情况将 watcher
加入到 watchableStore
的 watcherGroup
中,watcherGroup
是一个用于管理 watchers 的集合。
// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
// 创建 watcher 实例
wa := &watcher{
key: key,
end: end,
minRev: startRev,
id: id,
ch: ch,
fcs: fcs,
}
s.mu.Lock() // 存储锁
s.revMu.RLock() // revision 读锁
// 根据当前 watcher 的 revision 和当前 mvcc 存储的 revision 判断 wacher 是否已经与最新的存储同步
// 对于创建 watcher 的请求来说,默认是已同步的
synced := startRev > s.store.currentRev || startRev == 0
if synced {
wa.minRev = s.store.currentRev + 1
if startRev > wa.minRev {
wa.minRev = startRev
}
// 加入已同步的 watcherGroup
s.synced.add(wa)
} else {
slowWatcherGauge.Inc()
// 加入未同步的 watcherGroup
s.unsynced.add(wa)
}
s.revMu.RUnlock()
s.mu.Unlock()
return wa, func() { s.cancelWatcher(wa) }
}
s.synced.add()
、s.unsynced.add()
将 watcher
加入到 WatcherGroup
中,简单来说,就是将 watcher 即监听的 key 的对应关系存入相关的映射表。如果是 watch 一个范围(range)的话,还会加入到 Interval tree(区间树)中。
// file: server/storage/mvcc/watcher_group.go
func (wg *watcherGroup) add(wa *watcher) {
// 加入 watchers map,确保不会存在相同的 watcher
wg.watchers.add(wa)
if wa.end == nil {
// 加入 keyWatchers map,存储 watcher 和需要 watch 的 key 的对应关系
wg.keyWatchers.add(wa)
return
}
// interval already registered?
ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
if iv := wg.ranges.Find(ivl); iv != nil {
iv.Val.(watcherSet).add(wa)
return
}
// not registered, put in interval tree
ws := make(watcherSet)
ws.add(wa)
wg.ranges.Insert(ivl, ws)
}
最后,watcher
的 ID 会被返回到 recvLoop()
中, 然后将创建 watcher
请求的响应结果推送到 sws.ctrlStream
channel,ctrlStream
会在 sendLoop()
中进行处理。
在分析 sendLoop()
之前,我们可以先思考一下:一个正在 watch 的客户端是如何知道另一个客户端发生了 Put 操作?前面我们已经知道了,Watch 请求的本质就是在 etcd 的 mvcc 存储了一个 watcher 以及创建了与 key 的映射,而 gRPC 双向流只是 watch 的客户端与服务端通信的一种方式。根据这些,我们自然可以猜到,当进行一个客户端进行 Put 操作时,也能读取到 mvcc 存储中保存的 watchers,只需要根据 key 找到对应的 watcher,并以某种形式将 Put 事件通知到这个 watcher,然后将结果发送到双向流中,即实现了 watch 的效果。
接下来进入到 sendLoop()
,对于我们的示例来说,最主要的逻辑就是将响应结果发送到 gRPC 流中。前面创建 watcher 请求的结果会被推送到 sws.ctrlStream
channel,创建的结果会被发送到 gRPC 流,创建 watcher 是不携带任何事件(Event)的,客户端会收到一个 Created
字段为 true 的 WatchResponse
。watchStream.Chan()
通常是接收 key 上产生的事件的响应结果,主要逻辑也是最终将结果发送出去。
// file: server/etcdserver/api/v3rpc/watch.go
func (sws *serverWatchStream) sendLoop() {
...
for {
select {
// sws.watchStream.Chan() 即 watcher 实例的 ch
case wresp, ok := <-sws.watchStream.Chan():
...
// 处理事件
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.RLock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.RUnlock()
for i := range evs {
events[i] = &evs[i]
if needPrevKV && !IsCreateEvent(evs[i]) {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
// 生成响应结果并发送给客户端
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wresp.Revision),
WatchId: int64(wresp.WatchID),
Events: events,
CompactRevision: wresp.CompactRevision,
Canceled: canceled,
}
var serr error
if !fragmented && !ok {
serr = sws.gRPCStream.Send(wr)
}
case c, ok := <-sws.ctrlStream:
if err := sws.gRPCStream.Send(c); err != nil {...}
...
}
}
}
我们已经知道,sendLoop()
中 sws.ctrlStream
的数据源是创建 watcher 时产生的。sws.watchStream.Chan()
的数据源却是由各个操作(如 Put)产生的。以一个 Put 操作举例,在 Put 的事务释放时,最终会执行到 watchableStoreTxnWrite.End()
,然后会调用 tw.s.notify()
。
// file: server/storage/mvcc/watchable_store_txn.go
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
// 收集 key 产生的事件
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
...
tw.s.notify(rev, evs)
}
notify()
将 key 上发生的事件通知到监听在这个 key 上的 watcher
。newWatcherBatch()
根据产生了事件的 key 从 watchers
集合里找出正在监听的 watcher
,前面我们知道,每次创建一个 watcher
都会加入到集合中。找到 watcher
后会调用 w.send()
将 WatchResponse
发送到 watcher
的 ch
。
// file: server/storage/mvcc/watchable_store.go
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
victim := make(watcherBatch)
// 找出监听在 key 上的 watcher
for w, eb := range newWatcherBatch(&s.synced, evs) {
...
// 生成 WatchResponse 并发送到 watcher 的 ch
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
}
}
}
func (w *watcher) send(wr WatchResponse) bool {
...
select {
// 响应结果发送到 watcher 的 ch
case w.ch <- wr:
return true
default:
return false
}
}
完成通知后,sendLoop()
会接收到这个响应结果,即从 sws.watchStream.Chan()
中读到了刚才携带了事件的WatchResponse
,然后将其发送给客户端,客户端收到后,对结果进行相应的处理,在我们的示例中,最终会将事件输出到终端。
func (sws *serverWatchStream) sendLoop() {
...
for {
select {
// watcher 实例的 ch
case wresp, ok := <-sws.watchStream.Chan():
// 发送给客户端
}
}
}
总结
相较于 Put、Delete 操作等同步的一元 gRPC 方法,Watch 则是更复杂一些的异步 gRPC 双向流,其中使用了大量的 channel 来处理不同的逻辑,总结下来的流程大致如下:
- 客户端发送 Watch 请求,与服务端建立 gRPC 双向流。
- 客户端阻塞等待 Watch 的响应结果
WatchResponse
。 - 服务端收到请求后开启一个 recv loop 和一个 send loop,分别用来读取客户端请求和发送相应。
- 服务端创建
watcher
并加入到 mvcc 存储的 watchers 集合中管理。 - 其它操作(如 Put)完成后将 key 上发生事件通知到对应的
watcher
。 - 服务端的 send loop 将响应结果发送给客户端。
- 客户端对结果进行业务逻辑处理。
Watch 的这种机制,就是一种发布-订阅模式,它在存储 store
上封装了一层 watchableStore
用来实现 watch 功能。通过 etcd 的 watch 实现,我们似乎也能类比到 Redis 的 pub/sub 机制,它们都允许客户端订阅特定的频道或键,并在这些频道或键发生变化时接收通知。当然 watch 还有许多有趣的功能,基于这一篇分析,相信我们也能更顺利的阅读我们感兴趣的其它部分。
转载自:https://juejin.cn/post/7255588784589635640