likes
comments
collection
share

Go 进阶 · 分布式爬虫实战day42-Master任务调度:服务发现与资源管理

作者站长头像
站长
· 阅读数 66

在上一节课程中,我们实现了 Master 的选主,这一节课,我们继续深入 Master 的开发,实现一下 Master 的服务发现与资源的管理。

Master 服务发现

首先我们来实现一下 Master 对 Worker 的服务发现。

Master 需要监听 Worker 节点的信息,感知到 Worker 节点的注册与销毁。和服务的注册一样,我们的服务发现也使用 micro 提供的 registry 功能,代码如下所示。

m.WatchWorker 方法调用 registry.Watch 监听 Worker 节点的变化,watch.Next() 会堵塞等待节点的下一个事件,当 Master 收到节点变化事件时,将事件发送到 workerNodeChange 通道。m.Campaign 方法接收到变化事件后,会用日志打印出变化的信息。


func (m *Master) Campaign() {
  ...
  workerNodeChange := m.WatchWorker()

  for {
    select {
    ...
    case resp := <-workerNodeChange:
      m.logger.Info("watch worker change", zap.Any("worker:", resp))
    }
  }
}

func (m *Master) WatchWorker() chan *registry.Result {
  watch, err := m.registry.Watch(registry.WatchService(worker.ServiceName))
  if err != nil {
    panic(err)
  }
  ch := make(chan *registry.Result)
  go func() {
    for {
      res, err := watch.Next()
      if err != nil {
        m.logger.Info("watch worker service failed", zap.Error(err))
        continue
      }
      ch <- res
    }
  }()
  return ch

}

Master 中的 etcd registry 对象是我们在初始化时注册到 go-micro 中的。

// cmd/master/master.go
reg := etcd.NewRegistry(registry.Addrs(sconfig.RegistryAddress))
master.New(
  masterID,
  master.WithLogger(logger.Named("master")),
  master.WithGRPCAddress(GRPCListenAddress),
  master.WithregistryURL(sconfig.RegistryAddress),
  master.WithRegistry(reg),
  master.WithSeeds(seeds),
)

深入 go-micro registry 接口

go-micro 提供的 registry 接口提供了诸多 API,其结构如下所示。

type Registry interface {
  Init(...Option) error
  Options() Options
  Register(*Service, ...RegisterOption) error
  Deregister(*Service, ...DeregisterOption) error
  GetService(string, ...GetOption) ([]*Service, error)
  ListServices(...ListOption) ([]*Service, error)
  Watch(...WatchOption) (Watcher, error)
  String() string
}

对于 Master 的服务发现,我们借助了 registry.Watch 方法。Watch 方法借助 client.Watch 实现了对特定 Key 的监听,并封装了 client.Watch 返回的结果。

func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
   return newEtcdWatcher(e, e.options.Timeout, opts...)
}

func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
   var wo registry.WatchOptions
   for _, o := range opts {
      o(&wo)
   }
   watchPath := prefix
   if len(wo.Service) > 0 {
      watchPath = servicePath(wo.Service) + "/"
   }
   return &etcdWatcher{
      stop:    stop,
      w:       r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()),
      client:  r.client,
      timeout: timeout,
   }, nil
}

registry.Watch 方法返回了 Watcher 接口,Watcher 接口中有 Next 方法用于完成事件的迭代。

type Watcher interface {
   // Next 堵塞调用
   Next() (*Result, error)
   Stop()
}

go-micro 的 etcd 插件库实现的 Next 方法也比较简单,只要监听 client.Watch 返回的通道,并将事件信息封装后返回即可。

func (ew *etcdWatcher) Next() (*registry.Result, error) {
   for wresp := range ew.w {
      if wresp.Err() != nil {
         return nil, wresp.Err()
      }
      if wresp.Canceled {
         return nil, errors.New("could not get next")
      }
      for _, ev := range wresp.Events {
         service := decode(ev.Kv.Value)
         var action string
         switch ev.Type {
         case clientv3.EventTypePut:
            if ev.IsCreate() {
               action = "create"
            } else if ev.IsModify() {
               action = "update"
            }
         case clientv3.EventTypeDelete:
            action = "delete"
            // get service from prevKv
            service = decode(ev.PrevKv.Value)
         }
         if service == nil {
            continue
         }
         return &registry.Result{
            Action:  action,
            Service: service,
         }, nil
      }
   }
   return nil, errors.New("could not get next")
}

