likes
comments
collection
share

用 go 实现 Redis 集群

作者站长头像
站长
· 阅读数 60

之前实现的 redis 是个单机版的 redis,现在要实现一个 redis 集群

在单节点的 redis 中,如果无法满足需求了,比如:计算能力不足,吞吐量不够,数据量太大等等,这时候就需要搭建集群

集群搭建后,client 发来一条指令,client 是不知道要发给 redis 哪个节点的

client 只会把指令发送到它连接的那个节点,那个节点要识别出这条指令应该由哪个节点去处理

那它是根据什么规则确定这个指令应该去什么节点?

我们对 k1hash,得到的结果是 hash(k1),然后对 hash(k1) 做取模运算,得到的结果是 hash(k1) % NN 是节点的数量,这个结果就是 k1 应该去的节点

用 go 实现 Redis 集群

这时候如果增加一个节点的话,那么对 k1hashhash 的结果还是没变,但是对这个结果取模得到的节点编号可能就会发生变化

用 go 实现 Redis 集群

这样子的话,每个节点中存储的内容重新分配节点号了,这就导致开销非常大

所以就引入的一致性 hash 的概念

在分布式系统中用的最多的一种技术是一致性哈希

我们这个项目使用一致性哈希的地方是用在集群的搭建上

一致性 hash

一致性 hash 解决的问题是在节点微小的增删时,数据的迁移比较方便

比如 hash 分布在一个环上,所有的数字都能在这个环上找到一个点

我们首先给节点 ABCD 找一个位置,也就是算一下节点 ABCDhash

用 go 实现 Redis 集群

这时有一个 K1,那我们怎么知道要存到哪个节点呢?

还是一样对 K1hash,得到的结果是 hash(K1),然后在环上找到 hash(K1) 对应的位置,这个位置在那个节点的后面,K1 就存储到哪个节点

用 go 实现 Redis 集群

这是在增加一个节点 E,会发生什么呢?

它就会把节点 B 负责的一段 hash 分成了两份,从下面图中可以看到,其他节点没有任何变化,所以其他节点中存储的数据不需要迁移,只有节点 B 中存储的数据部分需要迁移

用 go 实现 Redis 集群

所以一致性 hash 解决的问题是在节点微小的增删时,数据的迁移比较方便

实现 NodeMap

我们把一致性 hash 的结构体定义为 NodeMap

这个结构体有三个参数:

  • hashFunchash 函数
  • nodeHash:节点 hash 值的切片,按顺序保存
  • nodehashMap:节点 hash 值和节点名称的映射
type HashFunc func(data []byte) uint32
type NodeMap struct {
	hashFunc    HashFunc       // hash 函数
	nodeHashs   []int          // 节点 hash 值的切片,按顺序保存
	nodehashMap map[int]string // 节点 hash 值和节点名称的映射
}

定义好 NodeMap 结构体后,我们需要实现一个 NewNodeMap 函数,用来初始化 NodeMap 结构体

这个函数接收一个 HashFunc 类型的参数,如果没有传入 HashFunc 参数,那么默认使用 crc32.ChecksumIEEE 函数

// hash 函数可以自定义
func NewNodeMap(fn HashFunc) *NodeMap {
	m := &NodeMap{
		hashFunc:    fn,
		nodehashMap: make(map[int]string),
	}
	if m.hashFunc == nil {
    // 默认使用 crc32.ChecksumIEEE 函数
		m.hashFunc = crc32.ChecksumIEEE
	}
	return m
}

实现一致性 hash

把一个新的节点加入到一致性 hash 的环中,由 AddNode 方法来完成

它接收一组节点切片作为参数,然后把传进来的节点切片中的每个节点的 hash 值计算出来,然后按顺序保存到 nodeHashs 中,同时把 hash 值和节点名称的映射保存到 nodehashMap

func (m *NodeMap) AddNode(keys ...string) {
  // 遍历节点切片,计算节点的 hash 值
  for _, key := range keys {
    if key == "" {
      continue
    }
    // 计算节点的 hash 值
    hash := int(m.hashFunc([]byte(key)))
    // 保存节点的 hash 值
    m.nodeHashs = append(m.nodeHashs, hash)
    // 保存节点的 hash 值和节点名称的映射
    m.nodehashMap[hash] = key
  }
  // 对节点的 hash 值排序
  sort.Ints(m.nodeHashs)
}

然后再来实现 k1 应该去哪个节点,由 PickNode 方法来完成

