使用Go从零实现一个Redis(三):实现RESP协议解析器
此代码参考了大佬的godis。godis是一个非常值得学习的项目!!!
在上一篇中已经实现了创建TCP服务器,TCP服务器用于客户端和服务端之间的通信交互。Redis客户端发送的命令是基于RESP协议的,所以在执行命令之前需要对命令进行解析,也就是从RESP协议中获取到我们需要的内容。
RESP协议是什么
RESP全称是Redis Serialization Protocol,是客户端和服务端在TCP协议上层采用的一种通讯标准方式。
RESP协议主要在以下几点做了折衷:
- 简单的实现
- 快速地被计算机解析
- 简单得可以能被人工解析
RESP协议是在Redis1.2引入的,但是在Redis 2.0中,这就成为了与Redis服务器通讯的标准方式。
在这个统一协议里,发送给Redis服务端的所有参数都是二进制安全的。RESP实际上是一个支持以下数据类型的序列化协议。 RESP中,某些数据类型取决于第一个字节,并且每行以\r\n(CRLF)结束。
- 单行回复/请求(单行字符串回复/请求),回复/请求的第一个字节将是“+”
- 错误消息(单行字符串回复的另外展示形式),回复/请求的第一个字节将是“-”
- 整型回复/请求(正整形数字回复/请求),回复/请求的第一个字节将是“:”
- 批量回复/请求(多行字符串回复/请求),回复/请求的第一个字节将是“$”
- 数组批量回复/请求(数组回复/请求),回复/请求的第一个字节将是“*”
例如在客户端中发送一个Get k1 命令,对应的RESP编码如下所示:
>*2\r\n
> $3\r\n
> Get\r\n
> $2\r\n
> k1\r\n
*2表示批量请求的数组中有两个分别是Get和K1。
$3表示批量字符串,长度是3,然后获取下一行中长度为3的字符串,也就是获取到了GET。
同理可得到k1。
同样的,在服务端中如果需要响应命令PING并返回PONG,可以使用如下的返回。
1.
>+PONG\r\n
2.
>*1\r\n
>$4\r\n
>PONG\r\n
解析RESP协议
解析只是用于解析出我们想要的命令内容,还没有做对命令响应的内容
解析RESP协议的主思路是:
- 读取出每一行,也就是以\n结尾的字符串。
- 解析读到的内容,如果是空行则不做处理
- 如果不是空行,则删除最后的\r\n,得到当前得到的行的内容。
- 读取这一行的第一个字符,判断其是属于哪一种
- 如果是+,则说明是一个简单的状态回复
- 如果是-,则说明这是一个错误信息
- 如果是:,则说明这是一个整数
- 如果是$,则说明这是单行二进制安全的字符串,传入parseBulkString中解析这行的内容
- 如果是*,说明这是多行的字符串,传入parseArray中继续解析每行的内容
- 如果都不是,则说明请求内容有错。
redis/parser/parser.go
// 解析每行的内容
func parse0(rawReader io.Reader, ch chan<- *Payload) {
// 发生异常,记录
defer func() {
if err := recover(); err != nil {
logger.Error(err, string(debug.Stack()))
}
}()
//net.conn 只存储客户端发送给服务端的数据
reader := bufio.NewReader(rawReader)
for {
// 读取出一行,以\n结尾
line, err := reader.ReadBytes('\n')
//当读取错误的时候说明已经读取完成
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
length := len(line)
// 说明是空行,不处理
if length <= 2 || line[length-2] != '\r' {
// there are some empty lines within replication traffic, ignore this error
//protocolError(ch, "empty line")
continue
}
// 从尾部删除\r\n之后的切片
line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
switch line[0] { // 读取协议类型
case '+':
content := string(line[1:])
ch <- &Payload{
// 返回一个状态的回复
Data: protocol.MakeStatusReply(content),
}
case '-':
//说明是一个错误
ch <- &Payload{
Data: protocol.MakeErrReply(string(line[1:])),
}
case ':':
// 说明是一个整数
value, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
protocolError(ch, "illegal number "+string(line[1:]))
continue
}
ch <- &Payload{
Data: protocol.MakeIntReply(value),
}
case '$':
// 单行的字符串,二进制安全的
err = parseBulkString(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
case '*':
// 数组类型 line的当前行,剩下的交给parseArray处理,reader是得到的全部数据,ch写入内容的管道,
err = parseArray(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
default:
args := bytes.Split(line, []byte{' '})
ch <- &Payload{
Data: protocol.MakeMultiBulkReply(args),
}
}
}
}
解析单行命令的parseBulkString方法
解析单行命令的思路是:
- 获取$符之后的数字,也就是下一行的字符串长度
- 根据字符串长度获取下一行的内容
- 将内容写入到多行结果中(多行结果中只保存了单行的结果)。
redis/parser/parser.go
// parseBulkString 解析单行命令
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
// 当前header只包含 $长度
strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || strLen < -1 {
protocolError(ch, "illegal bulk string header: "+string(header))
return nil
} else if strLen == -1 {
ch <- &Payload{
Data: protocol.MakeNullBulkReply(),
}
return nil
}
body := make([]byte, strLen+2)
// 继续获取
_, err = io.ReadFull(reader, body)
if err != nil {
return err
}
ch <- &Payload{
Data: protocol.MakeBulkReply(body[:len(body)-2]),
}
return nil
}
解析多行命令的parseArray方法
解析多行命令的思路如下:
- 获取*之后的数字,也就是命令行的数量
- 遍历命令行,继续读命令的内容
- 对于每一行中判断开头是否是$,也判断其是否是一行有效的命令行
- 如果是当前行有效,则继续往后读这行对应的内容,思路与parseBulkString一样,并存储在一个切片中
- 通过切片生成一个解析结果。
redis/parser/parser.go
// 解析数组类型(*开头)
func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
/*
*2\r\n
$3\r\n
get
$2\r\n
k1
*/
//获取长度
nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || nStrs < 0 {
protocolError(ch, "illegal array header "+string(header[1:]))
return nil
} else if nStrs == 0 {
ch <- &Payload{
Data: protocol.MakeEmptyMultiBulkReply(),
}
return nil
}
// nstrs表示共有多少行,lines用于存储命令参数,是一个切片,切片元素是字节数组
lines := make([][]byte, 0, nStrs)
// 处理每一行
for i := int64(0); i < nStrs; i++ {
var line []byte
// 处理一行以\n结尾(在上个方法中已经读了一行,所以这里是第二行开始)
// line = $3\r\n
line, err = reader.ReadBytes('\n')
if err != nil {
return err
}
length := len(line)
//判断是否是一行数据
if length < 4 || line[length-2] != '\r' || line[0] != '$' {
protocolError(ch, "illegal bulk string header "+string(line))
break
}
// strlen = 3
strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)
if err != nil || strLen < -1 {
protocolError(ch, "illegal bulk string length "+string(line))
break
} else if strLen == -1 {
lines = append(lines, []byte{})
} else {
//strlen+2 = 5
body := make([]byte, strLen+2)
//读取一行数据 读取了get
_, err := io.ReadFull(reader, body)
if err != nil {
return err
}
lines = append(lines, body[:len(body)-2])
}
}
ch <- &Payload{
Data: protocol.MakeMultiBulkReply(lines),
}
return nil
}
对于一个请求命令,例如Get k1 ,在解析完之后就可以对数据库进行操作了。
解析步骤只是用于解析到我们需要的内容,但还并不对内容进行处理。
处理解析的结果
得到解析出来的结果后就可以对其进行处理了,处理的思路是这样的:
- 判断是否是错误的处理结果,如果是错误则执行对应的错误流程
- 如果是空结果,则不进行处理
- 如果是整数、错误、和状态,则暂时不进行处理
- 如果是多行结果(包括了单行结果),则执行与数据库的交互
- 得到交互结果写入到客户端中。
redis/server/server.go Handle()方法
// 解析完成后得到的管道
ch := parser.ParseStream(conn)
for payload := range ch {
// 处理错误结果
if payload.Err != nil {
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
// protocol err
errReply := protocol.MakeErrReply(payload.Err.Error())
_, err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
//处理空结果
if payload.Data == nil {
logger.Error("empty payload")
continue
}
// 如果是多行结果,也就是*开头的
r, ok := payload.Data.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
// 执行参数
result := h.db.Exec(client, r.Args)
if result != nil {
// 写回响应
_, _ = client.Write(result.ToBytes())
} else {
_, _ = client.Write(unknownErrReplyBytes)
}
Redis源码分析
在Redis源码中,RESP 协议解析的实现位于 src/networking.c
文件中。
当 Redis 接收到客户端发送的命令时,会先将请求数据读取到输入缓冲区中,然后调用 processInputBuffer()
函数进行解析。
processInputBuffer()
函数首先会读取输入缓冲区中的第一个字符,判断它是 $
、*
、+
、-
、或者是数字字符,根据不同的字符类型选择不同的解析方式:
- 如果是
$
,表示下一个字符串的长度,调用processBulkString()
函数解析字符串。 - 如果是
*
,表示后面跟着的是一个数组,调用processMultiBulkBuffer()
函数解析数组。 - 如果是
+
,表示是一个单行字符串,调用processInlineBuffer()
函数解析字符串。 - 如果是
-
,表示是一个错误响应,调用processInlineBuffer()
函数解析错误信息。 - 如果是数字字符,表示是一个整数或者多条命令的数量,调用
processMultibulkLength()
函数解析数字。
这些解析函数会依次从输入缓冲区中读取数据,解析出对应的命令参数,并将它们保存到一个命令结构体中。当所有命令参数都解析完毕后,Redis 就会执行对应的命令处理函数。
注意:这里只是对解析进行一个粗略的介绍,在Redis中RESP 协议的解析过程比较复杂,需要处理多种边界条件和错误情况,因此需要详细研究的朋友请自行查看源码。
下一篇传送门:
转载自:https://juejin.cn/post/7224205585125965884