likes
comments
collection
share

学习 etcd 存储的第一步,从 Etcd 框架和 Put 操作源码说起(详细版)

作者站长头像
站长
· 阅读数 17

etcd 是一个分布式、高可用的键值存储系统,它被设计为可靠的、安全的、快速的,并具有简单的API。

etcd 使用 Go 语言开发,基于 Raft 算法实现了分布式一致性。它可以用于存储集群中的关键配置信息、服务发现、锁等。

etcd 的数据模型类似于一个简单的文件系统,支持 PUT、GET、DELETE 等操作,每个节点的数据会自动同步到其他节点上,因此可以实现高可用、自动故障转移等功能。etcd 还支持 Watch 机制,可以监控数据变化并触发相应的操作。

PS:本文前大半部分都是在讲一些 etcdctl、etcd server、raft 节点间数据流转与处理相关的内容,如不感兴趣,可直接跳转到 迎接请求到来 部分阅读 Put 操作流程。

下面是一个最简单的 etcd put 操作:

# 启动 etcd server
$ etcd

$ etcdctl put gretting "hello"

$ etcdctl get gretting
gretting
hello

当我们发送执行一个 put 操作时,etcdctl 内部会创建一个 etcd 的客户端,向 etcd 服务端发送相应的 grpc 请求来完成操作。

学习 etcd 存储的第一步,从 Etcd 框架和 Put 操作源码说起(详细版)

etcd 的源码位于 github.com/etcd-io/etc… 接下来以 etcdctl 为切入点开始源码解读。为了保持简单,我们只关心示例操作的主要流程,比如这篇里我们只关注与 Put 操作相关的流程。

etcdctl

etcdctl 用于向 etcd server 发送请求,下面是 etcdctl 工具的入口方法。

// file: etcdctl/main.go
func main() {
    ctlv3.MustStart()
    return
}

入口函数内调用了 ctlv3.MustStart(),在 ctlv3 包下,初始化了命令行参数及命令处理函数,MustStart() 方法最终会调用到命令行工具的 Execute() 方法执行命令。

// file: etcdctl/ctlv3/ctl.go
...
var (
	rootCmd = &cobra.Command{
		Use:        cliName,
		Short:      cliDescription,
		SuggestFor: []string{"etcdctl"},
	}
)

func init() {
  // 初始化并解析命令行参数
	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
	rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")
  ...
}

// 注册各类操作方法
rootCmd.AddCommand(
    command.NewGetCommand(),
    command.NewPutCommand(),
    command.NewDelCommand(),
    ...
)

func Start() error {
  ...
  // 执行命令
	return rootCmd.Execute()
}

func MustStart() {
	if err := Start(); err != nil {
    ...
	}
}

可以看到,etcd 默认的 endpoints127.0.0.1:2379,也就是说我们最开始启动的 etcd 服务默认就监听在这个地址。

由于我们是一次 PUT 操作,在 command.NewPutCommand() 中注册了 PUT 命令,命令的处理函数是 putCommandFunc,该函数中真正执行了 PUT 操作。

// file: etcdctl/ctlv3/command/put_command.go
func NewPutCommand() *cobra.Command {
    cmd := &cobra.Command{
    ...
        Run: putCommandFunc,
    }
    ...
    return cmd
}

func putCommandFunc(cmd *cobra.Command, args []string) {
    ...
    resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
    ...
}

mustClientFromCmd() 实现了将命令行操作转换为 etcd 客户端。在 golang 中,我们通常使用 clientv3.Client 来作为 etcd 客户端。

clientv3.New(clientv3.Config{
    Endpoints:   conf.AppConf.Etcd.Hosts,
    DialTimeout: 5 * time.Second,
    ...
})

mustClientFromCmd() 方法中会先调用 clientConfigFromCmd() 根据命令行参数生成必要的配置信息,然后调用 mustClient 创建客户端。

// file: etcdctl/ctlv3/command/global.go
func mustClientFromCmd(cmd *cobra.Command) *clientv3.Client {
    cfg := clientConfigFromCmd(cmd)
    return mustClient(cfg)
}

func clientConfigFromCmd(cmd *cobra.Command) *clientv3.ConfigSpec {
    ...
    // 对应 clientv3 的 Endpoints 和 DialTimeout 属性
    cfg := &clientv3.ConfigSpec{}
    cfg.Endpoints, err = endpointsFromCmd(cmd)
    cfg.DialTimeout = dialTimeoutFromCmd(cmd)
    ...
}