它接收一个 key 参数,对这个 keyhash,然后在 nodeHashs 中找到第一个大于这个 hash 值的节点,如果没有找到,那么就返回第一个节点

// 传入一个 key,返回这个 key 应该去的节点
func (m *NodeMap) PickNode(key string) string {
  // 如果节点为空,返回空字符串
  if m.IsEmpty() {
    return ""
  }
  // 计算 key 的 hash 值
  hash := int(m.hashFunc([]byte(key)))
  // 在 nodeHashs 中找到第一个大于 hash 的节点
  idx := sort.Search(len(m.nodeHashs), func(i int) bool {
    return m.nodeHashs[i] >= hash
  })
  // 如果没有找到,说明 key 的 hash 值比所有节点的 hash 值都大,那么就返回第一个节点
  if idx == len(m.nodeHashs) {
    idx = 0
  }
  // 返回节点名称
  return m.nodehashMap[m.nodeHashs[idx]]
}

redis 集群

比如说集群有三个节点的话,每一个节点都有一个 standalone_databasecluster_databasecluster_database 是不做具体的业务的,它只做数据的分发

比如说 client 发来一条指令,cluster_database 会将这条指令根据一致性 hash 算法分发到对应的节点,对应节点的 cluster_database 会做一个验证,验证是不是要存入我的 standalone_database

用 go 实现 Redis 集群

集群相关的逻辑会在根目录下新建目录 cluster

client 发来一个 k1: v1,然后通过 cluster_database 判断要去那个 standalone_database,就需要转发到这个节点,那如何实现这个转发呢?

一个思路就是节点 A 要作为节点 B 的客户端,所以 cluster_database 不光有服务端的功能,还需要有客户端的功能

还有一个问题是节点 A 作为节点 B 的客户端,那么它要维持节点 B 的几个连接呢?所以我们需要一个连接池

节点 A 就要维护一组连接池负责转发,需要一种池化的工具,go-commons-pool 就是 go 中一种比较好用的工具

我们先来定义一个 clusterDatabase 结构体

type ClusterDatabase struct {
  self           string                      // 记录自己的名称和地址
  nodes          []string                    // 集群节点的切片
  peerPicker     *consistenthash.NodeMap     // 一致性 hash
  peerConnection map[string]*pool.ObjectPool // 池化工具 -> map 的 key 是节点地址
  db             databaseface.Database       // 集群层 -> StandaloneDatabase
}

初始化连接池

我们新建文件 cluster/cluster_pool.go,定义一个 NewClusterPool 函数,用来初始化连接池

type connectionFactory struct {
	Peer string
}

这个连接池可以自动的管理连接,就需要一个 factory,这个 factory 会在连接池中创建连接

这个结构体主要的两个方法是:

  • MakeObject:建立连接
    func (f connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
      // 初始化 tcp 客户端
      c, err := client.MakeClient(f.Peer)
      if err != nil {
        return nil, err
      }
      // 启动 tcp 客户端
      c.Start()
      // 返回连接池
      return pool.NewPooledObject(c), nil
    }
    
  • DestroyObject:销毁连接
    func (f connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
      c, ok := object.Object.(*client.Client)
      if !ok {
        return errors.New("type mismatch")
      }
      // 关闭 tcp 客户端
      c.Close()
      return nil
    }
    

初始化 ClusterDatabase

连接池初始化之后,我们就可以初始化 ClusterDatabase

我们作为一个集群,怎么知道对方的节点的地址是啥

就需要在 redis.conf 中配置 cluster 的节点地址

self 127.0.0.1:3000 # 自己的地址
peers 127.0.0.1:6380 # 集群中其他节点的地址,多个节点用,分隔

MakeClusterDatabase 函数用来初始化 ClusterDatabase

func MakeClusterDatabase() *ClusterDatabase {
  // 初始化 clusterDatabase
  cluster := &ClusterDatabase{
    self:           config.Properties.Self,             // 记录自己的名称和地址
    db:             database2.NewStandaloneDatabase(),  // 初始化自生的 StandaloneDatabase
    peerPicker:     consistenthash.NewNodeMap(nil),     // 初始化一致性 hash
    peerConnection: make(map[string]*pool.ObjectPool),  // 初始化连接池
  }
  // 添加节点,长度:peers + self
  nodes := make([]string, 0, len(config.Properties.Peers)+1)
  for _, peer := range config.Properties.Peers {
    nodes = append(nodes, peer)
  }
  nodes = append(nodes, config.Properties.Self)
  // 添加节点到一致性 hash 中
  cluster.peerPicker.AddNode(nodes...)
  ctx := context.Background()
  // 初始化连接池
  for _, peer := range config.Properties.Peers {
    cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
      Peer: peer,
    })
  }
  // 将 nodes 保存在 cluster 中
  cluster.nodes = nodes
  return cluster
}

