likes
comments
collection
share

使用Go从零实现一个Redis(三):实现RESP协议解析器

作者站长头像
站长
· 阅读数 11

此代码参考了大佬的godis。godis是一个非常值得学习的项目!!!


在上一篇中已经实现了创建TCP服务器,TCP服务器用于客户端和服务端之间的通信交互。Redis客户端发送的命令是基于RESP协议的,所以在执行命令之前需要对命令进行解析,也就是从RESP协议中获取到我们需要的内容。

RESP协议是什么

RESP全称是Redis Serialization Protocol,是客户端和服务端在TCP协议上层采用的一种通讯标准方式。

RESP协议主要在以下几点做了折衷:

  • 简单的实现
  • 快速地被计算机解析
  • 简单得可以能被人工解析

RESP协议是在Redis1.2引入的,但是在Redis 2.0中,这就成为了与Redis服务器通讯的标准方式。

在这个统一协议里,发送给Redis服务端的所有参数都是二进制安全的。RESP实际上是一个支持以下数据类型的序列化协议。 RESP中,某些数据类型取决于第一个字节,并且每行以\r\n(CRLF)结束。

  • 单行回复/请求(单行字符串回复/请求),回复/请求的第一个字节将是“+”
  • 错误消息(单行字符串回复的另外展示形式),回复/请求的第一个字节将是“-”
  • 整型回复/请求(正整形数字回复/请求),回复/请求的第一个字节将是“:”
  • 批量回复/请求(多行字符串回复/请求),回复/请求的第一个字节将是“$”
  • 数组批量回复/请求(数组回复/请求),回复/请求的第一个字节将是“*”

例如在客户端中发送一个Get k1 命令,对应的RESP编码如下所示:

>*2\r\n
> $3\r\n
> Get\r\n
> $2\r\n
> k1\r\n

*2表示批量请求的数组中有两个分别是Get和K1。

$3表示批量字符串,长度是3,然后获取下一行中长度为3的字符串,也就是获取到了GET。

同理可得到k1。

同样的,在服务端中如果需要响应命令PING并返回PONG,可以使用如下的返回。

1.
>+PONG\r\n
2.
>*1\r\n
>$4\r\n
>PONG\r\n

解析RESP协议

解析只是用于解析出我们想要的命令内容,还没有做对命令响应的内容

解析RESP协议的主思路是:

  1. 读取出每一行,也就是以\n结尾的字符串。
  2. 解析读到的内容,如果是空行则不做处理
  3. 如果不是空行,则删除最后的\r\n,得到当前得到的行的内容。
  4. 读取这一行的第一个字符,判断其是属于哪一种
  5. 如果是+,则说明是一个简单的状态回复
  6. 如果是-,则说明这是一个错误信息
  7. 如果是:,则说明这是一个整数
  8. 如果是$,则说明这是单行二进制安全的字符串,传入parseBulkString中解析这行的内容
  9. 如果是*,说明这是多行的字符串,传入parseArray中继续解析每行的内容
  10. 如果都不是,则说明请求内容有错。

使用Go从零实现一个Redis(三):实现RESP协议解析器

redis/parser/parser.go