func mustClient(cc *clientv3.ConfigSpec) *clientv3.Client {
    ...
    // 创建客户端
    client, err := clientv3.New(*cfg)
    ...
    return client
}

可以看到,etcdctl 工具实际上就是使用 clientv3.New 在内部创建了一个客户端来进行各种操作。我们先来大概了解一下创建客户端时做了些什么。

clientv3.Client 结构体中,组合了一些接口,这些接口负责执行各种 etcd 的操作,可以允许我们直接使用 Client 的实例来进行如 PutGrantWatch 等操作。

// file: client/v3/client.go
type Client struct {
    KV
    Lease
    Watcher
    ...
}

// file: client/v3/kv.go
type KV interface {
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    ...
}

// file: client/v3/lease.go
type Lease interface {
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
    ...
}

在调用 clientv3.New() 时,会与 Endpoints 所在 etcd 服务建立连接,接着创建这些接口实现的实例。

// file: client/v3/client.go
func New(cfg Config) (*Client, error) {
    ...
    return newClient(&cfg)
}

func newClient(cfg *Config) (*Client, error) {
    ...
    // 建立连接
    conn, err := client.dialWithBalancer()
    ... 
    // 创建接口实例
    client.KV = NewKV(client)
    client.Lease = NewLease(client)
    ...
}

dialWithBalancer() 方法向 etcd 服务发起了一个 grpc 连接请求,到这里我们知道,etcd 的客户端和服务端使用 grpc 进行通信。注意,客户端选择了 endpoints[0] 的地址发送连接请求,这是因为 Endpoints 列表中的第一个地址通常是 raft 协议的 leader 节点,Raft 协议规定只有 leader 节点才能处理写请求,因此客户端应该优先连接 leader 节点,这里不再细述。

// file: client/v3/client.go
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
    ...
    return c.dial(creds, opts...)
}

func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
    ...
    target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
    conn, err := grpc.DialContext(dctx, target, opts...)
    ...
}

再来看创建接口实例部分,由于我们进行的是最普通的 Put 操作,由 KV 接口定义,这里主要看 NewKV() 方法。方法逻辑非常简单,创建了一个 kv 结构体,该结构体实现了 KV 接口定义的所有方法。

// file: client/v3/kv.go
func NewKV(c *Client) KV {
    api := &kv{remote: RetryKVClient(c)}
    ...
    return api
}

RetryKVClient() 创建了一个 grpc 客户端,由于是 grpc,我们知道,存在 KVClient 就会存在一个对应的 KVServer,这个我们放到 etcd 服务启动的时候再来回顾。

// file: client/v3/retry.go
func RetryKVClient(c *Client) pb.KVClient {
    return &retryKVClient{
        kc: pb.NewKVClient(c.conn),
    }
}

// file: api/etcdserverpb/rpc.pb.go
func NewKVClient(cc *grpc.ClientConn) KVClient {
    return &kVClient{cc}
}

Put

处理完创建完客户端的逻辑之后,紧接着接着调用了 Put() 方法,开始发送 Put 请求。

resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)

从前面我们知道,Client 调用 Put 方法会执行到 kv 结构体内的 Put() 。Put 方法内调用了 kv.Do(),传入了一个 Put Op 类型。kv.Do() 方法逻辑很简单,根据对应的 Op 类型向 etcd 发送一个 grpc 请求。在前面 NewKv(client) 时我们知道,kv.remote 就是 grpc 的客户端 KVClient,所以这里就是直接发送了一个 grpc put 请求。

// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
    var err error
    switch op.t {
    ...
    case tPut:
    var resp *pb.PutResponse
        // 创建请求 Message
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        // 调用 KVClient.Put() 方法
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
        ...
    }
    return OpResponse{}, toErr(ctx, err)
}

目前为止,我们了解了使用 etcdctl 的一次 put 请求的核心流程,接下来看看 etcd 服务端是如何处理一个 Put 请求的。

学习 etcd 存储的第一步,从 Etcd 框架和 Put 操作源码说起(详细版)

etcd 服务启动

etcd server 的入口方法如下:

// file: server/main.go
func main() {
    etcdmain.Main(os.Args)
}