操作连接池

假设 client 连接到第一个节点上,client 所有的操作指令都会发生到第一个节点上,那么不同的命令就有不同的处理方式,比如:

  • PINGPING 这个指令的目的是测试 redis 有没有连接上,所以这个指令和单节点是一样的,直接回复一个 PONG,和其他节点没有任何交互,只需要调用底层的 standalong_databasePING 方法
  • GET/SET:这种指令就需要转发到对应的节点上(用一致性 hash 确定去哪个节点),第一个节点就会变成 client,向对应的节点转发用户发来的指令,对应的节点接收到指令后执行这条指令,然后返回结果给第一个节点,第一个节点再返回给用户
  • FLUSHDB:这种指令是要清空所有节点中的数据,节点 A 就需要转发到所有的节点上,每个节点都会执行这条指令

如果我们要转发的给兄弟节点的话,就需要去 peerConnection 中找到对应的连接,给这个连接发送指令,然后在还回去

我们将这个通信的操作放在 com 目录中

getPeerClient

getPeerClient 方法用来获取节点的连接

func (cluster *ClusterDatabase) getPeerClient(peer string) (*client.Client, error) {
  // 从 peerConnection 中获取连接
  pool, ok := cluster.peerConnection[peer]
  if !ok {
    return nil, errors.New("connection not found")
  }
  // 从连接池中获取借用一个连接
  object, err := pool.BorrowObject(context.Background())
  if err != nil {
    return nil, err
  }
  // 断言连接类型
  c, ok := object.(*client.Client)
  if !ok {
    return nil, errors.New("wrong type")
  }
  // 返回连接
  return c, err
}

returnPeerClient

returnPeerClient 方法用来归还连接

func (cluster *ClusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {
  // 从 peerConnection 中获取连接
  pool, ok := cluster.peerConnection[peer]
  if !ok {
    return errors.New("connection not found")
  }
  // 归还连接
  return pool.ReturnObject(context.Background(), peerClient)
}

relay

relay 方法用来转发指令,接收三个参数:

  • peer:节点名称
  • cresp.Connectionclient 连接
  • args:指令
func (cluster *ClusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {
  // 如果 peer 是自己,直接执行指令
  if peer == cluster.self {
    return cluster.db.Exec(c, args)
  }
  // 获取节点的连接
  peerClient, err := cluster.getPeerClient(peer)
  if err != nil {
    return reply.MakeErrReply(err.Error())
  }
  // 归还连接
  defer func() {
    _ = cluster.returnPeerClient(peer, peerClient)
  }()
  // 选择节点数据库
  peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex())))
  // 发送指令
  return peerClient.Send(args)
}

broadcast

broadcast 方法用来广播指令,接收两个参数:

  • cresp.Connectionclient 连接
  • args:指令
func (cluster *ClusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {
  results := make(map[string]resp.Reply)
  // 遍历节点
  for _, node := range cluster.nodes {
    // 转发指令
    result := cluster.relay(node, c, args)
    results[node] = result
  }
  return results
}

实现 defaultFunc

PING 只在本地执行,GET/SET 需要转发到对应的节点上,FLUSHDB 需要广播到所有节点上,那他们的关系如何维护?

先定义一个转发执行函数的类型,接收三个参数:

  • clusterClusterDatabase 实例
  • cresp.Connection,对应节点的连接
  • cmdArgs:指令
type CmdFunc func(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply

大部分指令都是走转发的,所以 defaultFunc 函数就是转发执行函数

// GET K1
// SET K1 V1
func defaultFunc(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
  // 获取用户输入的 key
  key := string(cmdArgs[1])
  // 选择节点
  peer := cluster.peerPicker.PickNode(key)
  // 转发指令
  return cluster.relay(peer, c, cmdArgs)
}

声明一个 router,注册所有转发的指令

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["exists"] = defaultFunc
  routerMap["type"] = defaultFunc
  routerMap["set"] = defaultFunc
  routerMap["setnx"] = defaultFunc
  routerMap["get"] = defaultFunc
  routerMap["getset"] = defaultFunc
  return routerMap
}

实现特殊模式的指令

  • PINGSELECT 走本地
  • RENAMERENAMENX 走转发
  • FLUSHDBDEL 走广播

PING

PING 是在节点本地执行,不需要转发或者广播,只需要调用底层的 standalong_databaseExec 方法

func ping(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
	return cluster.db.Exec(c, cmdArgs)
}

PING 注册到 router

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["ping"] = ping
  return routerMap
}