另外,Worker 节点也利用了 registry 接口的 Register 方法实现了服务的注册。如下所示,Register 方法最终调用了 clientv3 的 Put 方法,将包含节点信息的键值对写入了 etcd 中。

func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
  // register each node individually
  for _, node := range s.Nodes {
    err := e.registerNode(s, node, opts...)
    if err != nil {
      gerr = err
    }
  }
  return gerr
}

func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
  service := &registry.Service{
    Name:      s.Name,
    Version:   s.Version,
    Metadata:  s.Metadata,
    Endpoints: s.Endpoints,
    Nodes:     []*registry.Node{node},
  }
  ...
  // create an entry for the node
  if lgr != nil {
    _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
  } else {
    _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
  }
  if err != nil {
    return err
  }
}

现在让我们来看一看服务发现的效果。首先,启动 Master 服务。

» go run main.go master --id=2 --http=:8081  --grpc=:9091

接着启动 Worker 服务。

» go run main.go worker --id=2 --http=:8079  --grpc=:9089

Worker 启动后,在 Master 的日志中会看到变化的事件。其中,"Action":"create" 表明当前的事件为节点的注册。

{"level":"INFO","ts":"2022-12-12T16:55:42.798+0800","logger":"master","caller":"master/master.go:117","msg":"watch worker change","worker:":{"Action":"create","Service":{"name":"go.micro.server.worker","version":"latest","metadata":null,"endpoints":[{"name":"Greeter.Hello","request":{"name":"Request","type":"Request","values":[{"name":"name","type":"string","values":null}]},"response":{"name":"Response","type":"Response","values":[{"name":"greeting","type":"string","values":null}]},"metadata":{"endpoint":"Greeter.Hello","handler":"rpc","method":"POST","path":"/greeter/hello"}}],"nodes":[{"id":"go.micro.server.worker-2","address":"192.168.0.107:9089","metadata":{"broker":"http","protocol":"grpc","registry":"etcd","server":"grpc","transport":"grpc"}}]}}}

维护 Worker 节点信息

完成服务发现之后,让我们更进一步,维护 Worker 节点的信息。在 updateWorkNodes 函数中,我们利用 registry.GetService 方法获取当前集群中全量的 Worker 节点,并将它最新的状态保存起来。

func (m *Master) Campaign() {
  ...
  workerNodeChange := m.WatchWorker()

  for {
    select {
    ...
    case resp := <-workerNodeChange:
      m.logger.Info("watch worker change", zap.Any("worker:", resp))
    }
  }
}

type Master struct {
  ...
  workNodes map[string]*registry.Node
}

func (m *Master) updateWorkNodes() {
  services, err := m.registry.GetService(worker.ServiceName)
  if err != nil {
    m.logger.Error("get service", zap.Error(err))
  }

  nodes := make(map[string]*registry.Node)
  if len(services) > 0 {
    for _, spec := range services[0].Nodes {
      nodes[spec.Id] = spec
    }
  }

  added, deleted, changed := workNodeDiff(m.workNodes, nodes)
  m.logger.Sugar().Info("worker joined: ", added, ", leaved: ", deleted, ", changed: ", changed)

  m.workNodes = nodes

}

我们还可以使用 workNodeDiff 函数比较集群中新旧节点的变化。

func workNodeDiff(old map[string]*registry.Node, new map[string]*registry.Node) ([]string, []string, []string) {
  added := make([]string, 0)
  deleted := make([]string, 0)
  changed := make([]string, 0)
  for k, v := range new {
    if ov, ok := old[k]; ok {
      if !reflect.DeepEqual(v, ov) {
        changed = append(changed, k)
      }
    } else {
      added = append(added, k)
    }
  }
  for k := range old {
    if _, ok := new[k]; !ok {
      deleted = append(deleted, k)
    }
  }
  return added, deleted, changed
}

当节点发生变化时,可以打印出日志。

{"level":"INFO","ts":"2022-12-12T16:55:42.810+0800","logger":"master","caller":"master/master.go:187","msg":"worker joined: [go.micro.server.worker-2], leaved: [], changed: []"}
{"level":"INFO","ts":"2022-12-12T16:58:32.026+0800","logger":"master","caller":"master/master.go:187","msg":"worker joined: [], leaved: [go.micro.server.worker-2], changed: []"}

Master 资源管理

下一步,让我们来看看对爬虫任务的管理。

