用 go 实现 Redis 集群
之前实现的 redis
是个单机版的 redis
,现在要实现一个 redis
集群
在单节点的 redis
中,如果无法满足需求了,比如:计算能力不足,吞吐量不够,数据量太大等等,这时候就需要搭建集群
集群搭建后,client
发来一条指令,client
是不知道要发给 redis
哪个节点的
client
只会把指令发送到它连接的那个节点,那个节点要识别出这条指令应该由哪个节点去处理
那它是根据什么规则确定这个指令应该去什么节点?
我们对 k1
做 hash
,得到的结果是 hash(k1)
,然后对 hash(k1)
做取模运算,得到的结果是 hash(k1) % N
,N
是节点的数量,这个结果就是 k1
应该去的节点
这时候如果增加一个节点的话,那么对 k1
做 hash
,hash
的结果还是没变,但是对这个结果取模得到的节点编号可能就会发生变化
这样子的话,每个节点中存储的内容重新分配节点号了,这就导致开销非常大
所以就引入的一致性 hash
的概念
在分布式系统中用的最多的一种技术是一致性哈希
我们这个项目使用一致性哈希的地方是用在集群的搭建上
一致性 hash
一致性 hash
解决的问题是在节点微小的增删时,数据的迁移比较方便
比如 hash
分布在一个环上,所有的数字都能在这个环上找到一个点
我们首先给节点 A
、B
、C
、D
找一个位置,也就是算一下节点 A
、B
、C
、D
的 hash
值
这时有一个 K1
,那我们怎么知道要存到哪个节点呢?
还是一样对 K1
做 hash
,得到的结果是 hash(K1)
,然后在环上找到 hash(K1)
对应的位置,这个位置在那个节点的后面,K1
就存储到哪个节点
这是在增加一个节点 E
,会发生什么呢?
它就会把节点 B
负责的一段 hash
分成了两份,从下面图中可以看到,其他节点没有任何变化,所以其他节点中存储的数据不需要迁移,只有节点 B
中存储的数据部分需要迁移
所以一致性 hash
解决的问题是在节点微小的增删时,数据的迁移比较方便
实现 NodeMap
我们把一致性 hash
的结构体定义为 NodeMap
这个结构体有三个参数:
hashFunc
:hash
函数nodeHash
:节点hash
值的切片,按顺序保存nodehashMap
:节点hash
值和节点名称的映射
type HashFunc func(data []byte) uint32
type NodeMap struct {
hashFunc HashFunc // hash 函数
nodeHashs []int // 节点 hash 值的切片,按顺序保存
nodehashMap map[int]string // 节点 hash 值和节点名称的映射
}
定义好 NodeMap
结构体后,我们需要实现一个 NewNodeMap
函数,用来初始化 NodeMap
结构体
这个函数接收一个 HashFunc
类型的参数,如果没有传入 HashFunc
参数,那么默认使用 crc32.ChecksumIEEE
函数
// hash 函数可以自定义
func NewNodeMap(fn HashFunc) *NodeMap {
m := &NodeMap{
hashFunc: fn,
nodehashMap: make(map[int]string),
}
if m.hashFunc == nil {
// 默认使用 crc32.ChecksumIEEE 函数
m.hashFunc = crc32.ChecksumIEEE
}
return m
}
实现一致性 hash
把一个新的节点加入到一致性 hash
的环中,由 AddNode
方法来完成
它接收一组节点切片作为参数,然后把传进来的节点切片中的每个节点的 hash
值计算出来,然后按顺序保存到 nodeHashs
中,同时把 hash
值和节点名称的映射保存到 nodehashMap
中
func (m *NodeMap) AddNode(keys ...string) {
// 遍历节点切片,计算节点的 hash 值
for _, key := range keys {
if key == "" {
continue
}
// 计算节点的 hash 值
hash := int(m.hashFunc([]byte(key)))
// 保存节点的 hash 值
m.nodeHashs = append(m.nodeHashs, hash)
// 保存节点的 hash 值和节点名称的映射
m.nodehashMap[hash] = key
}
// 对节点的 hash 值排序
sort.Ints(m.nodeHashs)
}
然后再来实现 k1
应该去哪个节点,由 PickNode
方法来完成
它接收一个 key
参数,对这个 key
做 hash
,然后在 nodeHashs
中找到第一个大于这个 hash
值的节点,如果没有找到,那么就返回第一个节点
// 传入一个 key,返回这个 key 应该去的节点
func (m *NodeMap) PickNode(key string) string {
// 如果节点为空,返回空字符串
if m.IsEmpty() {
return ""
}
// 计算 key 的 hash 值
hash := int(m.hashFunc([]byte(key)))
// 在 nodeHashs 中找到第一个大于 hash 的节点
idx := sort.Search(len(m.nodeHashs), func(i int) bool {
return m.nodeHashs[i] >= hash
})
// 如果没有找到,说明 key 的 hash 值比所有节点的 hash 值都大,那么就返回第一个节点
if idx == len(m.nodeHashs) {
idx = 0
}
// 返回节点名称
return m.nodehashMap[m.nodeHashs[idx]]
}
redis 集群
比如说集群有三个节点的话,每一个节点都有一个 standalone_database
和 cluster_database
,cluster_database
是不做具体的业务的,它只做数据的分发
比如说 client
发来一条指令,cluster_database
会将这条指令根据一致性 hash
算法分发到对应的节点,对应节点的 cluster_database
会做一个验证,验证是不是要存入我的 standalone_database
集群相关的逻辑会在根目录下新建目录 cluster
client
发来一个 k1: v1
,然后通过 cluster_database
判断要去那个 standalone_database
,就需要转发到这个节点,那如何实现这个转发呢?
一个思路就是节点 A
要作为节点 B
的客户端,所以 cluster_database
不光有服务端的功能,还需要有客户端的功能
还有一个问题是节点 A
作为节点 B
的客户端,那么它要维持节点 B
的几个连接呢?所以我们需要一个连接池
节点 A
就要维护一组连接池负责转发,需要一种池化的工具,go-commons-pool
就是 go
中一种比较好用的工具
我们先来定义一个 clusterDatabase
结构体
type ClusterDatabase struct {
self string // 记录自己的名称和地址
nodes []string // 集群节点的切片
peerPicker *consistenthash.NodeMap // 一致性 hash
peerConnection map[string]*pool.ObjectPool // 池化工具 -> map 的 key 是节点地址
db databaseface.Database // 集群层 -> StandaloneDatabase
}
初始化连接池
我们新建文件 cluster/cluster_pool.go
,定义一个 NewClusterPool
函数,用来初始化连接池
type connectionFactory struct {
Peer string
}
这个连接池可以自动的管理连接,就需要一个 factory
,这个 factory
会在连接池中创建连接
这个结构体主要的两个方法是:
MakeObject
:建立连接func (f connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { // 初始化 tcp 客户端 c, err := client.MakeClient(f.Peer) if err != nil { return nil, err } // 启动 tcp 客户端 c.Start() // 返回连接池 return pool.NewPooledObject(c), nil }
DestroyObject
:销毁连接func (f connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { c, ok := object.Object.(*client.Client) if !ok { return errors.New("type mismatch") } // 关闭 tcp 客户端 c.Close() return nil }
初始化 ClusterDatabase
连接池初始化之后,我们就可以初始化 ClusterDatabase
了
我们作为一个集群,怎么知道对方的节点的地址是啥
就需要在 redis.conf
中配置 cluster
的节点地址
self 127.0.0.1:3000 # 自己的地址
peers 127.0.0.1:6380 # 集群中其他节点的地址,多个节点用,分隔
MakeClusterDatabase
函数用来初始化 ClusterDatabase
func MakeClusterDatabase() *ClusterDatabase {
// 初始化 clusterDatabase
cluster := &ClusterDatabase{
self: config.Properties.Self, // 记录自己的名称和地址
db: database2.NewStandaloneDatabase(), // 初始化自生的 StandaloneDatabase
peerPicker: consistenthash.NewNodeMap(nil), // 初始化一致性 hash
peerConnection: make(map[string]*pool.ObjectPool), // 初始化连接池
}
// 添加节点,长度:peers + self
nodes := make([]string, 0, len(config.Properties.Peers)+1)
for _, peer := range config.Properties.Peers {
nodes = append(nodes, peer)
}
nodes = append(nodes, config.Properties.Self)
// 添加节点到一致性 hash 中
cluster.peerPicker.AddNode(nodes...)
ctx := context.Background()
// 初始化连接池
for _, peer := range config.Properties.Peers {
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
Peer: peer,
})
}
// 将 nodes 保存在 cluster 中
cluster.nodes = nodes
return cluster
}
操作连接池
假设 client
连接到第一个节点上,client
所有的操作指令都会发生到第一个节点上,那么不同的命令就有不同的处理方式,比如:
PING
:PING
这个指令的目的是测试redis
有没有连接上,所以这个指令和单节点是一样的,直接回复一个PONG
,和其他节点没有任何交互,只需要调用底层的standalong_database
的PING
方法GET/SET
:这种指令就需要转发到对应的节点上(用一致性hash
确定去哪个节点),第一个节点就会变成client
,向对应的节点转发用户发来的指令,对应的节点接收到指令后执行这条指令,然后返回结果给第一个节点,第一个节点再返回给用户FLUSHDB
:这种指令是要清空所有节点中的数据,节点A
就需要转发到所有的节点上,每个节点都会执行这条指令
如果我们要转发的给兄弟节点的话,就需要去 peerConnection
中找到对应的连接,给这个连接发送指令,然后在还回去
我们将这个通信的操作放在 com
目录中
getPeerClient
getPeerClient
方法用来获取节点的连接
func (cluster *ClusterDatabase) getPeerClient(peer string) (*client.Client, error) {
// 从 peerConnection 中获取连接
pool, ok := cluster.peerConnection[peer]
if !ok {
return nil, errors.New("connection not found")
}
// 从连接池中获取借用一个连接
object, err := pool.BorrowObject(context.Background())
if err != nil {
return nil, err
}
// 断言连接类型
c, ok := object.(*client.Client)
if !ok {
return nil, errors.New("wrong type")
}
// 返回连接
return c, err
}
returnPeerClient
returnPeerClient
方法用来归还连接
func (cluster *ClusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {
// 从 peerConnection 中获取连接
pool, ok := cluster.peerConnection[peer]
if !ok {
return errors.New("connection not found")
}
// 归还连接
return pool.ReturnObject(context.Background(), peerClient)
}
relay
relay
方法用来转发指令,接收三个参数:
peer
:节点名称c
:resp.Connection
,client
连接args
:指令
func (cluster *ClusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {
// 如果 peer 是自己,直接执行指令
if peer == cluster.self {
return cluster.db.Exec(c, args)
}
// 获取节点的连接
peerClient, err := cluster.getPeerClient(peer)
if err != nil {
return reply.MakeErrReply(err.Error())
}
// 归还连接
defer func() {
_ = cluster.returnPeerClient(peer, peerClient)
}()
// 选择节点数据库
peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex())))
// 发送指令
return peerClient.Send(args)
}
broadcast
broadcast
方法用来广播指令,接收两个参数:
c
:resp.Connection
,client
连接args
:指令
func (cluster *ClusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {
results := make(map[string]resp.Reply)
// 遍历节点
for _, node := range cluster.nodes {
// 转发指令
result := cluster.relay(node, c, args)
results[node] = result
}
return results
}
实现 defaultFunc
PING
只在本地执行,GET/SET
需要转发到对应的节点上,FLUSHDB
需要广播到所有节点上,那他们的关系如何维护?
先定义一个转发执行函数的类型,接收三个参数:
cluster
:ClusterDatabase
实例c
:resp.Connection
,对应节点的连接cmdArgs
:指令
type CmdFunc func(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply
大部分指令都是走转发的,所以 defaultFunc
函数就是转发执行函数
// GET K1
// SET K1 V1
func defaultFunc(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
// 获取用户输入的 key
key := string(cmdArgs[1])
// 选择节点
peer := cluster.peerPicker.PickNode(key)
// 转发指令
return cluster.relay(peer, c, cmdArgs)
}
声明一个 router
,注册所有转发的指令
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["exists"] = defaultFunc
routerMap["type"] = defaultFunc
routerMap["set"] = defaultFunc
routerMap["setnx"] = defaultFunc
routerMap["get"] = defaultFunc
routerMap["getset"] = defaultFunc
return routerMap
}
实现特殊模式的指令
PING
和SELECT
走本地RENAME
和RENAMENX
走转发FLUSHDB
和DEL
走广播
PING
PING
是在节点本地执行,不需要转发或者广播,只需要调用底层的 standalong_database
的 Exec
方法
func ping(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
return cluster.db.Exec(c, cmdArgs)
}
将 PING
注册到 router
中
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["ping"] = ping
return routerMap
}
RENAME 和 RENAMENX
RENAME
和 REANMENX
功能相同
我们这里没有没有实现在不同节点的功能,所以 Key
一更改的话,就去去到其他的节点,所以要先判断一下这个 Key
是否在同一个节点,如果在就执行 RENAME
,否则就抛出错误
// rename k1 k2
func rename(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
// 参数校验
if len(cmdArgs) != 3 {
return reply.MakeErrReply("ERR wrong number args")
}
// 获取修改之前的名字
src := string(cmdArgs[1])
// 获取修改之后的名字
dest := string(cmdArgs[2])
// 根据 src 获取节点
srcPeer := cluster.peerPicker.PickNode(src)
// 根据 dest 获取节点
destPeer := cluster.peerPicker.PickNode(dest)
// 如果 srcPeer 和 destPeer 不一样,说明不在同一个节点
if srcPeer != destPeer {
return reply.MakeErrReply("ERR rename must within on peer")
}
// 在同一个节点,则转发指令
return cluster.relay(srcPeer, c, cmdArgs)
}
将 RENAME
和 RENAMENX
注册到 router
中
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["rename"] = rename
routerMap["renamenx"] = rename
return routerMap
}
FLUSHDB
FLUSHDB
是要广播到所有节点上
func flushdb(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
// 广播指令
replies := cluster.broadcast(c, cmdArgs)
var errReply reply.ErrorReply
// 遍历所有节点的返回结果
for _, r := range replies {
// 只要有一个节点返回错误,就返回错误
if reply.IsErrReply(r) {
errReply = r.(reply.ErrorReply)
break
}
}
// 如果没有错误,返回 OK
if errReply != nil {
return reply.MakeOkReply()
}
// 如果有错误,返回错误
return reply.MakeErrReply("error: " + errReply.Error())
}
将 FLUSHDB
注册到 router
中
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["flushdb"] = flushdb
return routerMap
}
DEL
DEL
是要广播到所有节点上,执行删除,最后返回删除的数量
// del k1 k2 k3 k4 ...
func del(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
// 广播指令
replies := cluster.broadcast(c, cmdArgs)
var errReply reply.ErrorReply
// 删除的数量, 默认为 0
var deleted int64
// 遍历所有节点的返回结果
for _, r := range replies {
// 如果有一个节点返回错误,就返回错误
if reply.IsErrReply(r) {
errReply = r.(reply.ErrorReply)
break
}
// 如果返回的是 IntReply,就累加
intReply, ok := r.(*reply.IntReply)
if !ok {
errReply = reply.MakeErrReply("error")
}
deleted += intReply.Code
}
// 如果没有错误,返回删除的数量
if errReply != nil {
return reply.MakeIntReply(deleted)
}
// 如果有错误,返回错误
return reply.MakeErrReply("error: " + errReply.Error())
}
将 DEL
注册到 router
中
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["del"] = del
return routerMap
}
SELECT
SELECT
是在本地执行,不需要转发或者广播,只需要调用底层的 standalong_database
的 Exec
方法
func execSelect(cluster *ClusterDatabase, c resp.Connection, cmdArgs [][]byte) resp.Reply {
return cluster.db.Exec(c, cmdArgs)
}
将 SELECT
注册到 router
中
func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["select"] = execSelect
return routerMap
}
实现集群层的执行
clusterDatabase.Exec
方法用来执行指令
func (cluster *ClusterDatabase) Exec(client resp.Connection, args [][]byte) (result resp.Reply) {
// 恢复 panic
defer func() {
if err := recover(); err != nil {
logger.Error(err)
result = reply.MakeUnknownErrReply()
}
}()
// 拿到用户输入的指令
cmdName := strings.ToLower(string(args[0]))
// 从 router 中获取指令
cmdFunc, ok := router[cmdName]
if !ok {
return reply.MakeErrReply("not supported cmd")
}
// 执行指令
result = cmdFunc(cluster, client, args)
return
}
源码
转载自:https://juejin.cn/post/7392071348480294923