// 解析每行的内容
func parse0(rawReader io.Reader, ch chan<- *Payload) {
   // 发生异常,记录
   defer func() {
      if err := recover(); err != nil {
         logger.Error(err, string(debug.Stack()))
      }
   }()

   //net.conn 只存储客户端发送给服务端的数据
   reader := bufio.NewReader(rawReader)
   for {
      // 读取出一行,以\n结尾
      line, err := reader.ReadBytes('\n')

      //当读取错误的时候说明已经读取完成
      if err != nil {
         ch <- &Payload{Err: err}
         close(ch)
         return
      }
      length := len(line)

      // 说明是空行,不处理
      if length <= 2 || line[length-2] != '\r' {
         // there are some empty lines within replication traffic, ignore this error
         //protocolError(ch, "empty line")
         continue
      }
      // 从尾部删除\r\n之后的切片
      line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
      switch line[0] { // 读取协议类型
      case '+':

         content := string(line[1:])
         ch <- &Payload{
            // 返回一个状态的回复
            Data: protocol.MakeStatusReply(content),
         }
      case '-':
         //说明是一个错误
         ch <- &Payload{
            Data: protocol.MakeErrReply(string(line[1:])),
         }
      case ':':
         // 说明是一个整数
         value, err := strconv.ParseInt(string(line[1:]), 10, 64)
         if err != nil {
            protocolError(ch, "illegal number "+string(line[1:]))
            continue
         }
         ch <- &Payload{
            Data: protocol.MakeIntReply(value),
         }
      case '$':
         // 单行的字符串,二进制安全的
         err = parseBulkString(line, reader, ch)
         if err != nil {
            ch <- &Payload{Err: err}
            close(ch)
            return
         }
      case '*':
         // 数组类型 line的当前行,剩下的交给parseArray处理,reader是得到的全部数据,ch写入内容的管道,
         err = parseArray(line, reader, ch)
         if err != nil {
            ch <- &Payload{Err: err}
            close(ch)
            return
         }
      default:
         args := bytes.Split(line, []byte{' '})
         ch <- &Payload{
            Data: protocol.MakeMultiBulkReply(args),
         }
      }
   }
}

解析单行命令的parseBulkString方法

解析单行命令的思路是:

  1. 获取$符之后的数字,也就是下一行的字符串长度
  2. 根据字符串长度获取下一行的内容
  3. 将内容写入到多行结果中(多行结果中只保存了单行的结果)。

redis/parser/parser.go

// parseBulkString 解析单行命令
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
   // 当前header只包含 $长度
   strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
   if err != nil || strLen < -1 {
      protocolError(ch, "illegal bulk string header: "+string(header))
      return nil
   } else if strLen == -1 {
      ch <- &Payload{
         Data: protocol.MakeNullBulkReply(),
      }
      return nil
   }
   body := make([]byte, strLen+2)
   // 继续获取
   _, err = io.ReadFull(reader, body)
   if err != nil {
      return err
   }
   ch <- &Payload{
      Data: protocol.MakeBulkReply(body[:len(body)-2]),
   }
   return nil
}

解析多行命令的parseArray方法

解析多行命令的思路如下:

  1. 获取*之后的数字,也就是命令行的数量
  2. 遍历命令行,继续读命令的内容
  3. 对于每一行中判断开头是否是$,也判断其是否是一行有效的命令行
  4. 如果是当前行有效,则继续往后读这行对应的内容,思路与parseBulkString一样,并存储在一个切片中
  5. 通过切片生成一个解析结果。

redis/parser/parser.go

// 解析数组类型(*开头)
func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
   /*
      *2\r\n
       $3\r\n
       get
       $2\r\n
       k1
   */

   //获取长度
   nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)
   if err != nil || nStrs < 0 {
      protocolError(ch, "illegal array header "+string(header[1:]))
      return nil
   } else if nStrs == 0 {
      ch <- &Payload{
         Data: protocol.MakeEmptyMultiBulkReply(),
      }
      return nil
   }

   // nstrs表示共有多少行,lines用于存储命令参数,是一个切片,切片元素是字节数组
   lines := make([][]byte, 0, nStrs)
   // 处理每一行
   for i := int64(0); i < nStrs; i++ {
      var line []byte
      // 处理一行以\n结尾(在上个方法中已经读了一行,所以这里是第二行开始)
      // line = $3\r\n
      line, err = reader.ReadBytes('\n')
      if err != nil {
         return err
      }
      length := len(line)

      //判断是否是一行数据
      if length < 4 || line[length-2] != '\r' || line[0] != '$' {
         protocolError(ch, "illegal bulk string header "+string(line))
         break
      }
      // strlen = 3
      strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)
      if err != nil || strLen < -1 {
         protocolError(ch, "illegal bulk string length "+string(line))
         break
      } else if strLen == -1 {
         lines = append(lines, []byte{})
      } else {
         //strlen+2 = 5
         body := make([]byte, strLen+2)
         //读取一行数据 读取了get
         _, err := io.ReadFull(reader, body)
         if err != nil {
            return err
         }
         lines = append(lines, body[:len(body)-2])
      }
   }
   ch <- &Payload{
      Data: protocol.MakeMultiBulkReply(lines),
   }
   return nil
}