爬虫任务也可以理解为一种资源。和 Worker 一样,Master 中可以有一些初始化的爬虫任务存储在配置文件中。初始化时,程序通过读取配置文件将爬虫任务注入到 Master 中。这节课我们先将任务放置到配置文件中,下节课我们还会构建 Master 的 API 来完成任务的增删查改。

seeds := worker.ParseTaskConfig(logger, nil, nil, tcfg)
  master.New(
    masterID,
    master.WithLogger(logger.Named("master")),
    master.WithGRPCAddress(GRPCListenAddress),
    master.WithregistryURL(sconfig.RegistryAddress),
    master.WithRegistry(reg),
    master.WithSeeds(seeds),
  )

在初始化 Master 时,调用 m.AddSeed 函数完成资源的添加。m.AddSeed 会首先调用 etcdCli.Get 方法,查看当前任务是否已经写入到了 etcd 中。如果没有,则调用 m.AddResource 将任务存储到 etcd,存储在 etcd 中的任务的 Key 为 /resources/xxxx。

func (m *Master) AddSeed() {
  rs := make([]*ResourceSpec, 0, len(m.Seeds))
  for _, seed := range m.Seeds {
    resp, err := m.etcdCli.Get(context.Background(), getResourcePath(seed.Name), clientv3.WithSerializable())
    if err != nil {
      m.logger.Error("etcd get faiiled", zap.Error(err))
      continue
    }
    if len(resp.Kvs) == 0 {
      r := &ResourceSpec{
        Name: seed.Name,
      }
      rs = append(rs, r)
    }
  }

  m.AddResource(rs)
}

const (
  RESOURCEPATH = "/resources"
)

func getResourcePath(name string) string {
  return fmt.Sprintf("%s/%s", RESOURCEPATH, name)
}

在添加资源的时候,我们可以设置资源的 ID、创建时间等。在这里我借助了第三方库 Snowflake ,使用雪花算法来为资源生成了一个单调递增的分布式 ID。

func (m *Master) AddResource(rs []*ResourceSpec) {
  for _, r := range rs {
    r.ID = m.IDGen.Generate().String()
    ns, err := m.Assign(r)
    if err != nil {
      m.logger.Error("assign failed", zap.Error(err))
      continue
    }
    r.AssignedNode = ns.Id + "|" + ns.Address
    r.CreationTime = time.Now().UnixNano()
    m.logger.Debug("add resource", zap.Any("specs", r))

    _, err = m.etcdCli.Put(context.Background(), getResourcePath(r.Name), encode(r))
    if err != nil {
      m.logger.Error("put etcd failed", zap.Error(err))
      continue
    }
    m.resources[r.Name] = r
  }
}

Snowflake  利用雪花算法生成了一个 64 位的唯一 ID,其结构如下。


+--------------------------------------------------------------------------+
| 1 Bit Unused | 41 Bit Timestamp |  10 Bit NodeID  |   12 Bit Sequence ID |
+--------------------------------------------------------------------------+

其中,41 位用于存储时间戳;10 位用于存储 NodeID,在这里就是我们的 Master ID;最后 12 位为序列号。如果我们的程序打算在同一个毫秒内生成多个 ID,那么每生成一个新的 ID,序列号会递增 1,这意味着每个节点每毫秒最多能够产生 4096 个不同的 ID,这已经能满足我们当前的场景了。雪花算法确保了我们生成的资源 ID 是全局唯一的。

添加资源时,还有一步很重要,那就是调用 m.Assign 计算出当前的资源应该被分配到哪一个节点上。在这里,我们先用随机的方式选择一个节点,后面还会再优化调度逻辑。

func (m *Master) Assign(r *ResourceSpec) (*registry.Node, error) {
  for _, n := range m.workNodes {
    return n, nil
  }
  return nil, errors.New("no worker nodes")
}

设置好资源的 ID 信息、分配信息之后,调用 etcdCli.Put,将资源的 KV 信息存储到 etcd 中。其中,存储到 etcd 中的 Value 需要是 string 类型,所以我们书写了 JSON 的序列化与反序列化函数,用于存储信息的序列化和反序列化。

func encode(s *ResourceSpec) string {
  b, _ := json.Marshal(s)
  return string(b)
}

func decode(ds []byte) (*ResourceSpec, error) {
  var s *ResourceSpec
  err := json.Unmarshal(ds, &s)
  return s, err
}

最后一步,当 Master 成为新的 Leader 后,我们还要全量地获取一次 etcd 中当前最新的资源信息,并把它保存到内存中,核心逻辑位于 loadResource 函数中。