由于我们没有给 etcd 传入任何参数,直接进入 startEtcdOrProxyV2() 流程。

// file: server/etcdmain/main.go
func Main(args []string) {
    ...
    startEtcdOrProxyV2(args)
}

先关注最简单的 etcd 命令启动,在 startEtcdOrProxyV2() 中,主要流程就是初始化一些配置、日志,检查部分参数然后启动 etcd,最后阻塞等待 etcd 抛出错误或停止然后退出。

// file: server/etcdmain/etcd.go
func startEtcdOrProxyV2(args []string) {
    ...
    // 配置初始化
    cfg := newConfig()
    ...
  
    // 日志初始化
    lg := cfg.ec.GetLogger()
    if lg == nil {
        ...
        lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
    }
    ...
  
    // 启动
    stopped, errc, err = startEtcd(&cfg.ec)
    ...
  
    // 阻塞进程
    select {
    case lerr := <-errc:
        lg.Fatal("listener failed", zap.Error(lerr))
    case <-stopped:
    }

    osutil.Exit(0)
}

newConfig() 中,初始化了 etcd 启动时需要的一些参数,如当前 raft 节点的监听的 url、监听客户端请求的 url、数据目录等。在前面 etcdctl 命令初始化的时候,我们也看到了默认会请求到 http://localhost:2379 地址,就是在这里初始化的。

// file: server/etcdmain/config.go
func newConfig() *config {
    cfg := &config{
        ec: *embed.NewConfig(),
        ...
    }
    fs.StringVar(&cfg.ec.Dir, "data-dir", cfg.ec.Dir, "Path to the data directory.")
    ...
}

// file: server/embed/config.go
const (
    DefaultListenPeerURLs   = "http://localhost:2380" // 节点之间通信的地址
    DefaultListenClientURLs = "http://localhost:2379" // 客户端请求的地址
)

func NewConfig() *Config {
    lpurl, _ := url.Parse(DefaultListenPeerURLs)
    lcurl, _ := url.Parse(DefaultListenClientURLs)
      ...
  
    cfg := &Config{
        ...
        ListenPeerUrls:      []url.URL{*lpurl},
        ListenClientUrls:    []url.URL{*lcurl},
        ...
    }
}

完成必要的初始化之后,调用 startEtcd() 启动 etcd 服务,其又调用了 embed.StartEtcd() 开始启动 etcd server 以及 http handler 等,然后等待一些 raft 节点相关的通知。

// file: server/etcdmain/etcd.go
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
    // 启动服务
    e, err := embed.StartEtcd(cfg)
    if err != nil {
        return nil, nil, err
    }
  
    // 订阅一些节点加入成功、停止之类的通知
    osutil.RegisterInterruptHandler(e.Close)
    select {
    case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
    case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
    case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
        e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
    }
    return e.Server.StopNotify(), e.Err(), nil
}

StartEtcd() 根据配置启动一些相关的 listener ,用于节点间通信、客户端与服务端通信等,创建了 EtcdServer 实例并启动。

func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    // 验证配置
    if err = inCfg.Validate(); err != nil {
        return nil, err
    }
    ...
    // 创建 raft 节点相关 Listener
    if e.Peers, err = configurePeerListeners(cfg); err != nil {
        return e, err
    }
        ...
    // 创建客户端请求相关 Listener
    if e.sctxs, err = configureClientListeners(cfg); err != nil {
        return e, err
    }
    ...
    // 创建 EtcdServer 实例
    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }
    ...
    // 启动实例
    e.Server.Start()
  
    // 监听 raft 节点相关 url
    if err = e.servePeers(); err != nil {
        return e, err
    }
    // 监听客户端请求 url
    if err = e.serveClients(); err != nil {
        return e, err
    }
    ...
}

configurePeerListeners()configureClientListeners() 的主要逻辑就是给 PeerUrlsClientUrls 里的地址创建 listener,它们都使用了 transport 包提供的方法创建,并携带了一些 TLS、SocketOpts、超时时间等信息。

// file: server/embed/etcd.go
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
    ...
    for i, u := range cfg.ListenPeerUrls {
        ...
        peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
            transport.WithTLSInfo(&cfg.PeerTLSInfo),
            transport.WithSocketOpts(&cfg.SocketOpts),
            transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
        )
        if err != nil {
            return nil, err
        }
    ...
    }
    ...
}

