likes
comments
collection
share

使用Go从零实现一个Redis(五):实现AOF持久化和AOF重写

作者站长头像
站长
· 阅读数 15

传送门:使用Go从零实现一个Redis(四):实现pubsub发布订阅命令

代码地址:JaricY/miniRedis (github.com)

此代码参考了大佬的godis。godis是一个非常值得学习的项目!!!

大佬的博客 :Finley (cnblogs.com)

大佬的源码 :github.com/hdt3213/god…


什么是AOF持久化

Redis的AOF(Append-Only File) 是一种持久化机制,用于在Redis服务器运行期间记录所有写操作。

AOF文件是一个追加文件,其中包含Redis服务器执行的所有写操作

与传统的快照持久化方式不同,AOF将每个写命令追加到文件中,从而可以完整地恢复数据,即使Redis在写命令执行之间崩溃,也可以保证数据不会丢失。

使用AOF持久化机制可以提供更好的数据持久化和可靠性,但需要更多的磁盘空间和写入磁盘的操作,可能会对Redis的性能产生一定的影响。

具体来说,AOF文件包含以下内容:

  1. Redis的命令(包括参数),以文本格式保存在文件中。这些命令可以是写命令(如SET、INCR等)或删除命令(如DEL)。

  2. Redis事务的命令,以MULTI、EXEC、DISCARD、WATCH等命令的形式保存在文件中。

  3. AOF文件还包含一些特殊的命令,用于处理服务器状态的持久化,如BGREWRITEAOF、SELECT、FLUSHALL等。

需要注意的是,AOF文件包含的是Redis执行的所有写操作,而不是所有的操作。例如,如果使用READONLY命令来设置Redis为只读模式,那么这个命令不会被记录在AOF文件中。此外,AOF文件中的命令顺序与它们执行的顺序相同,这保证了数据的一致性和可靠性。

根据不同的业务场景,Redis提供了三种不同的持久化方式。

  1. 每秒钟同步(Every Second):这种方式是默认的AOF持久化方式。Redis每隔一秒钟将AOF缓冲区中的所有写命令同步到磁盘上的AOF文件中。这种方式可以提供很好的持久化能力,同时也保证了较高的性能。

  2. 每个写命令同步(Every Write):这种方式是将每个写命令都同步到磁盘上的AOF文件中。虽然这种方式可以提供最强的持久化能力,但是由于需要频繁地进行磁盘写入操作,所以会对性能产生很大的影响,不建议在高负载环境下使用。

  3. 不同步(No):这种方式是不对AOF文件进行持久化。即使在Redis进程退出或崩溃时,也不会对AOF文件进行任何操作。这种方式的优点是对性能影响最小,缺点是在发生故障时可能会丢失数据,不建议在生产环境中使用。

需要注意的是,如果你希望在Redis进程退出或崩溃时能够快速地恢复数据,建议将AOF持久化方式设置为每秒钟同步或每个写命令同步。这样可以保证在发生故障时不会丢失太多的数据。同时,你也可以通过使用Redis提供的备份和恢复工具来创建备份文件,以进一步增强数据的可靠性和安全性。

什么是AOF重写

随着Redis服务器运行时间的增长,AOF文件会变得越来越大,这会导致AOF文件的读写操作变慢,并占用更多的磁盘空间。

AOF重写可以通过创建一个新的AOF文件来解决这个问题,该文件包含与原始AOF文件相同的数据,但是经过了一定的优化,可以显著减小AOF文件的大小,并提高读写性能。

进行AOF重写有几个主要的原因:

  1. 减少AOF文件的大小:随着Redis服务器运行时间的增长,AOF文件会不断变大,这会导致AOF文件的读写操作变慢,并且占用更多的磁盘空间。通过进行AOF重写,可以创建一个更小、更紧凑的AOF文件,从而减少磁盘空间的使用和读写操作的延迟。
  2. 优化AOF文件的读写性能:AOF重写可以通过去除不必要的命令,从而减少Redis服务器在AOF文件中的读写操作。这将导致更快的写入速度和更快的AOF文件重放,从而提高Redis服务器的整体性能。
  3. 消除AOF文件中的错误或不一致的数据:AOF重写将创建一个新的AOF文件,其中只包含能够完全重现当前数据库状态的命令。这意味着所有已过时或无效的命令都将被删除,从而消除AOF文件中的任何错误或不一致的数据。