RENAME 和 RENAMENX

RENAMEREANMENX 功能相同

我们这里没有没有实现在不同节点的功能,所以 Key 一更改的话,就去去到其他的节点,所以要先判断一下这个 Key 是否在同一个节点,如果在就执行 RENAME,否则就抛出错误

// rename k1 k2
func rename(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
  // 参数校验
  if len(cmdArgs) != 3 {
    return reply.MakeErrReply("ERR wrong number args")
  }
  // 获取修改之前的名字
  src := string(cmdArgs[1])
  // 获取修改之后的名字
  dest := string(cmdArgs[2])
  // 根据 src 获取节点
  srcPeer := cluster.peerPicker.PickNode(src)
  // 根据 dest 获取节点
  destPeer := cluster.peerPicker.PickNode(dest)
  // 如果 srcPeer 和 destPeer 不一样,说明不在同一个节点
  if srcPeer != destPeer {
    return reply.MakeErrReply("ERR rename must within on peer")
  }
  // 在同一个节点,则转发指令
  return cluster.relay(srcPeer, c, cmdArgs)
}

RENAMERENAMENX 注册到 router

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["rename"] = rename
  routerMap["renamenx"] = rename
  return routerMap
}

FLUSHDB

FLUSHDB 是要广播到所有节点上

func flushdb(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
  // 广播指令
  replies := cluster.broadcast(c, cmdArgs)
  var errReply reply.ErrorReply
  // 遍历所有节点的返回结果
  for _, r := range replies {
    // 只要有一个节点返回错误,就返回错误
    if reply.IsErrReply(r) {
      errReply = r.(reply.ErrorReply)
      break
    }
  }
  // 如果没有错误,返回 OK
  if errReply != nil {
    return reply.MakeOkReply()
  }
  // 如果有错误,返回错误
  return reply.MakeErrReply("error: " + errReply.Error())
}

FLUSHDB 注册到 router

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["flushdb"] = flushdb
  return routerMap
}

DEL

DEL 是要广播到所有节点上,执行删除,最后返回删除的数量

// del k1 k2 k3 k4 ...
func del(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
  // 广播指令
  replies := cluster.broadcast(c, cmdArgs)
  var errReply reply.ErrorReply
  // 删除的数量, 默认为 0
  var deleted int64
  // 遍历所有节点的返回结果
  for _, r := range replies {
    // 如果有一个节点返回错误,就返回错误
    if reply.IsErrReply(r) {
      errReply = r.(reply.ErrorReply)
      break
    }
    // 如果返回的是 IntReply,就累加
    intReply, ok := r.(*reply.IntReply)
    if !ok {
      errReply = reply.MakeErrReply("error")
    }
    deleted += intReply.Code
  }
  // 如果没有错误,返回删除的数量
  if errReply != nil {
    return reply.MakeIntReply(deleted)
  }
  // 如果有错误,返回错误
  return reply.MakeErrReply("error: " + errReply.Error())
}

DEL 注册到 router

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["del"] = del
  return routerMap
}

SELECT

SELECT 是在本地执行,不需要转发或者广播,只需要调用底层的 standalong_databaseExec 方法

func execSelect(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
  return cluster.db.Exec(c, cmdArgs)
}

SELECT 注册到 router

func makeRouter() map[string]CmdFunc {
  routerMap := make(map[string]CmdFunc)
  routerMap["select"] = execSelect
  return routerMap
}

实现集群层的执行

clusterDatabase.Exec 方法用来执行指令

func (cluster *ClusterDatabase) Exec(client resp.Connection, args [][]byte) (result resp.Reply) {
  // 恢复 panic
  defer func() {
    if err := recover(); err != nil {
      logger.Error(err)
      result = reply.MakeUnknownErrReply()
    }
  }()
  // 拿到用户输入的指令
  cmdName := strings.ToLower(string(args[0]))
  // 从 router 中获取指令
  cmdFunc, ok := router[cmdName]
  if !ok {
    return reply.MakeErrReply("not supported cmd")
  }
  // 执行指令
  result = cmdFunc(cluster, client, args)
  return
}

源码

  1. cluster_database
  2. com
  3. cluster_pool
  4. router
转载自:https://juejin.cn/post/7392071348480294923
评论
请登录