func (m *Master) BecomeLeader() error {
  if err := m.loadResource(); err != nil {
    return fmt.Errorf("loadResource failed:%w", err)
  }
  atomic.StoreInt32(&m.ready, 1)
  return nil
}

func (m *Master) loadResource() error {
  resp, err := m.etcdCli.Get(context.Background(), RESOURCEPATH, clientv3.WithSerializable())
  if err != nil {
    return fmt.Errorf("etcd get failed")
  }

  resources := make(map[string]*ResourceSpec)
  for _, kv := range resp.Kvs {
    r, err := decode(kv.Value)
    if err == nil && r != nil {
      resources[r.Name] = r
    }
  }
  m.logger.Info("leader init load resource", zap.Int("lenth", len(m.resources)))
  m.resources = resources
  return nil
}

验证 Master 资源分配结果

最后让我们实战验证一下 Master 的资源分配结果。首先我们需要启动 Worker。

要注意的是,如果先启动了 Master,初始的任务将会由于没有对应的 Worker 节点而添加失败。

» go run main.go worker --id=2 --http=:8079  --grpc=:9089

接着启动 Master 服务。

» go run main.go master --id=2 --http=:8081  --grpc=:9091

现在查看 etcd 的信息会发现,当前两个爬虫任务都已经设置到 etcd 中,并且 Master 为他们分配的 Worker 节点为"go.micro.server.worker-2|192.168.0.107:9089",说明 Master 的资源分配成功了。

» docker exec etcd-gcr-v3.5.6 /bin/sh -c "/usr/local/bin/etcdctl get --prefix /"                                                                           jackson@bogon
/micro/registry/go.micro.server.master/go.micro.server.master-2
{"name":"go.micro.server.master","version":"latest","metadata":null,"endpoints":[{"name":"Greeter.Hello","request":{"name":"Request","type":"Request","values":[{"name":"name","type":"string","values":null}]},"response":{"name":"Response","type":"Response","values":[{"name":"greeting","type":"string","values":null}]},"metadata":{"endpoint":"Greeter.Hello","handler":"rpc","method":"POST","path":"/greeter/hello"}}],"nodes":[{"id":"go.micro.server.master-2","address":"192.168.0.107:9091","metadata":{"broker":"http","protocol":"grpc","registry":"etcd","server":"grpc","transport":"grpc"}}]}

/micro/registry/go.micro.server.worker/go.micro.server.worker-2
{"name":"go.micro.server.worker","version":"latest","metadata":null,"endpoints":[{"name":"Greeter.Hello","request":{"name":"Request","type":"Request","values":[{"name":"name","type":"string","values":null}]},"response":{"name":"Response","type":"Response","values":[{"name":"greeting","type":"string","values":null}]},"metadata":{"endpoint":"Greeter.Hello","handler":"rpc","method":"POST","path":"/greeter/hello"}}],"nodes":[{"id":"go.micro.server.worker-2","address":"192.168.0.107:9089","metadata":{"broker":"http","protocol":"grpc","registry":"etcd","server":"grpc","transport":"grpc"}}]}

/resources/douban_book_list
{"ID":"1602250527540776960","Name":"douban_book_list","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1670841268798763000}

/resources/election/3f3584fc571ae9d0
master2-192.168.0.107:9091

/resources/xxx
{"ID":"1602250527570137088","Name":"xxx","AssignedNode":"go.micro.server.worker-2|192.168.0.107:9089","CreationTime":1670841268805921000}

总结

这节课。我们实现了 Master 的两个重要功能:服务发现与资源管理。

对于服务发现,我们借助了 micro registry 提供的接口,实现了节点的注册、发现和状态获取。micro 的 registry 接口是一个插件,这意味着我们可以轻松使用不同插件与不同的注册中心交互。在这里我们使用的仍然是 go-micro 的 etcd 插件,借助 etcd clientv3 的 API 实现了服务发现与注册的相关功能。

而对于资源管理,这节课我们为资源加上了必要的 ID 信息,我们使用了分布式的雪花算法来保证生成 ID 全局唯一。同时,我们用随机的方式为资源分配了其所属的 Worker 节点并验证了分配的效果。在下一节课程中,我们还会继续实现负载均衡的资源分配。

本文章来源于极客时间《Go 进阶 · 分布式爬虫实战》。

转载自:https://juejin.cn/post/7204809985548632101
评论
请登录