需要注意的是,AOF重写是一个耗时的操作,需要一定的时间和资源。在进行AOF重写之前,应该仔细考虑AOF文件的大小、Redis服务器的性能需求以及重写操作的影响。通常情况下,建议在闲置时间进行AOF重写操作,并且定期进行AOF重写,以保持Redis服务器的最佳性能。

提问:如果在进行AOF重写的时候,有写操作执行,那么该怎么记录?

在进行AOF重写期间,如果有新的写操作执行,Redis会将这些操作同时写入旧的AOF文件和内存中的数据库。这是因为Redis不希望在AOF重写期间丢失任何数据。

当AOF重写完成后,Redis会将新的AOF文件加载到内存中,并用新的AOF文件覆盖旧的AOF文件。这时,Redis会根据新的AOF文件重新构建数据库,并忽略旧的AOF文件中的写操作。

需要注意的是,重写期间的写操作会导致新的AOF文件与旧的AOF文件之间存在一些不一致的地方,这可能会影响AOF文件的正确性和可靠性。为了解决这个问题,Redis在重写期间使用了一个机制,称为AOF重写缓冲区(AOF Rewrite Buffer),用于缓存重写期间的写操作。这样,即使在重写期间有新的写操作发生,也不会影响重写操作的正确性。

AOF重写缓冲区是Redis在进行AOF重写期间使用的一种机制,它的作用是缓存重写期间的写操作,以避免重写期间的写操作丢失或对AOF文件的正确性产生影响。

AOF重写缓冲区可以看作是一个暂存区,Redis将在AOF重写期间发生的所有写操作都缓存在这个区域中。具体来说,当Redis在重写AOF文件时,它会同时将新的写操作写入旧的AOF文件和AOF重写缓冲区中。这样一来,即使在重写期间有新的写操作发生,也不会影响重写操作的正确性。

当AOF重写完成后,Redis会将AOF重写缓冲区中缓存的所有写操作一次性写入新的AOF文件中,从而保证新的AOF文件包含了所有的写操作。然后,Redis会将新的AOF文件加载到内存中,并用新的AOF文件覆盖旧的AOF文件。

通过使用AOF重写缓冲区,Redis可以确保在AOF重写期间不会丢失任何数据,并保证新的AOF文件包含了所有的写操作。同时,AOF重写缓冲区还可以减少重写操作对性能的影响,因为Redis可以将写操作缓存在内存中,而不是直接写入磁盘,从而提高重写操作的速度。

使用Go从零实现一个Redis(五):实现AOF持久化和AOF重写

实现AOF持久化

定义执行结构体

定义一个用于执行AOF持久化操作的执行者,执行者主要包括以下的属性:

  1. ctx 用于持久化的协程的上下文
  2. db 数据库引擎,表示需要执行的数据库所具备的方法
  3. aofFileName aofFile AOF文件名称和文件
  4. aofFsync 写入策略
  5. pausingAof锁,用于暂停AOF持久化
  6. currentDb 当前正在执行持久化的数据库编号

具体包括AOF重写的完整结构体如下(aof/aof.go):

type Persister struct { // 用于执行持久化的执行者
   ctx         context.Context          //持久化协程的上下文
   cancel      context.CancelFunc       // 持久化协程的取消函数
   db          database.DBEngine        // 数据库引擎
   tmpDBMaker  func() database.DBEngine // 临时的数据库创建函数,用于AOF重写的时候创建临时数据库
   aofChan     chan *payload            // 用于在持久化协程和Redis主协程之间传递任务的通道,一般用于在AOF重写的时候作为临时的重写缓冲区
   aofFile     *os.File                 // AOF文件。
   aofFilename string                   // AOF名。
   aofFsync    string                   // AOF写入策略(always/everysec/no)
   // 当aof任务完成并准备关闭时,aof goroutine将通过此通道向main goroutine发送消息。
   aofFinished chan struct{} // 持久化协程完成后通知Redis主协程的通道
   pausingAof sync.Mutex            // 锁,用于在AOF重写期间暂停AOF持久化
   currentDB  int                   // 当前正在使用的数据库编号
   listeners  map[Listener]struct{} // Redis事件监听器
   // reuse cmdLine buffer
   buffer []CmdLine // 命令缓冲区
}