func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
    ...
    for _, u := range cfg.ListenClientUrls {
        ...
        if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
            transport.WithSocketOpts(&cfg.SocketOpts),
            transport.WithSkipTLSInfoCheck(true),
        ); err != nil {
            return nil, err
        }
        ...
    }
    ...
}

transport.NewListenerWithOpts() 用于创建 listener 并可以携带多个的 option,接着又继续调用到 newListener() -> newKeepAliveListener(),在这里看到了我们熟悉的 net.Listen,最后通过 NewKeepAliveListener() 封装一层后返回。

// file: client/pkg/transport/listener.go
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    return newListener(addr, scheme, opts...)
}

func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    ...
    // 逐个调用传入的 Option 方法
    lnOpts := newListenOpts(opts...)

    ...
    ln, err := newKeepAliveListener(...)
    ...
}

func newKeepAliveListener(cfg *net.ListenConfig, addr string) (ln net.Listener, err error) {
    if cfg != nil {
        ln, err = cfg.Listen(context.TODO(), "tcp", addr)
    } else {
        ln, err = net.Listen("tcp", addr)
    }
    if err != nil {
        return
    }

    return NewKeepAliveListener(ln, "tcp", nil)
}

// file: client/pkg/transport/keepalive_listener.go
func NewKeepAliveListener(l net.Listener, scheme string, tlscfg *tls.Config) (net.Listener, error) {
    kal := &keepaliveListener{
        Listener: l,
    }
    ...
    return kal, nil
}

创建完两个相关的 listener 后,开始创建并启动 EtcdServer 实例。

// 创建 EtcdServer 实例
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
    return e, err
}

// 启动实例
e.Server.Start()

etcdserver.NewServer() 中做了许多的处理,我们这里只针对示例看一些最关键的地方。注意,在创建实例时调用了 b.raft.newRaftNode() 创建了一个 raft 节点,这里后面还会提到。

// file: server/etcdserver/server.go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
    // 创建数据目录、KV store 等
    b, err := bootstrap(cfg)
  
    srv = &EtcdServer{
        ...
        // 创建 raft 节点
        r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
        ...
    }
    // 创建处理具体操作的实例
    srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
    ...
    srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
    ...
    srv.uberApply = srv.NewUberApplier()

    return srv, nil
}

创建完 EtcdServer 实例后,调用了 e.Server.Start() 方法启动实例,接着执行到 s.start() 方法,在这里为 EtcdServer 的一些属性进行了分配,最终调用 s.run() 方法,s.run() 会使用 for select 结构将当前 goroutine 阻塞,并读取各种 channel 的数据进行处理。

// file: server/etcdserver/server.go
func (s *EtcdServer) Start() {
    s.start()
    ...
}

func (s *EtcdServer) start() {
    ...
    s.w = wait.New()
    s.done = make(chan struct{})
    s.stop = make(chan struct{})
    s.ctx, s.cancel = context.WithCancel(context.Background())
    ...

    go s.run()
}

func (s *EtcdServer) run() {
    ...
    s.r.start(rh)
  
    ...
    for {
        select {
        case ap := <-s.r.apply():
            f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
            sched.Schedule(f)
        }
        ...
    }
}

对于我们的示例操作,只需要关心 s.r.start()s.r.apply(),前面创建实例的时候我们知道,s.r 是一个 raft 的节点。etcd 使用了github.com/etcd-io/raf… 来作为 etcd 的 raft 协议实现。 在 r.start() 中,调用了 <-r.Ready(),当 raft 节点收到了其他节点发来的数据,如日志条目(Log Entry)、心跳包等,channel 返回一个值表示已准备就绪,然后后会向 r.applyc channel 中写入一条数据。

// file: server/etcdserver/raft.go
func (r *raftNode) start(rh *raftReadyHandler) {
    ...

    go func() {
    for {
        select {
        ...
        case rd := <-r.Ready():
            ...
            ap := toApply{...}
            select {
            case r.applyc <- ap:
            }
        ...
    }()
}

raft 节点就绪写入数据到 r.applyc 后,在外面的 s.run() 方法的 select 中会从 <-s.r.apply() 中读出这条数据,并把这条数据交给调度器处理。

// file: server/etcdserver/raft.go
func (r *raftNode) apply() chan toApply {
    return r.applyc
}

到这里,etcd 服务就已经作为一个 raft 节点启动起来了。等到开始处理 Put 操作的数据时,我们再继续往下看数据是如何被处理的。现在回到 StartEtcd() 中,看看启动实例之后的处理。

func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    ...
    // 创建 EtcdServer 实例
    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }
    ...
    // 启动实例
    e.Server.Start()

    // 监听 raft 节点相关 url
    if err = e.servePeers(); err != nil {
        return e, err
    }
    // 监听客户端请求 url
    if err = e.serveClients(); err != nil {
        return e, err
    }
    ...
}

servePeers() 主要做的就是注册 raft 节点相关的 http handler,开启 http 服务提供给其它 raft 节点调用(节点之间使用 http 通信),然后对之前创建的 listener 调用 Accept() 接收连接并处理。

// file: server/embed/etcd.go
func (e *Etcd) servePeers() (err error) {
    ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
    // 创建 listener 时为 e.Peers 赋值
    for _, p := range e.Peers {
        u := p.Listener.Addr().String()
        m := cmux.New(p.Listener)
        srv := &http.Server{
            Handler:     ph,
            ReadTimeout: 5 * time.Minute,
            ErrorLog:    defaultLog.New(io.Discard, "", 0), // do not log user error
        }
        go srv.Serve(m.Match(cmux.Any()))
        // 定义 p.serve 方法
        p.serve = func() error {
            e.cfg.logger.Info(
                "cmux::serve",
                zap.String("address", u),
            )
            // 这里面调用 Accept()
            return m.Serve()
        }
    ...
    }
  
    ...
  
    for _, pl := range e.Peers {
        // 开启新的 goroutine 调用 serve
        go func(l *peerListener) {
            u := l.Addr().String()
            e.cfg.logger.Info(
                "serving peer traffic",
                zap.String("address", u),
            )
            e.errHandler(l.serve())
        }(pl)
    }
}

// file: server/etcdserver/api/etcdhttp/peer.go
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
    return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(...) http.Handler {
    ...
    mux := http.NewServeMux()
    mux.HandleFunc("/", http.NotFound)
    // rafthttp.RaftPrefix = "raft"
    mux.Handle(rafthttp.RaftPrefix, raftHandler) 
    mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
    ...
    return mux
}

serveClients() 则是向客户端请求提供了 http 接口和 grpc 服务。

// file: server/embed/etcd.go
func (e *Etcd) serveClients() (err error) {
    ...
    // 创建 listener 时为 e.sctxs 赋值
    for _, sctx := range e.sctxs {
        go func(s *serveCtx) {
            e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, gopts...))
        }(sctx)
    }
}

// file: server/embed/serve.go
func (sctx *serveCtx) serve(
    s *etcdserver.EtcdServer,
    tlsinfo *transport.TLSInfo,
    handler http.Handler,
    errHandler func(error),
    gopts ...grpc.ServerOption) (err error) {
    ...
  
    // 注册 rpc
    gs = v3rpc.Server(s, nil, nil, gopts...)
    ...
    go func() { errHandler(gs.Serve(grpcl)) }()

    // http 接口相关
    ...
    srvhttp := &http.Server{
        Handler:  createAccessController(sctx.lg, s, httpmux),
        ErrorLog: logger, // do not log user error
    }
    go func() { errHandler(srvhttp.Serve(httpl)) }()
    ...
}

到这里,我们可以回收 etcdctl 结尾的部分,那里我们最终创建了一个 KVClient,就是在 v3rpc.Server() 中注册了对应的 KVServer ,对于我们的 Put 示例操作来说,就是调用了 pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) 注册,也就是 EtcdServer 实例本身实现了 KVServer 接口。

// file: server/etcdserver/api/v3rpc/grpc.go
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
    ...
    grpcServer := grpc.NewServer(append(opts, gopts...)...)

    pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
    pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
    pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
    ...
}

迎接请求到来

经过前面一番折腾,我们知道了 EtcdServer 实例本身实现了 KVServer 的 rpc 接口。我们在客户端调用 Put 方法时,就会进入到 EtcdServer 的 Put rpc,下面是客户端调用 Put 的 rpc 方法。

// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
    var err error
    switch op.t {
    ...
    case tPut:
        var resp *pb.PutResponse
        // 创建请求 Message
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        // 调用 KVClient.Put() 方法
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
    ...
    }
    return OpResponse{}, toErr(ctx, err)
}

下面是 EtcdServer 实际接收 Put 请求的方法。

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

Put() 方法调用了 s.raftRequest() -> s.raftRequestOnce() -> s.processInternalRaftRequestOnce(),从方法名可以看出来,Put 操作就是一个 raft 请求,它最终会同步到各个 raft 节点中去。

// file: server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ...
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    ...
    return resp.(*pb.PutResponse), nil
}

func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
    return s.raftRequestOnce(ctx, r)
}

func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
    result, err := s.processInternalRaftRequestOnce(ctx, r)
    ...
    return result.Resp, nil
}

processInternalRaftRequestOnce() 方法为请求生成一个 ID 并序列化请求体,向 s.w 中注册一个请求,然后调用 raft 节点的 Propose() 方法将请求发送给其它 raft 节点并等待返回结果。我们在处理请求结果时会再提到 s.w.Register(id) 的作用。

// file: server/etcdserver/v3_server.go
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
    ...
    r.Header = &pb.RequestHeader{
            ID: s.reqIDGen.Next(),
    }
  
    data, err := r.Marshal()
  
    // 向 EtcdServer 下的 wait.Wait 中注册一个请求
    id := r.ID
    ch := s.w.Register(id)

    ...
    err = s.r.Propose(cctx, data)

    select {
    case x := <-ch:
        return x.(*apply2.Result), nil
    ...
    }
}

Propose() 是 etcd raft 库中的节点方法,它首先会将请求发送给当前 raft 集群中的 Leader 节点,由 Leader 节点统一进行处理,确保整个集群中的数据的一致性。Leader 节点收到请求后,会在本地执行请求,然后再调用 Propose() 将请求执行结果发送给所有的Follower 节点,等待它们的返回结果,确保整个集群中的数据都得到了更新。最后,Leader 节点将所有 Follower 节点的返回结果进行汇总,生成最终的执行结果(在这里是 apply2.Result),并将结果返回给 Propose 方法的调用方。

处理请求数据

回顾下在前面服务启动执行 EtcdServerrun() 方法时,会执行 s.r.start(),其中调用了 <-r.Ready() 等待 raft 节点准备就绪,当我们在 Put 操作调用 Propose() 之后,<-r.Ready() 会返回,然后使用 toApply 包装一层写入到 r.applyc 中,由 run() 方法中的 <-s.r.apply() 读取出来,最后交由调度器执行 s.applyAll() 方法。

// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
    s.r.start(rh)
    for {
        select {
        case ap := <-s.r.apply():
            // ap 即 toApply 结构
            f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
            sched.Schedule(f)
        }
    }
}

// file: server/etcdserver/raft.go
func (r *raftNode) start(rh *raftReadyHandler) {
    go func() {
    for {
        select {
        case rd := <-r.Ready():
            ap := toApply{...}
            select {
            case r.applyc <- ap:
            }
        }
    }()
}

raft 节点提交的请求会以 log entry 的形式发送到其它节点,在 s.applyAll() 中对这些 log entry 进行了处理,

// file: server/etcdserver/server.go
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
    ...
    s.applyEntries(ep, apply)
    ...
}

func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
    ...
    if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
        ...
    }
}

一个 Put 请求的 log entry 在 etcd 中属于 EntryNormal 类型,s.apply() 接着进入 s.applyEntryNormal() 流程中处理

// file: server/etcdserver/server.go
func (s *EtcdServer) apply(
    es []raftpb.Entry,
    confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
    ...
    switch e.Type {
    case raftpb.EntryNormal:
        s.applyEntryNormal(&e)
    ...
    }
    ...
}

applyEntryNormal() 中解析了 Propose() 传过来的请求体,然后调用 s.uberApply.Apply() 处理请求。

// file: server/etcdserver/server.go
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    var ar *apply.Result
    ...
    var raftReq pb.InternalRaftRequest
    // 解析 Propose() 传过来的请求体
    if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
        ...
    }

    id := raftReq.ID
    ...
    needResult := s.w.IsRegistered(id)
    if needResult || !noSideEffect(&raftReq) {
        ...
        // 处理请求
        ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
    }

    ...
    if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
        s.w.Trigger(id, ar)
        return
    }
}