对于一个请求命令,例如Get k1 ,在解析完之后就可以对数据库进行操作了。

解析步骤只是用于解析到我们需要的内容,但还并不对内容进行处理。

处理解析的结果

得到解析出来的结果后就可以对其进行处理了,处理的思路是这样的:

  1. 判断是否是错误的处理结果,如果是错误则执行对应的错误流程
  2. 如果是空结果,则不进行处理
  3. 如果是整数、错误、和状态,则暂时不进行处理
  4. 如果是多行结果(包括了单行结果),则执行与数据库的交互
  5. 得到交互结果写入到客户端中。

redis/server/server.go Handle()方法

// 解析完成后得到的管道
ch := parser.ParseStream(conn)

for payload := range ch {
   // 处理错误结果
   if payload.Err != nil {
      if payload.Err == io.EOF ||
         payload.Err == io.ErrUnexpectedEOF ||
         strings.Contains(payload.Err.Error(), "use of closed network connection") {
         // connection closed
         h.closeClient(client)
         logger.Info("connection closed: " + client.RemoteAddr().String())
         return
      }
      // protocol err
      errReply := protocol.MakeErrReply(payload.Err.Error())
      _, err := client.Write(errReply.ToBytes())
      if err != nil {
         h.closeClient(client)
         logger.Info("connection closed: " + client.RemoteAddr().String())
         return
      }
      continue
   }

   //处理空结果
   if payload.Data == nil {
      logger.Error("empty payload")
      continue
   }

   // 如果是多行结果,也就是*开头的
   r, ok := payload.Data.(*protocol.MultiBulkReply)
   if !ok {
      logger.Error("require multi bulk protocol")
      continue
   }

   // 执行参数
   result := h.db.Exec(client, r.Args)
   if result != nil {
      // 写回响应
      _, _ = client.Write(result.ToBytes())
   } else {
      _, _ = client.Write(unknownErrReplyBytes)
   }

Redis源码分析

在Redis源码中,RESP 协议解析的实现位于 src/networking.c 文件中。 当 Redis 接收到客户端发送的命令时,会先将请求数据读取到输入缓冲区中,然后调用 processInputBuffer() 函数进行解析。

使用Go从零实现一个Redis(三):实现RESP协议解析器

processInputBuffer() 函数首先会读取输入缓冲区中的第一个字符,判断它是 $*+-、或者是数字字符,根据不同的字符类型选择不同的解析方式:

  • 如果是 $,表示下一个字符串的长度,调用 processBulkString() 函数解析字符串。
  • 如果是 *,表示后面跟着的是一个数组,调用 processMultiBulkBuffer() 函数解析数组。
  • 如果是 +,表示是一个单行字符串,调用 processInlineBuffer() 函数解析字符串。
  • 如果是 -,表示是一个错误响应,调用 processInlineBuffer() 函数解析错误信息。
  • 如果是数字字符,表示是一个整数或者多条命令的数量,调用 processMultibulkLength() 函数解析数字。

这些解析函数会依次从输入缓冲区中读取数据,解析出对应的命令参数,并将它们保存到一个命令结构体中。当所有命令参数都解析完毕后,Redis 就会执行对应的命令处理函数。

使用Go从零实现一个Redis(三):实现RESP协议解析器

注意:这里只是对解析进行一个粗略的介绍,在Redis中RESP 协议的解析过程比较复杂,需要处理多种边界条件和错误情况,因此需要详细研究的朋友请自行查看源码。

下一篇传送门: