学习 etcd 存储的第一步,从 Etcd 框架和 Put 操作源码说起(详细版)
etcd 是一个分布式、高可用的键值存储系统,它被设计为可靠的、安全的、快速的,并具有简单的API。
etcd 使用 Go 语言开发,基于 Raft 算法实现了分布式一致性。它可以用于存储集群中的关键配置信息、服务发现、锁等。
etcd 的数据模型类似于一个简单的文件系统,支持 PUT、GET、DELETE 等操作,每个节点的数据会自动同步到其他节点上,因此可以实现高可用、自动故障转移等功能。etcd 还支持 Watch 机制,可以监控数据变化并触发相应的操作。
PS:本文前大半部分都是在讲一些 etcdctl、etcd server、raft 节点间数据流转与处理相关的内容,如不感兴趣,可直接跳转到 迎接请求到来 部分阅读 Put 操作流程。
下面是一个最简单的 etcd put 操作:
# 启动 etcd server
$ etcd
$ etcdctl put gretting "hello"
$ etcdctl get gretting
gretting
hello
当我们发送执行一个 put 操作时,etcdctl 内部会创建一个 etcd 的客户端,向 etcd 服务端发送相应的 grpc 请求来完成操作。

etcd 的源码位于 github.com/etcd-io/etc… 接下来以 etcdctl 为切入点开始源码解读。为了保持简单,我们只关心示例操作的主要流程,比如这篇里我们只关注与 Put 操作相关的流程。
etcdctl
etcdctl
用于向 etcd server 发送请求,下面是 etcdctl 工具的入口方法。
// file: etcdctl/main.go
func main() {
ctlv3.MustStart()
return
}
入口函数内调用了 ctlv3.MustStart()
,在 ctlv3
包下,初始化了命令行参数及命令处理函数,MustStart()
方法最终会调用到命令行工具的 Execute()
方法执行命令。
// file: etcdctl/ctlv3/ctl.go
...
var (
rootCmd = &cobra.Command{
Use: cliName,
Short: cliDescription,
SuggestFor: []string{"etcdctl"},
}
)
func init() {
// 初始化并解析命令行参数
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")
...
}
// 注册各类操作方法
rootCmd.AddCommand(
command.NewGetCommand(),
command.NewPutCommand(),
command.NewDelCommand(),
...
)
func Start() error {
...
// 执行命令
return rootCmd.Execute()
}
func MustStart() {
if err := Start(); err != nil {
...
}
}
可以看到,etcd 默认的 endpoints
为 127.0.0.1:2379
,也就是说我们最开始启动的 etcd 服务默认就监听在这个地址。
由于我们是一次 PUT 操作,在 command.NewPutCommand()
中注册了 PUT 命令,命令的处理函数是 putCommandFunc
,该函数中真正执行了 PUT 操作。
// file: etcdctl/ctlv3/command/put_command.go
func NewPutCommand() *cobra.Command {
cmd := &cobra.Command{
...
Run: putCommandFunc,
}
...
return cmd
}
func putCommandFunc(cmd *cobra.Command, args []string) {
...
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
...
}
mustClientFromCmd()
实现了将命令行操作转换为 etcd 客户端。在 golang 中,我们通常使用 clientv3.Client
来作为 etcd 客户端。
clientv3.New(clientv3.Config{
Endpoints: conf.AppConf.Etcd.Hosts,
DialTimeout: 5 * time.Second,
...
})
在 mustClientFromCmd()
方法中会先调用 clientConfigFromCmd()
根据命令行参数生成必要的配置信息,然后调用 mustClient
创建客户端。
// file: etcdctl/ctlv3/command/global.go
func mustClientFromCmd(cmd *cobra.Command) *clientv3.Client {
cfg := clientConfigFromCmd(cmd)
return mustClient(cfg)
}
func clientConfigFromCmd(cmd *cobra.Command) *clientv3.ConfigSpec {
...
// 对应 clientv3 的 Endpoints 和 DialTimeout 属性
cfg := &clientv3.ConfigSpec{}
cfg.Endpoints, err = endpointsFromCmd(cmd)
cfg.DialTimeout = dialTimeoutFromCmd(cmd)
...
}
func mustClient(cc *clientv3.ConfigSpec) *clientv3.Client {
...
// 创建客户端
client, err := clientv3.New(*cfg)
...
return client
}
可以看到,etcdctl
工具实际上就是使用 clientv3.New
在内部创建了一个客户端来进行各种操作。我们先来大概了解一下创建客户端时做了些什么。
在 clientv3.Client
结构体中,组合了一些接口,这些接口负责执行各种 etcd 的操作,可以允许我们直接使用 Client
的实例来进行如 Put
、Grant
、Watch
等操作。
// file: client/v3/client.go
type Client struct {
KV
Lease
Watcher
...
}
// file: client/v3/kv.go
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
...
}
// file: client/v3/lease.go
type Lease interface {
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
...
}
在调用 clientv3.New()
时,会与 Endpoints
所在 etcd 服务建立连接,接着创建这些接口实现的实例。
// file: client/v3/client.go
func New(cfg Config) (*Client, error) {
...
return newClient(&cfg)
}
func newClient(cfg *Config) (*Client, error) {
...
// 建立连接
conn, err := client.dialWithBalancer()
...
// 创建接口实例
client.KV = NewKV(client)
client.Lease = NewLease(client)
...
}
dialWithBalancer()
方法向 etcd 服务发起了一个 grpc 连接请求,到这里我们知道,etcd 的客户端和服务端使用 grpc 进行通信。注意,客户端选择了 endpoints[0]
的地址发送连接请求,这是因为 Endpoints 列表中的第一个地址通常是 raft 协议的 leader 节点,Raft 协议规定只有 leader 节点才能处理写请求,因此客户端应该优先连接 leader 节点,这里不再细述。
// file: client/v3/client.go
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
return c.dial(creds, opts...)
}
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
...
}
再来看创建接口实例部分,由于我们进行的是最普通的 Put 操作,由 KV
接口定义,这里主要看 NewKV()
方法。方法逻辑非常简单,创建了一个 kv
结构体,该结构体实现了 KV
接口定义的所有方法。
// file: client/v3/kv.go
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
...
return api
}
RetryKVClient()
创建了一个 grpc 客户端,由于是 grpc,我们知道,存在 KVClient
就会存在一个对应的 KVServer
,这个我们放到 etcd 服务启动的时候再来回顾。
// file: client/v3/retry.go
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{
kc: pb.NewKVClient(c.conn),
}
}
// file: api/etcdserverpb/rpc.pb.go
func NewKVClient(cc *grpc.ClientConn) KVClient {
return &kVClient{cc}
}
Put
处理完创建完客户端的逻辑之后,紧接着接着调用了 Put()
方法,开始发送 Put 请求。
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
从前面我们知道,Client
调用 Put 方法会执行到 kv
结构体内的 Put()
。Put 方法内调用了 kv.Do()
,传入了一个 Put Op
类型。kv.Do()
方法逻辑很简单,根据对应的 Op
类型向 etcd 发送一个 grpc 请求。在前面 NewKv(client)
时我们知道,kv.remote
就是 grpc 的客户端 KVClient
,所以这里就是直接发送了一个 grpc put 请求。
// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
...
case tPut:
var resp *pb.PutResponse
// 创建请求 Message
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
// 调用 KVClient.Put() 方法
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
...
}
return OpResponse{}, toErr(ctx, err)
}
目前为止,我们了解了使用 etcdctl 的一次 put 请求的核心流程,接下来看看 etcd 服务端是如何处理一个 Put 请求的。

etcd 服务启动
etcd server 的入口方法如下:
// file: server/main.go
func main() {
etcdmain.Main(os.Args)
}
由于我们没有给 etcd 传入任何参数,直接进入 startEtcdOrProxyV2()
流程。
// file: server/etcdmain/main.go
func Main(args []string) {
...
startEtcdOrProxyV2(args)
}
先关注最简单的 etcd
命令启动,在 startEtcdOrProxyV2()
中,主要流程就是初始化一些配置、日志,检查部分参数然后启动 etcd,最后阻塞等待 etcd 抛出错误或停止然后退出。
// file: server/etcdmain/etcd.go
func startEtcdOrProxyV2(args []string) {
...
// 配置初始化
cfg := newConfig()
...
// 日志初始化
lg := cfg.ec.GetLogger()
if lg == nil {
...
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
}
...
// 启动
stopped, errc, err = startEtcd(&cfg.ec)
...
// 阻塞进程
select {
case lerr := <-errc:
lg.Fatal("listener failed", zap.Error(lerr))
case <-stopped:
}
osutil.Exit(0)
}
在 newConfig()
中,初始化了 etcd 启动时需要的一些参数,如当前 raft 节点的监听的 url、监听客户端请求的 url、数据目录等。在前面 etcdctl 命令初始化的时候,我们也看到了默认会请求到 http://localhost:2379 地址,就是在这里初始化的。
// file: server/etcdmain/config.go
func newConfig() *config {
cfg := &config{
ec: *embed.NewConfig(),
...
}
fs.StringVar(&cfg.ec.Dir, "data-dir", cfg.ec.Dir, "Path to the data directory.")
...
}
// file: server/embed/config.go
const (
DefaultListenPeerURLs = "http://localhost:2380" // 节点之间通信的地址
DefaultListenClientURLs = "http://localhost:2379" // 客户端请求的地址
)
func NewConfig() *Config {
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
...
cfg := &Config{
...
ListenPeerUrls: []url.URL{*lpurl},
ListenClientUrls: []url.URL{*lcurl},
...
}
}
完成必要的初始化之后,调用 startEtcd()
启动 etcd 服务,其又调用了 embed.StartEtcd()
开始启动 etcd server 以及 http handler 等,然后等待一些 raft 节点相关的通知。
// file: server/etcdmain/etcd.go
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
// 启动服务
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
}
// 订阅一些节点加入成功、停止之类的通知
osutil.RegisterInterruptHandler(e.Close)
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
}
return e.Server.StopNotify(), e.Err(), nil
}
StartEtcd()
根据配置启动一些相关的 listener ,用于节点间通信、客户端与服务端通信等,创建了 EtcdServer
实例并启动。
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// 验证配置
if err = inCfg.Validate(); err != nil {
return nil, err
}
...
// 创建 raft 节点相关 Listener
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err
}
...
// 创建客户端请求相关 Listener
if e.sctxs, err = configureClientListeners(cfg); err != nil {
return e, err
}
...
// 创建 EtcdServer 实例
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
...
// 启动实例
e.Server.Start()
// 监听 raft 节点相关 url
if err = e.servePeers(); err != nil {
return e, err
}
// 监听客户端请求 url
if err = e.serveClients(); err != nil {
return e, err
}
...
}
configurePeerListeners()
和 configureClientListeners()
的主要逻辑就是给 PeerUrls
和 ClientUrls
里的地址创建 listener,它们都使用了 transport
包提供的方法创建,并携带了一些 TLS、SocketOpts、超时时间等信息。
// file: server/embed/etcd.go
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
...
for i, u := range cfg.ListenPeerUrls {
...
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
if err != nil {
return nil, err
}
...
}
...
}
func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
...
for _, u := range cfg.ListenClientUrls {
...
if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithSkipTLSInfoCheck(true),
); err != nil {
return nil, err
}
...
}
...
}
transport.NewListenerWithOpts()
用于创建 listener 并可以携带多个的 option,接着又继续调用到 newListener()
-> newKeepAliveListener()
,在这里看到了我们熟悉的 net.Listen
,最后通过 NewKeepAliveListener()
封装一层后返回。
// file: client/pkg/transport/listener.go
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
return newListener(addr, scheme, opts...)
}
func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
...
// 逐个调用传入的 Option 方法
lnOpts := newListenOpts(opts...)
...
ln, err := newKeepAliveListener(...)
...
}
func newKeepAliveListener(cfg *net.ListenConfig, addr string) (ln net.Listener, err error) {
if cfg != nil {
ln, err = cfg.Listen(context.TODO(), "tcp", addr)
} else {
ln, err = net.Listen("tcp", addr)
}
if err != nil {
return
}
return NewKeepAliveListener(ln, "tcp", nil)
}
// file: client/pkg/transport/keepalive_listener.go
func NewKeepAliveListener(l net.Listener, scheme string, tlscfg *tls.Config) (net.Listener, error) {
kal := &keepaliveListener{
Listener: l,
}
...
return kal, nil
}
创建完两个相关的 listener 后,开始创建并启动 EtcdServer
实例。
// 创建 EtcdServer 实例
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
// 启动实例
e.Server.Start()
etcdserver.NewServer()
中做了许多的处理,我们这里只针对示例看一些最关键的地方。注意,在创建实例时调用了 b.raft.newRaftNode()
创建了一个 raft 节点,这里后面还会提到。
// file: server/etcdserver/server.go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// 创建数据目录、KV store 等
b, err := bootstrap(cfg)
srv = &EtcdServer{
...
// 创建 raft 节点
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
...
}
// 创建处理具体操作的实例
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
...
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
...
srv.uberApply = srv.NewUberApplier()
return srv, nil
}
创建完 EtcdServer
实例后,调用了 e.Server.Start()
方法启动实例,接着执行到 s.start()
方法,在这里为 EtcdServer
的一些属性进行了分配,最终调用 s.run()
方法,s.run()
会使用 for select
结构将当前 goroutine 阻塞,并读取各种 channel 的数据进行处理。
// file: server/etcdserver/server.go
func (s *EtcdServer) Start() {
s.start()
...
}
func (s *EtcdServer) start() {
...
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.ctx, s.cancel = context.WithCancel(context.Background())
...
go s.run()
}
func (s *EtcdServer) run() {
...
s.r.start(rh)
...
for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
}
...
}
}
对于我们的示例操作,只需要关心 s.r.start()
和 s.r.apply()
,前面创建实例的时候我们知道,s.r
是一个 raft 的节点。etcd 使用了github.com/etcd-io/raf… 来作为 etcd 的 raft 协议实现。 在 r.start()
中,调用了 <-r.Ready()
,当 raft 节点收到了其他节点发来的数据,如日志条目(Log Entry)、心跳包等,channel 返回一个值表示已准备就绪,然后后会向 r.applyc
channel 中写入一条数据。
// file: server/etcdserver/raft.go
func (r *raftNode) start(rh *raftReadyHandler) {
...
go func() {
for {
select {
...
case rd := <-r.Ready():
...
ap := toApply{...}
select {
case r.applyc <- ap:
}
...
}()
}
raft 节点就绪写入数据到 r.applyc
后,在外面的 s.run()
方法的 select
中会从 <-s.r.apply()
中读出这条数据,并把这条数据交给调度器处理。
// file: server/etcdserver/raft.go
func (r *raftNode) apply() chan toApply {
return r.applyc
}
到这里,etcd 服务就已经作为一个 raft 节点启动起来了。等到开始处理 Put 操作的数据时,我们再继续往下看数据是如何被处理的。现在回到 StartEtcd()
中,看看启动实例之后的处理。
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
...
// 创建 EtcdServer 实例
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
...
// 启动实例
e.Server.Start()
// 监听 raft 节点相关 url
if err = e.servePeers(); err != nil {
return e, err
}
// 监听客户端请求 url
if err = e.serveClients(); err != nil {
return e, err
}
...
}
servePeers()
主要做的就是注册 raft 节点相关的 http handler,开启 http 服务提供给其它 raft 节点调用(节点之间使用 http 通信),然后对之前创建的 listener 调用 Accept()
接收连接并处理。
// file: server/embed/etcd.go
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
// 创建 listener 时为 e.Peers 赋值
for _, p := range e.Peers {
u := p.Listener.Addr().String()
m := cmux.New(p.Listener)
srv := &http.Server{
Handler: ph,
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(io.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
// 定义 p.serve 方法
p.serve = func() error {
e.cfg.logger.Info(
"cmux::serve",
zap.String("address", u),
)
// 这里面调用 Accept()
return m.Serve()
}
...
}
...
for _, pl := range e.Peers {
// 开启新的 goroutine 调用 serve
go func(l *peerListener) {
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
}
}
// file: server/etcdserver/api/etcdhttp/peer.go
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(...) http.Handler {
...
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
// rafthttp.RaftPrefix = "raft"
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
...
return mux
}
serveClients()
则是向客户端请求提供了 http 接口和 grpc 服务。
// file: server/embed/etcd.go
func (e *Etcd) serveClients() (err error) {
...
// 创建 listener 时为 e.sctxs 赋值
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, gopts...))
}(sctx)
}
}
// file: server/embed/serve.go
func (sctx *serveCtx) serve(
s *etcdserver.EtcdServer,
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
gopts ...grpc.ServerOption) (err error) {
...
// 注册 rpc
gs = v3rpc.Server(s, nil, nil, gopts...)
...
go func() { errHandler(gs.Serve(grpcl)) }()
// http 接口相关
...
srvhttp := &http.Server{
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
go func() { errHandler(srvhttp.Serve(httpl)) }()
...
}
到这里,我们可以回收 etcdctl 结尾的部分,那里我们最终创建了一个 KVClient
,就是在 v3rpc.Server()
中注册了对应的 KVServer
,对于我们的 Put 示例操作来说,就是调用了 pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
注册,也就是 EtcdServer
实例本身实现了 KVServer
接口。
// file: server/etcdserver/api/v3rpc/grpc.go
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
...
grpcServer := grpc.NewServer(append(opts, gopts...)...)
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
...
}
迎接请求到来
经过前面一番折腾,我们知道了 EtcdServer
实例本身实现了 KVServer
的 rpc 接口。我们在客户端调用 Put 方法时,就会进入到 EtcdServer
的 Put rpc,下面是客户端调用 Put 的 rpc 方法。
// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
...
case tPut:
var resp *pb.PutResponse
// 创建请求 Message
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
// 调用 KVClient.Put() 方法
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
...
}
return OpResponse{}, toErr(ctx, err)
}
下面是 EtcdServer
实际接收 Put 请求的方法。
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}
Put()
方法调用了 s.raftRequest()
-> s.raftRequestOnce()
-> s.processInternalRaftRequestOnce()
,从方法名可以看出来,Put 操作就是一个 raft 请求,它最终会同步到各个 raft 节点中去。
// file: server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
...
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
...
return resp.(*pb.PutResponse), nil
}
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
result, err := s.processInternalRaftRequestOnce(ctx, r)
...
return result.Resp, nil
}
processInternalRaftRequestOnce()
方法为请求生成一个 ID 并序列化请求体,向 s.w
中注册一个请求,然后调用 raft 节点的 Propose()
方法将请求发送给其它 raft 节点并等待返回结果。我们在处理请求结果时会再提到 s.w.Register(id)
的作用。
// file: server/etcdserver/v3_server.go
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
...
r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
}
data, err := r.Marshal()
// 向 EtcdServer 下的 wait.Wait 中注册一个请求
id := r.ID
ch := s.w.Register(id)
...
err = s.r.Propose(cctx, data)
select {
case x := <-ch:
return x.(*apply2.Result), nil
...
}
}
Propose()
是 etcd raft 库中的节点方法,它首先会将请求发送给当前 raft 集群中的 Leader 节点,由 Leader 节点统一进行处理,确保整个集群中的数据的一致性。Leader 节点收到请求后,会在本地执行请求,然后再调用 Propose()
将请求执行结果发送给所有的Follower 节点,等待它们的返回结果,确保整个集群中的数据都得到了更新。最后,Leader 节点将所有 Follower 节点的返回结果进行汇总,生成最终的执行结果(在这里是 apply2.Result
),并将结果返回给 Propose 方法的调用方。
处理请求数据
回顾下在前面服务启动执行 EtcdServer
的 run()
方法时,会执行 s.r.start()
,其中调用了 <-r.Ready()
等待 raft 节点准备就绪,当我们在 Put 操作调用 Propose()
之后,<-r.Ready()
会返回,然后使用 toApply
包装一层写入到 r.applyc
中,由 run()
方法中的 <-s.r.apply()
读取出来,最后交由调度器执行 s.applyAll()
方法。
// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
s.r.start(rh)
for {
select {
case ap := <-s.r.apply():
// ap 即 toApply 结构
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
}
}
}
// file: server/etcdserver/raft.go
func (r *raftNode) start(rh *raftReadyHandler) {
go func() {
for {
select {
case rd := <-r.Ready():
ap := toApply{...}
select {
case r.applyc <- ap:
}
}
}()
}
raft 节点提交的请求会以 log entry 的形式发送到其它节点,在 s.applyAll()
中对这些 log entry 进行了处理,
// file: server/etcdserver/server.go
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
...
s.applyEntries(ep, apply)
...
}
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
...
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
...
}
}
一个 Put 请求的 log entry 在 etcd 中属于 EntryNormal
类型,s.apply()
接着进入 s.applyEntryNormal()
流程中处理
// file: server/etcdserver/server.go
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
...
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
...
}
...
}
applyEntryNormal()
中解析了 Propose()
传过来的请求体,然后调用 s.uberApply.Apply()
处理请求。
// file: server/etcdserver/server.go
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
var ar *apply.Result
...
var raftReq pb.InternalRaftRequest
// 解析 Propose() 传过来的请求体
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
...
}
id := raftReq.ID
...
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
...
// 处理请求
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
}
...
if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
s.w.Trigger(id, ar)
return
}
}
还记得我们在调用 Propose()
之前,向 s.w
中注册了一个请求,s.w.Register(id)
返回一个 channel 并从这个 channel 中读取数据。在 applyEntryNormal()
处理请求时,会先调用 s.w.IsRegistered(id)
根据传入的请求 ID 判断是否注册了该请求,如果返回 true
则说明可能有一个地方正在等待这个请求的返回(如下面的 case x := <-ch
),处理完请求后,会调用 s.w.Trigger(id, ar)
将请求结果写入注册时返回的 channel 并关闭它,这时 case x := <-ch
可以读出数据并将请求结果返回。
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
...
// 向 EtcdServer 下的 wait.Wait 中注册一个请求
ch := s.w.Register(id)
err = s.r.Propose(cctx, data)
select {
// 等待请求返回
case x := <-ch:
return x.(*apply2.Result), nil
...
}
}
然后我们进入 s.uberApply.Apply()
看看是如何处理请求的。它调用了 UberApplier
接口实现下的 Apply()
方法。
// file: server/etcdserver/apply/uber_applier.go
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
// We first execute chain of Apply() calls down the hierarchy:
// (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
// then dispatch() unpacks the request to a specific method (like Put),
// that gets executed down the hierarchy again:
// i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)
}
由于 applyV3.Apply()
的执行链比较长,这里就不一一细数了,根据 Debug 与官方注释的指引,在 uberApply.Apply()
中最终会执行到 a.dispatch
,它根据我们的 Put 操作类型,又执行到 a.applyV3.Put()
。
// file: server/etcdserver/apply/uber_applier.go
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
...
switch {
...
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, nil, r.Put)
}
...
}
a.applyV3.Put()
最终会执行到 applierV3backend
结构体的 Put()
方法。弯弯绕绕一大圈,终于进入正式的数据处理环节了~从包名似乎可以大致窥见一些端倪,没错,就是多版本并发控制(mvcc)和事务(txn)。在创建 EtcdServer
实例的时候,创建了一个 MVCC 的 KV 存储 srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
,这里的 a.kv
就是 EtcdServer
下的 MVCC 存储。它提供了多版本控制、事务等功能。
// file: server/etcdserver/apply/apply.go
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, txn, p)
}
etcd 的数据使用一个 Revision 作为数据的版本号, mvcctxn.Put()
创建了一个写事务写入数据并返回结果(Revision、数据历史记录等)。
// file: server/etcdserver/txn/txn.go
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
...
if txnWrite == nil {
...
// 创建写事务
txnWrite = kv.Write(trace)
defer txnWrite.End()
}
...
resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
}
我们目前只关注 Put 操作是怎样被存储的,后面会专门写一篇事务的源码分析。这里我们首先提一下,etcd 使用 BoltDB 存储数据,它是一个嵌入式的键值存储数据库,支持 ACID 事务,BoltDB 中的 bucket
是一种用于组织和存储键值对的容器,每个 bucket
都可以包含任意数量的键值对,提交事务时,bucket
中的所有更改都会被写入磁盘。
txnWrite.Put()
最终会执行到 storeTxnWrite
结构体的 put()
方法,其会先将数据写到 bucket
中,然后调用 tw.s.kvindex.Put()
存储一些数据在内存里。这里留一个小疑问,后面解答:为什么最新的 Revision 需要加 1?对于初次赋值的 key 来说,Revision 值会是 2。
// file: server/storage/mvcc/kvstore_txn.go
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
// 最新的 Revision + 1
rev := tw.beginRev + 1
c := rev
...
// 创建一个 Revision 实例,changes 表示当前事务对这个 Revision 的修改次数
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
...
// 存入 bucket
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
// 数据存入内存 store
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
...
}
mvcc 的 store (tw.s.kvindex
)需要实现一个 index
接口,作为 mvcc 的存储引擎,treeIndex
结构体实现了这个接口,事务中调用的就是它的 Put()
方法。可以看到 treeIndex
是一个 B 树,这里使用了 Goole 实现的库 github.com/google/btre… 实际上这是一个 B+ 树,也就是说 etcd 在内存中是以 B+ 树结构存储数据的。
// file: server/storage/mvcc/index.go
type index interface {
Put(key []byte, rev revision)
...
}
type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex]
lg *zap.Logger
}
func (ti *treeIndex) Put(key []byte, rev revision) {...}
我们来看看 Put
方法实现,其创建了一个 keyIndex
结构作为 B+ 树的索引,根据这个索引从树中 Get
数据,没有数据的话,会先将 revision
实例加入到 keyIndex
结构中,然后调用 B+ 树的 ReplaceOrInsert()
方法将数据插入;如果已经存在数据,则只更新 keyIndex
的信息。而最新的 Revision 号之所以加 1,是因为 B+ 的根节点通常是一个空节点,而 Revision 又作为了索引的一部分。
在 B+ 树中,只存储了数据的 key 和 revision 等元数据,这是为了提高内存使用效率和存储性能以及更小的内存占用,实际的值,是存在事务的 bucket
中,使用 BoltDB 作为存储引擎,而 B+ 树则是用来索引 BoltDB 数据库中的键值对的数据结构。当客户端请求读取一个键值对时,首先会在 B+ 树中查找该键的位置,然后从 BoltDB 中读取对应的值。数据存入事务 bucket
中后,不会立马被提交,而是后台通过定时任务每个一段时间提交一次事务,即数据最终落盘。后期考虑专门开一篇 etcd 的存储讲。
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)
if !ok {
keyi.put(ti.lg, rev.main, rev.sub)
// 插入树中
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.main, rev.sub)
}
到这里,Put 操作的基本流程已经完成,请求结果会返回到 applyEntryNormal
函数中,拿到请求结果 ar
之后,会调用 s.w.Trigger(id, ar)
将请求结果推送到 Propose()
之前注册请求的 channel,然后关闭它,等待在这个请求上的节点返回。
// file: server/etcdserver/server.go
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
...
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
...
// 处理请求
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
}
...
if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
s.w.Trigger(id, ar)
return
}
}
// file: server/etcdserver/v3_server.go
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
...
// 向 EtcdServer 下的 wait.Wait 中注册一个请求
ch := s.w.Register(id)
err = s.r.Propose(cctx, data)
select {
// 等待请求返回
case x := <-ch:
return x.(*apply2.Result), nil
...
}
}
最后 processInternalRaftRequestOnce()
将结果返回到 raftRequestOnce()
-> raftRequest()
-> Put()
(EtcdServer),将 RPC 请求结果返回给客户端。
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}
总结
在这篇文章中,讲述了一个最简单的 Put 请求最基本的处理流程,这个流程同时也是 etcd 其它操作的基础。在 etcdctl 的处理中,会把用户的命令转成客户端的方法,随后客户端向 etcd 服务端发送一个 gRPC 请求。服务端实现了对应的 gRPC 服务提供给客户端,服务的启动包含了 raft 节点通信和客户端通信的 http 服务,服务本身即为一个 raft 节点,启动时等待其它节点发来的同步消息。
服务端接收到客户端的 Put 请求后会进行以下操作:
- 将键值对以 log entry 的形式发送给 raft 集群中其它节点(包括自身),等待节点处理。
- 节点收到数据,开始调用 mvcc 存储来处理数据。
- mvcc 存储将键值与 revision 等元数据作为索引存入 B+ 树,把实际的值写入到 BoltDB 事务的 bucket 中。
- 后台会定期将 BoltDB 的事务提交,数据持久化会到磁盘中。读取数据时会先根据从 B+ 树找出相关的元数据,再从磁盘中读取实际的值。
还有许多细节文中并没有展开地说,比如具体的 BoltDB 存储,但相信有这些基础的核心流程,需要研究哪一部分的细节时可以事半功倍。
转载自:https://juejin.cn/post/7253020765312860216