还记得我们在调用 Propose() 之前,向 s.w 中注册了一个请求,s.w.Register(id) 返回一个 channel 并从这个 channel 中读取数据。在 applyEntryNormal() 处理请求时,会先调用 s.w.IsRegistered(id) 根据传入的请求 ID 判断是否注册了该请求,如果返回 true 则说明可能有一个地方正在等待这个请求的返回(如下面的 case x := <-ch),处理完请求后,会调用 s.w.Trigger(id, ar) 将请求结果写入注册时返回的 channel 并关闭它,这时 case x := <-ch 可以读出数据并将请求结果返回。

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
    ...
    // 向 EtcdServer 下的 wait.Wait 中注册一个请求
    ch := s.w.Register(id)
    err = s.r.Propose(cctx, data)
    select {
    // 等待请求返回
    case x := <-ch:
        return x.(*apply2.Result), nil
    ...
    }
}

然后我们进入 s.uberApply.Apply() 看看是如何处理请求的。它调用了 UberApplier 接口实现下的 Apply() 方法。

// file: server/etcdserver/apply/uber_applier.go
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
    // We first execute chain of Apply() calls down the hierarchy:
    // (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
    // then dispatch() unpacks the request to a specific method (like Put),
    // that gets executed down the hierarchy again:
    // i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
    return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)
}

由于 applyV3.Apply() 的执行链比较长,这里就不一一细数了,根据 Debug 与官方注释的指引,在 uberApply.Apply() 中最终会执行到 a.dispatch,它根据我们的 Put 操作类型,又执行到 a.applyV3.Put()

// file: server/etcdserver/apply/uber_applier.go
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
    ...
    switch {
    ...
    case r.Put != nil:
        op = "Put"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, nil, r.Put)
    }
    ...
}

a.applyV3.Put() 最终会执行到 applierV3backend 结构体的 Put() 方法。弯弯绕绕一大圈,终于进入正式的数据处理环节了~从包名似乎可以大致窥见一些端倪,没错,就是多版本并发控制(mvcc)和事务(txn)。在创建 EtcdServer 实例的时候,创建了一个 MVCC 的 KV 存储 srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig),这里的 a.kv 就是 EtcdServer 下的 MVCC 存储。它提供了多版本控制、事务等功能。

// file: server/etcdserver/apply/apply.go
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
    return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, txn, p)
}

etcd 的数据使用一个 Revision 作为数据的版本号, mvcctxn.Put() 创建了一个写事务写入数据并返回结果(Revision、数据历史记录等)。

// file: server/etcdserver/txn/txn.go
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
    ...
    if txnWrite == nil {
        ...
        // 创建写事务
        txnWrite = kv.Write(trace)
        defer txnWrite.End()
    }

    ...
    resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
}

我们目前只关注 Put 操作是怎样被存储的,后面会专门写一篇事务的源码分析。这里我们首先提一下,etcd 使用 BoltDB 存储数据,它是一个嵌入式的键值存储数据库,支持 ACID 事务,BoltDB 中的 bucket 是一种用于组织和存储键值对的容器,每个 bucket 都可以包含任意数量的键值对,提交事务时,bucket 中的所有更改都会被写入磁盘。

txnWrite.Put() 最终会执行到 storeTxnWrite 结构体的 put() 方法,其会先将数据写到 bucket 中,然后调用 tw.s.kvindex.Put() 存储一些数据在内存里。这里留一个小疑问,后面解答:为什么最新的 Revision 需要加 1?对于初次赋值的 key 来说,Revision 值会是 2。

// file: server/storage/mvcc/kvstore_txn.go
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
    // 最新的 Revision + 1
    rev := tw.beginRev + 1
    c := rev
    ...
  
    // 创建一个 Revision 实例,changes 表示当前事务对这个 Revision 的修改次数
    idxRev := revision{main: rev, sub: int64(len(tw.changes))}
    ...
    // 存入 bucket
    tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
    // 数据存入内存 store
    tw.s.kvindex.Put(key, idxRev)
    tw.changes = append(tw.changes, kv)
    ...
}

mvcc 的 store (tw.s.kvindex )需要实现一个 index 接口,作为 mvcc 的存储引擎,treeIndex 结构体实现了这个接口,事务中调用的就是它的 Put() 方法。可以看到 treeIndex 是一个 B 树,这里使用了 Goole 实现的库 github.com/google/btre… 实际上这是一个 B+ 树,也就是说 etcd 在内存中是以 B+ 树结构存储数据的。