执行AOF持久化操作

执行AOF持久化操作:

  1. 开启listenCmd方法并开始监听命令
  2. 利用管道实现协程之间的通信传递命令
  3. 当有一个命令需要被写入到AOF文件的时候,执行SaveCmdLine方法,传递进入当前的数据库以及命令内容

listenCmd()监听命令 (aof/aof.go)

func (persister *Persister) listenCmd() {
   // 监听命令
   for p := range persister.aofChan {
      persister.writeAof(p)
   }
   persister.aofFinished <- struct{}{}
}

SaveCmdLine()写入命令 (aof/aof.go)

func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {
   if persister.aofChan == nil {
      return
   }
   // FsyncAlways 策略表示每个命令都要进行AOF操作
   if persister.aofFsync == FsyncAlways {
      p := &payload{
         cmdLine: cmdLine,
         dbIndex: dbIndex,
      }
      persister.writeAof(p)
      return
   }
   persister.aofChan <- &payload{
      cmdLine: cmdLine,
      dbIndex: dbIndex,
   }
}

执行写入到AOF文件的过程:

  1. 清空缓冲区
  2. 开启锁避免其他协程暂停了AOF操作
  3. 修改需要执行AOF操作的数据库
  4. 判断当前命令是否需要被写入AOF文件,不需要则直接返回
  5. 创建一个命令格式的data,并将其存放入buffer中
  6. 将buffer中的数据写入到AOF文件中
  7. 执行监听器中的回调函数
  8. 同步刷新磁盘内容(如果持久化策略是Always)

writeAof()执行写入命令 (aof/aof.go)

func (persister *Persister) writeAof(p *payload) {
   persister.buffer = persister.buffer[:0] // 清空缓冲区以便后续复用
   persister.pausingAof.Lock()             // 防止其他协程暂停AOF操作
   defer persister.pausingAof.Unlock()
   // 确保当前所在的数据库是正确的
   if p.dbIndex != persister.currentDB {
      // 修改数据库
      selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
      persister.buffer = append(persister.buffer, selectCmd)
      data := protocol.MakeMultiBulkReply(selectCmd).ToBytes()
      _, err := persister.aofFile.Write(data)
      if err != nil {
         logger.Warn(err)
         return 
      }
      persister.currentDB = p.dbIndex
   }

   // 保存命令
   data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
   persister.buffer = append(persister.buffer, p.cmdLine)
   _, err := persister.aofFile.Write(data)
   if err != nil {
      logger.Warn(err)
   }
   for listener := range persister.listeners {
      listener.Callback(persister.buffer)
   }
   if persister.aofFsync == FsyncAlways {
      _ = persister.aofFile.Sync() // 同步刷新到磁盘
   }
}

同步策略是每秒 (aof/aof.go)

func (persister *Persister) fsyncEverySecond() {
   // 创建一个计时器
   ticker := time.NewTicker(time.Second)
   go func() {
      for {
         select {
         case <-ticker.C:
            persister.pausingAof.Lock()
            // 每秒执行一次同步操作
            if err := persister.aofFile.Sync(); err != nil {
               logger.Errorf("fsync failed: %v", err)
            }
            persister.pausingAof.Unlock()
         case <-persister.ctx.Done():
            return
         }
      }
   }()
}

加载AOF文件

从AOF文件中加载数据到内存中的流程:

  1. 从文件中读取maxBytes长度的数据内容
  2. 创建一个新的客户端连接用于写入数据
  3. 将得到的命令进行判断,只写入需要的内容
  4. 如果存在修改数据库,则当前的AOF持久化数据库也要跟着执行

loadAof() (aof/aof.go)

// 将aof文件加载到内存
func (persister *Persister) LoadAof(maxBytes int) {
   aofChan := persister.aofChan // 备份aofChan
   persister.aofChan = nil
   defer func(aofChan chan *payload) {
      persister.aofChan = aofChan // 恢复aofchan
   }(aofChan)

   file, err := os.Open(persister.aofFilename)
   if err != nil {
      if _, ok := err.(*os.PathError); ok {
         return
      }
      logger.Warn(err)
      return
   }
   defer file.Close()

   var reader io.Reader
   if maxBytes > 0 { // 如果大于0说明只需要读取一部分
      reader = io.LimitReader(file, int64(maxBytes))
   } else {
      reader = file
   }

   // 返回的ch是Payload类型,存储服务器解析到的数据
   ch := parser.ParseStream(reader)
   fakeConn := connection.NewFakeConn() // 创建一个虚拟连接,只用于保存当前的 dbIndex
   for p := range ch {
      if p.Err != nil {
         if p.Err == io.EOF { // 如果是读到了文件末尾则退出
            break
         }
         logger.Error("parse error: " + p.Err.Error())
         continue
      }
      if p.Data == nil { // 读到了空命令
         logger.Error("empty payload")
         continue
      }
      r, ok := p.Data.(*protocol.MultiBulkReply) // 解析到MultiBulkReply
      if !ok {
         logger.Error("require multi bulk protocol")
         continue
      }
      ret := persister.db.Exec(fakeConn, r.Args)
      if protocol.IsErrorReply(ret) {
         logger.Error("exec err", string(ret.ToBytes()))
      }
      if strings.ToLower(string(r.Args[0])) == "select" {
         // execSelect success, here must be no error
         dbIndex, err := strconv.Atoi(string(r.Args[1]))
         if err == nil {
            persister.currentDB = dbIndex
         }
      }
   }
}

使用Go从零实现一个Redis(五):实现AOF持久化和AOF重写

实现AOF重写

若我们对键a赋值100次会在AOF文件中产生100条指令但只有最后一条指令是有效的,为了减少持久化文件的大小需要进行AOF重写以删除无用的指令。

RewriteCtx:用于保存AOF重写的上下文

type RewriteCtx struct {
   tmpFile  *os.File // 重写过程中创建的临时文件指针,用于写入重写后的AOF文件
   fileSize int64    // 当前AOF文件的大小,用于限制重写的最大长度
   dbIdx    int      // 重写过程中选择的数据库索引
}

执行重写前操作

  1. 使用锁暂停AOF持久化
  2. 刷新AOF文件以确保持久化完成
  3. 获取AOF文件大小并创建一个大小一样的临时AOF文件

startRewrite() (aof/rewrite.go)

func (persister *Persister) StartRewrite() (*RewriteCtx, error) {
   persister.pausingAof.Lock() 
   defer persister.pausingAof.Unlock()

   err := persister.aofFile.Sync() // 刷新之前的AOF文件使之确定持久化成功
   if err != nil {
      logger.Warn("fsync failed")
      return nil, err
   }

   // 获取当前的AOF文件大小
   fileInfo, _ := os.Stat(persister.aofFilename) // 获取文件信息
   filesize := fileInfo.Size()

   // 创建一个临时AOF文件
   file, err := ioutil.TempFile("", "*.aof")
   if err != nil {
      logger.Warn("tmp file create failed")
      return nil, err
   }
   return &RewriteCtx{
      tmpFile:  file,
      fileSize: filesize,
      dbIdx:    persister.currentDB,
   }, nil
}

执行重写操作

重写必须在固定不变的数据集上进行,不能直接使用内存中的数据。Redis 重写的实现方式是进行 fork 并在子进程中遍历数据库内的数据重新生成AOF文件。由于 golang 不支持 fork 操作,我们只能采用读取AOF文件生成副本的方式来代替fork。

在这过程中我们使用一个新的AOF文件,操作都是在新的AOF文件中进行,所以若 AOF 重写失败或被中断,AOF 文件仍然保持重写之前的状态。

  1. 将原有的AOF文件写入到Redis中,保证数据一致性
  2. 从Redis中读取内容,并将其写入到临时AOF文件中

DoRewrite() (aof/rewrite.go)

func (persister *Persister) DoRewrite(ctx *RewriteCtx) error {
   tmpFile := ctx.tmpFile

   // load aof tmpFile
   tmpAof := persister.newRewriteHandler()
   tmpAof.LoadAof(int(ctx.fileSize))

   // rewrite aof tmpFile
   for i := 0; i < config.Properties.Databases; i++ {
      // select db
      data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
      _, err := tmpFile.Write(data)
      if err != nil {
         return err
      }
      // dump db
      tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
         cmd := EntityToCmd(key, entity)
         if cmd != nil {
            _, _ = tmpFile.Write(cmd.ToBytes())
         }
         if expiration != nil {
            cmd := MakeExpireCmd(key, *expiration)
            if cmd != nil {
               _, _ = tmpFile.Write(cmd.ToBytes())
            }
         }
         return true
      })
   }
   return nil
}

执行重写后操作

该步骤最为复杂:

  1. 确保重写期间没有其他写操作,保证数据一致性。通过获取 persister.pausingAof 的锁来实现。
  2. 打开当前 AOF 文件(persister.aofFilename)以便复制数据。如果打开文件失败,则记录错误日志并返回。
  3. 通过 seek 函数将文件指针移到之前复制完成的位置。之前复制完成后,要在文件结尾加上 SELECT 语句。
  4. 修改 tmpFile 的 db index 为重写期间选择的数据库。将 SELECT 命令写入 tmpFile。
  5. 使用 io.Copy 函数将 AOF 文件中剩余的数据复制到 tmpFile 中。如果复制失败,则记录错误日志并返回。
  6. 关闭 tmpFile,获取其文件名(tmpFileName)并删除文件句柄。将重写完成的 AOF 文件(tmpFile)替换为当前 AOF 文件(persister.aofFilename)。如果替换失败,则记录错误日志并返回。
  7. 关闭当前 AOF 文件句柄(persister.aofFile),并使用 OpenFile 函数重新打开 AOF 文件以便进行进一步的写操作。
  8. 确保 AOF 文件与 persister.currentDB 具有相同的数据库索引。将 SELECT 命令写入 persister.aofFile。如果写入失败,则抛出 panic 异常。

这段代码实现了 AOF 重写过程的结束操作,确保了重写后的 AOF 文件与原始 AOF 文件中的所有数据都被正确地复制到了新的 AOF 文件中。

FinishRewrite() (aof/rewrite.go)

func (persister *Persister) FinishRewrite(ctx *RewriteCtx) {
   persister.pausingAof.Lock() // 确保重写期间没有其他写操作,保证数据一致性
   defer persister.pausingAof.Unlock()

   tmpFile := ctx.tmpFile

   src, err := os.Open(persister.aofFilename)
   if err != nil {
      logger.Error("open aofFilename failed: " + err.Error())
      return
   }
   defer func() {
      _ = src.Close()
   }()
   _, err = src.Seek(ctx.fileSize, 0) // seek函数用于修改文件指针的位置,在前面复制完成之后,要在结尾加上SELECT语句
   if err != nil {
      logger.Error("seek failed: " + err.Error())
      return
   }

   // 修改为之前未选择的数据库
   data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
   _, err = tmpFile.Write(data)
   if err != nil {
      logger.Error("tmp file rewrite failed: " + err.Error())
      return
   }

   // 将原有的AOF文件的剩余内容复制到tmpFIle中
   _, err = io.Copy(tmpFile, src)
   if err != nil {
      logger.Error("copy aof filed failed: " + err.Error())
      return
   }

   tmpFileName := tmpFile.Name()
   _ = tmpFile.Close()
   // 使用临时的文件代替AOF文件
   _ = persister.aofFile.Close()
   _ = os.Rename(tmpFileName, persister.aofFilename)

   // 修改当前的AOF文件并打开以便后续的操作
   aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
   if err != nil {
      panic(err)
   }
   persister.aofFile = aofFile

   // 确保当前的数据库和AOF文件中的是同一个数据库索引
   data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
   _, err = persister.aofFile.Write(data)
   if err != nil {
      panic(err)
   }
}

使用Go从零实现一个Redis(五):实现AOF持久化和AOF重写