用 go 实现 redis resp 协议解析器
redis 网络协议
Redis Serialization Protocol 是 Redis 的网络协议,简称 RESP 它是一种文本协议,基于 TCP 协议,用于 Redis 服务器和客户端之间的通信
RESP 协议的设计目标是简单、易于实现,同时保证高效的网络传输,它的设计思想是将数据序列化为文本协议,以便于网络传输,同时保证序列化和反序列化的效率
RESP 协议的数据类型主要有以下几种:
- 正常消息
- 错误消息
- 整数
- 多行字符串
- 数组
正常消息
简单字符串是一个以 + 开头的字符串,\r\n 结尾
例如:+OK\r\n 表示一个简单字符串,OK 是字符串的内容,\r\n 是字符串的结束符
错误消息
错误消息是一个以 - 开头的字符串,\r\n 结尾
例如:-ERR unknown command 'foobar'\r\n 表示一个错误消息,ERR unknown command 'foobar' 是错误消息的内容,\r\n 是字符串的结束符
整数
整数是一个以 : 开头的字符串,\r\n 结尾
例如::1000\r\n 表示一个整数,1000 是整数的内容,\r\n 是字符串的结束符
多行字符串
多行字符串是一个以 $ 开头的字符串,后面跟实际发送字节数,\r\n 结尾
例如:
$6\r\nfoobar\r\n表示一个多行字符串,6是字符串的长度,foobar是字符串的内容,\r\n是字符串的结束符$0\r\n\r\n表示一个空字符串$16\r\nfoobar\r\nfoobar\r\n表示一个多行字符串,14是字符串的长度,foobar\r\nfoobar是字符串的内容,\r\n是字符串的结束符$-1\r\n表示null
数组
数组是一个以 * 开头的字符串,后面跟实际数组元素个数,\r\n 结尾
例如:
SET key value:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n表示一个数组,*3是数组的元素个数,$3是表示后面成员的长度,SET,key,value是数组的元素,\r\n是字符串的结束符
实现常量正常回复
conn 在 redis 协议层代表是一个 redis 的连接,所以我们抽象出一个 Connection 接口,定义在 interface/resp/conn.go 文件中
定义接口的主要目的是未来可能会有不同的 redis 客户端实现
type Connection interface {
Write([]byte) error // 写入数据
GetDBIndex() int // 获取 db 的
SelectDB(int) // 切换 redis 数据库
}
我们再来定义一个服务端回复客户端的消息的接口,定义在 interface/resp/reply.go 文件中,作用是把回复的内容转成字节,因为 tcp 协议传输的就是字节流
type Reply interface {
ToBytes() []byte // 把回复的内容转成字节
}
接口定义完之后,我们需要实现这两个接口相关的方法,定义在 resp/reply 目录中
首先来实现一些固定的回复,定义在 resp/reply/consts.go 文件中
比如:
- 回复
PONG - 回复
OK - 回复
null
ping
ping 指令标识:*1\r\n$4\r\nping\r\n
回复 PONG 实在用户发送 ping 指令的时候,redis 服务器回复的内容,我们会给用户的指令是 +PONG\r\n,实现如下:
// pong
type PongReply struct{}
var pongBytes = []byte("+PONG\r\n")
func MakePongReply() *PongReply {
return &PongReply{}
}
func (p *PongReply) ToBytes() []byte {
return pongBytes
}
ok
回复 OK 是在用户发送一些命令的时候,redis 服务器正常响应的内容,我们会给用户的指令是 +OK\r\n,实现如下:
// ok
type OkReply struct{}
var okBytes = []byte("+OK\r\n")
func MakeOkReply() *OkReply {
return &OkReply{}
}
func (r *OkReply) ToBytes() []byte {
return okBytes
}
nil
回复 nil 是在用户发送一些命令的时候,redis 服务器未找到对应的值,我们会给用户的指令是 $-1\r\n,实现如下:
// nil
type NullBulkReply struct{}
var nullBulkBytes = []byte("$-1\r\n")
func MakeNullBulkReply() *NullBulkReply {
return &NullBulkReply{}
}
func (n NullBulkReply) ToBytes() []byte {
return nullBulkBytes
}
实现常量异常回复
我们先定义一个异常回复的接口,定义在 interface/resp/reply.go 文件中
type ErrorReply interface {
Error() string
ToBytes() []byte
}
异常回复的实现定义在 resp/reply/error.go 文件中,主要实现两种异常回复:
- 未知错误
- 参数错误
未知错误
未知错误是用户发送的命令不在 redis 服务器支持的命令中,我们会给用户的指令是 -ERR unknown\r\n,实现如下:
// 未知错误
type UnknownErrReply struct{}
func MakeUnknownErrReply() *UnknownErrReply {
return &UnknownErrReply{}
}
var unknownErrBytes = []byte("-Err unknown\r\n")
func (u UnknownErrReply) Error() string {
return "Err unknown"
}
func (u UnknownErrReply) ToBytes() []byte {
return unknownErrBytes
}
参数错误
参数错误是用户在发送命令的时候,参数个数不对,我们会给用户的指令是 -ERR wrong number of arguments for 'command' command\r\n,实现如下:
// 参数错误
type ArgNumErrReply struct {
Cmd string
}
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
return &ArgNumErrReply{Cmd: cmd}
}
func (r *ArgNumErrReply) Error() string {
return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
func (r *ArgNumErrReply) ToBytes() []byte {
return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}
实现自定义回复
实现了常量回复和异常回复之后,我们再来实现自定义回复,定义在 resp/reply/reply.go 文件中
自定义错误回复可以更灵活的响应各种错误信息,主要有以下几种:
- 单个字符串回复
- 多个字符串回复
- 状态回复
- 数字回复
- 标准错误回复
单个字符串回复
在 redis 中,Bulk 代表一个块或者字符串的意思
比如要回复一个字符串 foobar,我们回复给用户的指令是 $6\r\nfoobar\r\n,所以 ToBytes 方法的实现如下:
type BulkReply struct {
Arg []byte
}
func MakeBulkReply(arg []byte) *BulkReply {
return &BulkReply{Arg: arg}
}
func (b *BulkReply) ToBytes() []byte {
// 如果是空,应该回复 $-1\r\n
if len(b.Arg) == 0 {
return nullBulkBytes
}
// $6\r\nfoobar\r\n
return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
}
多个字符串回复
多个字符串回复是一个数组,数组中的每个元素都是一个字符串
比如回复这个指令:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
Args 中保存的是 [SET KEY VALUE] 的字节,我们需要取出数组中的每一项,拼接成字符串,最后回复给用户
拼接字符串使用 bytes.Buffer,因为 bytes.Buffer 是一个缓冲区,可以高效的拼接字符串
type MultiBulkReply struct {
Args [][]byte
}
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
return &MultiBulkReply{Args: args}
}
func (r *MultiBulkReply) ToBytes() []byte {
argLen := len(r.Args)
var buf bytes.Buffer
// *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
// *3\r\n
buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
for _, arg := range r.Args {
// 如果是空,应该回复 $-1\r\n
if arg == nil {
buf.WriteString(string(nullBulkReplyBytes) + CRLF)
} else {
// $3\r\nSET\r\n
// $3\r\nkey\r\n
// $5\r\nvalue\r\n
buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
}
}
return buf.Bytes()
}
状态回复
状态回复是一个以 + 开头的字符串,\r\n 结尾
type StatusReply struct {
Status string
}
func MakeStatusReply(status string) *StatusReply {
return &StatusReply{Status: status}
}
func (r *StatusReply) ToBytes() []byte {
return []byte("+" + r.Status + CRLF)
}
数字回复
数字回复是一个以 : 开头的字符串,\r\n 结尾
type IntReply struct {
Code int64
}
func MakeIntReply(code int64) *IntReply {
return &IntReply{Code: code}
}
func (r *IntReply) ToBytes() []byte {
return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}
标准错误回复
标准错误回复是一个以 - 开头的字符串,\r\n 结尾
type StandardErrReply struct {
Status string
}
func MakeErrReply(status string) *StandardErrReply {
return &StandardErrReply{Status: status}
}
func (r *StandardErrReply) ToBytes() []byte {
return []byte("-" + r.Status + CRLF)
}
func (r *StandardErrReply) Error() string {
return r.Status
}
实现解析器 ParseStream
接下来实现用户发过来的指令,如何解析成 RESP 协议的数据类型
ParseStream 函数定义在 resp/parse/parse.go 文件中
首先定义一个结构体 Payload,用来保存解析后的数据,因为用户发送的指令和回复给用户的指令格式一样,所以 Payload 中的 Data 类型可以使用 resp.Reply
type Payload struct {
Data resp.Reply
Err error
}
我们还要定义一个解析器的状态
readingMultiLine:正在解析的数据是单行还是多行expectedArgsCount:正在读取的指令有几个参数,比如SET KEY VALUE指令,解析出来是3个参数msgType:传过来的消息类型,比如+、-、:、$、*,也就是RESP协议的数据类型args:传过来的数据本身,比如SET KEY VALUE,解析出来是[SET KEY VALUE]的字节bulkLen:数据块的长度
type readState struct {
readingMultiLine bool // 正在解析的数据是单行还是多行
expectedArgsCount int // 正在读取的指令有几个参数
msgType byte // 传过来的消息类型
args [][]byte // 传过来的数据本身
bulkLen int64 // 数据块的长度
}
判断解析是否完成,通过判断 expectedArgsCount 和 args 的长度是否相等,同时要排除掉参数为 0 的情况
func (s *readState) finished() bool {
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}
ParseStream 函数是解析器的入口,解析器的核心是 parse0 函数
解析的过程应该是异步的,来一条指令解析一条指令,所以在 ParseStream 函数中调用 parse0 需要用 goroutine
ParseStream 函数接收一个 io.Reader 类型的参数(上层 tcp 协议读取的字节流是 io.Reader 类型),结果通过 chan 返回出去(异步返回,不会阻塞)
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
解析器最核心部分是 parse0 函数,定义在 resp/parse/parse.go 文件中
接收两个参数:
io.Reader:最上层tcp读取的字节流是io.Reader类型chan<- *Payload:解析应该是异步的,来一条指令解析一条指令是一个并发的过程,所以使用chan<- *Payload类型
func parse0(reader io.Reader, ch chan<- *Payload) {}
实现 readLine
在实现核心解析器 parse0 之前,我们先实现一个 readLine 函数,用来读取一行数据
readLine 接收两个参数:
*bufio.Reader:bufio.Reader是一个带缓冲的io.Reader,可以提高读取效率*readState:解析器的状态
返回三个值:
[]byte:读取的数据bool:有没有io错误error:错误本身,读取的数据有没问题
怎么读取一行数据呢?读取的数据,直到遇到 \r\n 算一行
但是这样读取数据会有问题
如果数据本身带有 \r\n,那么这个数据就还没有读完,这时候应该按照数据的长度读取数据,数据长度是由 $数字 决定的
所以就要分情况读取
- 如果数据是
\r\n结尾,直接读取 - 如果数据是
$数字,按照数字读取数据
这两种情况如何分别呢?用 state.bulkLen == 0,如果 bulkLen 是 0,说明没有预设的个数,直接读取数据
解析指令:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
state.bulkLen == 0 解析的是 *3\r\n,$3\r\n 这些数据,而 SET\r\n,ke\r\ny\r\n 将会进入 else 分支中
// *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
// 1. \r\n 切分
// state.bulkLen == 0,表示没有预设的个数,直接读取数据
if state.bulkLen == 0 {
// 读到 \n 结束
msg, err = bufReader.ReadBytes('\n')
if err != nil {
return nil, true, err
}
// \n 前面是不是 \r,如果不是,说明数据有问题
// 数据没有长度,说明数据也有问题
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return nil, false, errors.New("protocol error:" + string(msg))
}
} else {
// 2. 如果有 $数字,按照数字读取字符
// 读取 SET\r\n,ke\r\ny\r\n 这样的内容
// 所以这个长度是 set 长度+ \r\n 长度
msg = make([]byte, state.bulkLen+2)
// io.ReadFull 读取指定长度的数据,塞满 msg
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, nil
}
// 判断数据是否有问题
// 数据长度不对
// 倒数第一个字符是不是 \n,倒数第二个字符是不是 \r
if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
return nil, false, errors.New("protocol error:" + string(msg))
}
// 读完数据之后,把 bulkLen 设置为 0,下次读取数据的时候,就会直接读取数据
state.bulkLen = 0
}
return msg, false, nil
}
实现 parseMultiBulkHeader
readLine 函数作用只是将数据切出来,但数据本身的含义还不知道,所以我们需要一个函数来解析数据的含义
解析数据的含义分为两个部分:header 和 body,header 是数据的长度,
数据的长度有两种:
$4\r\nPING\r\n:$4*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n:*3
parseMultiBulkHeader 函数是处理 *3 这种数据的长度,parseBulkHeader 函数是处理 $4 这种数据的长度
parseMultiBulkHeader 函数接收两个参数:
[]byte:读取的数据*readState:解析器的状态
// *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
// 将 $3\r\n 的 3 解析出来
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 如果 expectedLine == 0,表示没有数据,直接返回
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// 如果 expectedLine > 0,表示有数据,设置状态
// 设置数据的长度
state.msgType = msg[0]
// 设置读取的数据是多行
state.readingMultiLine = true
// 设置数据的个数
state.expectedArgsCount = int(expectedLine)
// 设置数据的长度
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error:" + string(msg))
}
}
实现 parseBulkHeader
parseBulkHeader 函数和 parseMultiBulkHeader 函数类似,需要多处理一种 state.bulkLen == -1 的情况
parseBulkHeader 函数接收两个参数:
[]byte:读取的数据*readState:解析器的状态
// $4\r\nPING\r\n
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 处理 $-1\r\n
if state.bulkLen == -1 {
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
// 这种情况也是读取多行数据
// $4\r\nPING\r\n,有两组 \r\n
state.readingMultiLine = true
// 只需要接收 ping 一个参数
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error:" + string(msg))
}
}
实现 parseSingleLineReply
客户端也有可能会发送 +OK,-ERR 这样的数据,所以我们也需要一个解析器来解析这些数据
parseSingleLineReply 可以一次性把一条指令解析完,所以它的返回值是 resp.Reply 类型
// +OK\r\n -ERR\r\n :5\r\n
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
// 把 \r\n 去掉
str := strings.TrimSuffix(string(msg), "\r\n")
var result resp.Reply
// 拿到第一个字符,判断是什么类型的数据
switch msg[0] {
case '+':
// 拿到 + 后面的数据,返回一个状态回复
result = reply.MakeStatusReply(str[1:])
case '-':
// 拿到 - 后面的数据,返回一个标准错误回复
result = reply.MakeErrReply(str[1:])
case ':':
// 拿到 : 后面的数据,返回一个数字回复
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error:" + string(msg))
}
result = reply.MakeIntReply(val)
}
return result, nil
}
实现 readBody
readBody 函数是解析数据内容
数据的内容有两种:
$4\r\nPING\r\n:$4\r\n已经被解析了,剩余未解析的PING\r\n*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n:*3\r\n已经被解析了,剩余未解析的$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
// *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n,*3\r\n 已经被解析了,剩余未解析的 $3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
// $4\r\nPING\r\n PING\r\n,$4\r\n 已经被解析了,剩余未解析的 PING\r\n
func readBody(msg []byte, state *readState) error {
// 切掉 \r\n
line := msg[0 : len(msg)-2]
var err error
// $3 这样的指令
// 如果第一个字符是 $,说明后面是描述数据的长度
if line[0] == '$' {
// 拿到 $ 后面的数据,解析数据的长度
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 如果数据长度 <= 0,表示数据为空
if state.bulkLen <= 0 {
// 往 state.args 中添加一个空数据
state.args = append(state.args, []byte{})
state.bulkLen = 0
}
} else {
// SET 这样的指令
state.args = append(state.args, line)
// 这里不需要设置 bulkLen,因为在上面那个分支里已经设置了
}
return nil
}
实现 parse0
实现完解析器需要的工具函数之后,我们再来实现核心解析器 parse0 函数
handler 会调用 ParseStream 函数,ParseStream 函数会返回出去一个 chan,parse0 会把解析的结果放到 chan 中
parse0 内部是个无限循环,不断的读取数据,解析数据,直到 io.EOF,表示数据读取完毕,跳出循环
func parser0(reader io.Reader, ch chan<- *Payload) {
for {
msg, ioErr, err := readLine(bufReader, &state)
// 读到 io.EOF,表示数据读取完毨,跳出 for 循环
// 如果是 io.EOF,ioErr 变量值为 true
if err != nil {
if ioErr {
ch <- &Payload{Err: err}
close(ch)
return
}
// 如果 redis 协议解析出错,返回一个错误回复即可,无需结束程序
ch <- &Payload{Err: err}
state = readState{}
continue
}
}
}
但是如果某一条数据解析出现错误,出现了 panic,那么整个程序就会挂掉,所以我们需要使用 recover 来捕获错误
func parser0(reader io.Reader, ch chan<- *Payload) {
defer func() {
if err := recover(); err != nil {
logger.Error(string(debug.Stack()))
}
}()
}
在处理完异常之后,就是处理正常的解析
正常解析数据需要判断是不是多行解析模式,根据 state.readingMultiLine 来判断
也就是说只有头信息不是多行,因为在 parseMultiBulkHeader 和 parseBulkHeader 两个解析头信息的函数中设置了 state.readingMultiLine = true
所以 state.readingMultiLine 为 true 的时候,是在解析 body,state.readingMultiLine 为 false 的时候,是在解析 header
// 解析 * 开头的指令
if msg[0] == '*' {
err := parseMultiBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
if state.expectedArgsCount == 0 {
ch <- &Payload{Data: reply.MakeEmptyMultiBulkReply()}
state = readState{}
continue
}
} else if msg[0] == '$' {
// 解析 $ 开头的指令
err := parseBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
if state.bulkLen == -1 {
ch <- &Payload{Data: reply.MakeNullBulkReply()}
state = readState{}
continue
}
} else {
// 其他指令,比如 +OK\r\n -ERR\r\n 这种
result, err := parseSingleLineReply(msg)
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
continue
}
解析完 header 之后,就要解析 body 了,解析 body 就是 state.readingMultiLine 为 true 的情况
// 每一个循环都会进来,将 msg 交给 readBody 函数处理
// readBody 函数会将 msg 解析成一个个的数据,然后添加到 state.args 中
err := readBody(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
// 当解析完成之后,将解析的数据返回给用户
if state.finished() {
var result resp.Reply
if state.msgType == '*' {
// * 表示要返回一个数组
result = reply.MakeMultiBulkReply(state.args)
} else if state.msgType == '$' {
// $ 表示要返回一个字符串
result = reply.MakeBulkReply(state.args[0])
}
// 将数据通过 chan 返回给上层
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
}
实现 connection
redis 解析协议实现后,我们再来实现 connection
connection 表示 redis 连接之后的记录的每一个用户信息
type Connection struct {
conn net.Conn // 连接信息
waitingReply wait.Wait // 在关掉 server 之前,需要等待回复给用户的指令完成
mu sync.Mutex // 在操作 conn 的时候,需要加锁
selectedDB int // 现在用户在操作哪个 db
}
然后这个结构体需要实现 resp.Connection 接口
// 给用户回复消息
func (c *Connection) Write(bytes []byte) error {
// 如果没有数据,直接返回
if len(bytes) == 0 {
return nil
}
// 写入数据的时候,需要加锁
c.mu.Lock()
// 每次写入数据,等待回复的指令 +1
c.waitingReply.Add(1)
defer func() {
// 写入数据完成之后,等待回复的指令 -1
c.waitingReply.Done()
// 解锁
c.mu.Unlock()
}()
// 写入数据
_, err := c.conn.Write(bytes)
return err
}
// 用户正在使用哪个 db
func (c *Connection) GetDBIndex() int {
return c.selectedDB
}
// 用户选择自己想用的 db
func (c *Connection) SelectDB(dbNum int) {
c.selectedDB = dbNum
}
以及需要实现一个 Close 方法,用来关闭连接
// 关闭连接直接调用 conn.Close() 即可
// 但是在关闭连接之前,需要等待回复给用户的指令完成
func (c *Connection) Close() error {
c.waitingReply.WaitWithTimeout(10 * time.Second)
_ = c.conn.Close()
return nil
}
实现 RespHandler
tcp 服务器需要一个 handler 来处理用户的请求
它的用途是调用 ParseStream 函数,解析用户的指令,然后根据用户的指令,返回给用户相应的回复
type RespHandler struct {
activeConn sync.Map // 记录用户的连接
db databaseface.Database // 核心数据库,操作 kev value 相关的逻辑
closing atomic.Boolean // 是否关闭
}
结构体定义好之后,就要实现 tcp.Handler 接口
这个接口有两个方法:
Handle:处理用户的请求Close:关闭连接
Close
Close 方法是关闭连接,关闭连接的时候,需要关闭所有的连接,关闭数据库
这里要注意的是 activeConn 是 sync.Map 类型,sync.Map 类型是线程安全的,所以在关闭连接的时候,需要遍历 activeConn
遍历的方法是调用 activeConn.Range 方法,接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true,表示继续遍历
func (r *RespHandler) Close() error {
logger.Info("handler shutting down")
// 将关闭连接的标志设置为 true
r.closing.Set(true)
// 使用 Range 遍历 sync.Map
// 接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true,表示继续遍历
r.activeConn.Range(func(key, value any) bool {
// 需要将 key 断言成 *connection.Connection 类型
client := key.(*connection.Connection)
// 关闭连接
_ = client.Close()
// 返回 true,表示继续遍历,返回 false,表示停止遍历
return true
})
// 关闭数据库
r.db.Close()
return nil
}
Handle
在实现 Handle 方法之前,我们需要先实现一个 closeClient 方法,这个方法是关闭一个连接
它和 Close 方法的区别是,Close 方法是关闭所有的连接,而 closeClient 方法是关闭一个连接
它的用途是在连接出错的时候,关闭连接,释放资源
// 关闭一个客户端
func (r *RespHandler) closeClient(client *connection.Connection) {
// 关闭客户端
_ = client.Close()
// 关闭客户端后的善后工作
r.db.AfterClientClose(client)
// 从 sync.Map 中删除掉这个客户端
r.activeConn.Delete(client)
}
// 处理用户的指令
func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
//是不是在关闭中
if r.closing.Get() {
_ = conn.Close()
}
// 新建连接
client := connection.NewConn(conn)
// 将连接存储到 sync.Map 中
r.activeConn.Store(client, struct{}{})
// 调用 ParseStream 函数,解析用户的指令
// 通过 chan 拿到解析后的结果
ch := parser.ParseStream(conn)
for payload := range ch {
// 处理 payload 中的错误
if payload.Err != nil {
// io 错误
// io.EOF 表示已经解析到结尾了,连接可以正常关闭了
// io.ErrUnexpectedEOF 表示在预期的数据结束之前遇到了 EOF,可能发生了数据传输突然中断了
// use of close network connection 表示使用已经关闭的连接,这个时候也需要关闭连接
// 只要出现这三种情况,就需要关闭连接
if payload.Err == io.EOF || errors.Is(payload.Err, io.ErrUnexpectedEOF) || strings.Contains(payload.Err.Error(), "use of close network connection") {
r.closeClient(client)
logger.Infof("connection closed: " + client.RemoteAddr().String())
return
}
// 解析 resp 协议出错,MakeErrReply 作用是将错误信息前面加上 -ERR,后面加上 \r\n
errReply := reply.MakeErrReply(payload.Err.Error())
// 将解析 resp 协议出错的信息返回给用户
err := client.Write(errReply.ToBytes())
// 给用户写入数据可能会出错,如果出错,关闭连接
if err != nil {
r.closeClient(client)
logger.Infof("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
// 没有数据
if payload.Data == nil {
continue
}
replyResult, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Infof("require multi bulk reply")
continue
}
// 调用数据库的 Exec 方法,执行用户的指令
result := r.db.Exec(client, replyResult.Args)
// 将执行结果返回给用户
if result != nil {
_ = client.Write(result.ToBytes())
} else {
// 如果执行结果为空,返回一个未知错误
_ = client.Write(unknownErrReplyBytes)
}
}
}
源码:
转载自:https://juejin.cn/post/7388106238368251954