// file: server/storage/mvcc/index.go
type index interface {
    Put(key []byte, rev revision)
    ...
}

type treeIndex struct {
    sync.RWMutex
    tree *btree.BTreeG[*keyIndex]
    lg   *zap.Logger
}

func (ti *treeIndex) Put(key []byte, rev revision) {...}

我们来看看 Put 方法实现,其创建了一个 keyIndex 结构作为 B+ 树的索引,根据这个索引从树中 Get 数据,没有数据的话,会先将 revision 实例加入到 keyIndex 结构中,然后调用 B+ 树的 ReplaceOrInsert() 方法将数据插入;如果已经存在数据,则只更新 keyIndex 的信息。而最新的 Revision 号之所以加 1,是因为 B+ 的根节点通常是一个空节点,而 Revision 又作为了索引的一部分。

在 B+ 树中,只存储了数据的 key 和 revision 等元数据,这是为了提高内存使用效率和存储性能以及更小的内存占用,实际的值,是存在事务的 bucket 中,使用 BoltDB 作为存储引擎,而 B+ 树则是用来索引 BoltDB 数据库中的键值对的数据结构。当客户端请求读取一个键值对时,首先会在 B+ 树中查找该键的位置,然后从 BoltDB 中读取对应的值。数据存入事务 bucket 中后,不会立马被提交,而是后台通过定时任务每个一段时间提交一次事务,即数据最终落盘。后期考虑专门开一篇 etcd 的存储讲。

func (ti *treeIndex) Put(key []byte, rev revision) {
    keyi := &keyIndex{key: key}

    ti.Lock()
    defer ti.Unlock()
    okeyi, ok := ti.tree.Get(keyi)
    if !ok {
        keyi.put(ti.lg, rev.main, rev.sub)
        // 插入树中
        ti.tree.ReplaceOrInsert(keyi)
        return
    }
    okeyi.put(ti.lg, rev.main, rev.sub)
}

到这里,Put 操作的基本流程已经完成,请求结果会返回到 applyEntryNormal 函数中,拿到请求结果 ar 之后,会调用 s.w.Trigger(id, ar) 将请求结果推送到 Propose() 之前注册请求的 channel,然后关闭它,等待在这个请求上的节点返回。

// file: server/etcdserver/server.go
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    ...
    needResult := s.w.IsRegistered(id)
    if needResult || !noSideEffect(&raftReq) {
        ...
        // 处理请求
        ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
    }

    ...
    if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
        s.w.Trigger(id, ar)
        return
    }
}

// file: server/etcdserver/v3_server.go
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
    ...
    // 向 EtcdServer 下的 wait.Wait 中注册一个请求
    ch := s.w.Register(id)
    err = s.r.Propose(cctx, data)
    select {
    // 等待请求返回
    case x := <-ch:
        return x.(*apply2.Result), nil
    ...
    }
}

最后 processInternalRaftRequestOnce() 将结果返回到 raftRequestOnce() -> raftRequest() -> Put()(EtcdServer),将 RPC 请求结果返回给客户端。

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

总结

在这篇文章中,讲述了一个最简单的 Put 请求最基本的处理流程,这个流程同时也是 etcd 其它操作的基础。在 etcdctl 的处理中,会把用户的命令转成客户端的方法,随后客户端向 etcd 服务端发送一个 gRPC 请求。服务端实现了对应的 gRPC 服务提供给客户端,服务的启动包含了 raft 节点通信和客户端通信的 http 服务,服务本身即为一个 raft 节点,启动时等待其它节点发来的同步消息。

服务端接收到客户端的 Put 请求后会进行以下操作:

  • 将键值对以 log entry 的形式发送给 raft 集群中其它节点(包括自身),等待节点处理。
  • 节点收到数据,开始调用 mvcc 存储来处理数据。
  • mvcc 存储将键值与 revision 等元数据作为索引存入 B+ 树,把实际的值写入到 BoltDB 事务的 bucket 中。
  • 后台会定期将 BoltDB 的事务提交,数据持久化会到磁盘中。读取数据时会先根据从 B+ 树找出相关的元数据,再从磁盘中读取实际的值。

还有许多细节文中并没有展开地说,比如具体的 BoltDB 存储,但相信有这些基础的核心流程,需要研究哪一部分的细节时可以事半功倍。