使用Go从零实现一个Redis(五):实现AOF持久化和AOF重写
传送门:使用Go从零实现一个Redis(四):实现pubsub发布订阅命令
此代码参考了大佬的godis。godis是一个非常值得学习的项目!!!
大佬的博客 :Finley (cnblogs.com)
大佬的源码 :github.com/hdt3213/god…
什么是AOF持久化
Redis的AOF(Append-Only File) 是一种持久化机制,用于在Redis服务器运行期间记录所有写操作。
AOF文件是一个追加文件,其中包含Redis服务器执行的所有写操作。
与传统的快照持久化方式不同,AOF将每个写命令追加到文件中,从而可以完整地恢复数据,即使Redis在写命令执行之间崩溃,也可以保证数据不会丢失。
使用AOF持久化机制可以提供更好的数据持久化和可靠性,但需要更多的磁盘空间和写入磁盘的操作,可能会对Redis的性能产生一定的影响。
具体来说,AOF文件包含以下内容:
-
Redis的命令(包括参数),以文本格式保存在文件中。这些命令可以是写命令(如SET、INCR等)或删除命令(如DEL)。
-
Redis事务的命令,以MULTI、EXEC、DISCARD、WATCH等命令的形式保存在文件中。
-
AOF文件还包含一些特殊的命令,用于处理服务器状态的持久化,如BGREWRITEAOF、SELECT、FLUSHALL等。
需要注意的是,AOF文件包含的是Redis执行的所有写操作,而不是所有的操作。例如,如果使用READONLY命令来设置Redis为只读模式,那么这个命令不会被记录在AOF文件中。此外,AOF文件中的命令顺序与它们执行的顺序相同,这保证了数据的一致性和可靠性。
根据不同的业务场景,Redis提供了三种不同的持久化方式。
-
每秒钟同步(Every Second):这种方式是默认的AOF持久化方式。Redis每隔一秒钟将AOF缓冲区中的所有写命令同步到磁盘上的AOF文件中。这种方式可以提供很好的持久化能力,同时也保证了较高的性能。
-
每个写命令同步(Every Write):这种方式是将每个写命令都同步到磁盘上的AOF文件中。虽然这种方式可以提供最强的持久化能力,但是由于需要频繁地进行磁盘写入操作,所以会对性能产生很大的影响,不建议在高负载环境下使用。
-
不同步(No):这种方式是不对AOF文件进行持久化。即使在Redis进程退出或崩溃时,也不会对AOF文件进行任何操作。这种方式的优点是对性能影响最小,缺点是在发生故障时可能会丢失数据,不建议在生产环境中使用。
需要注意的是,如果你希望在Redis进程退出或崩溃时能够快速地恢复数据,建议将AOF持久化方式设置为每秒钟同步或每个写命令同步。这样可以保证在发生故障时不会丢失太多的数据。同时,你也可以通过使用Redis提供的备份和恢复工具来创建备份文件,以进一步增强数据的可靠性和安全性。
什么是AOF重写
随着Redis服务器运行时间的增长,AOF文件会变得越来越大,这会导致AOF文件的读写操作变慢,并占用更多的磁盘空间。
AOF重写可以通过创建一个新的AOF文件来解决这个问题,该文件包含与原始AOF文件相同的数据,但是经过了一定的优化,可以显著减小AOF文件的大小,并提高读写性能。
进行AOF重写有几个主要的原因:
- 减少AOF文件的大小:随着Redis服务器运行时间的增长,AOF文件会不断变大,这会导致AOF文件的读写操作变慢,并且占用更多的磁盘空间。通过进行AOF重写,可以创建一个更小、更紧凑的AOF文件,从而减少磁盘空间的使用和读写操作的延迟。
- 优化AOF文件的读写性能:AOF重写可以通过去除不必要的命令,从而减少Redis服务器在AOF文件中的读写操作。这将导致更快的写入速度和更快的AOF文件重放,从而提高Redis服务器的整体性能。
- 消除AOF文件中的错误或不一致的数据:AOF重写将创建一个新的AOF文件,其中只包含能够完全重现当前数据库状态的命令。这意味着所有已过时或无效的命令都将被删除,从而消除AOF文件中的任何错误或不一致的数据。
需要注意的是,AOF重写是一个耗时的操作,需要一定的时间和资源。在进行AOF重写之前,应该仔细考虑AOF文件的大小、Redis服务器的性能需求以及重写操作的影响。通常情况下,建议在闲置时间进行AOF重写操作,并且定期进行AOF重写,以保持Redis服务器的最佳性能。
提问:如果在进行AOF重写的时候,有写操作执行,那么该怎么记录?
在进行AOF重写期间,如果有新的写操作执行,Redis会将这些操作同时写入旧的AOF文件和内存中的数据库。这是因为Redis不希望在AOF重写期间丢失任何数据。
当AOF重写完成后,Redis会将新的AOF文件加载到内存中,并用新的AOF文件覆盖旧的AOF文件。这时,Redis会根据新的AOF文件重新构建数据库,并忽略旧的AOF文件中的写操作。
需要注意的是,重写期间的写操作会导致新的AOF文件与旧的AOF文件之间存在一些不一致的地方,这可能会影响AOF文件的正确性和可靠性。为了解决这个问题,Redis在重写期间使用了一个机制,称为AOF重写缓冲区(AOF Rewrite Buffer),用于缓存重写期间的写操作。这样,即使在重写期间有新的写操作发生,也不会影响重写操作的正确性。
AOF重写缓冲区是Redis在进行AOF重写期间使用的一种机制,它的作用是缓存重写期间的写操作,以避免重写期间的写操作丢失或对AOF文件的正确性产生影响。
AOF重写缓冲区可以看作是一个暂存区,Redis将在AOF重写期间发生的所有写操作都缓存在这个区域中。具体来说,当Redis在重写AOF文件时,它会同时将新的写操作写入旧的AOF文件和AOF重写缓冲区中。这样一来,即使在重写期间有新的写操作发生,也不会影响重写操作的正确性。
当AOF重写完成后,Redis会将AOF重写缓冲区中缓存的所有写操作一次性写入新的AOF文件中,从而保证新的AOF文件包含了所有的写操作。然后,Redis会将新的AOF文件加载到内存中,并用新的AOF文件覆盖旧的AOF文件。
通过使用AOF重写缓冲区,Redis可以确保在AOF重写期间不会丢失任何数据,并保证新的AOF文件包含了所有的写操作。同时,AOF重写缓冲区还可以减少重写操作对性能的影响,因为Redis可以将写操作缓存在内存中,而不是直接写入磁盘,从而提高重写操作的速度。
实现AOF持久化
定义执行结构体
定义一个用于执行AOF持久化操作的执行者,执行者主要包括以下的属性:
- ctx 用于持久化的协程的上下文
- db 数据库引擎,表示需要执行的数据库所具备的方法
- aofFileName aofFile AOF文件名称和文件
- aofFsync 写入策略
- pausingAof锁,用于暂停AOF持久化
- currentDb 当前正在执行持久化的数据库编号
具体包括AOF重写的完整结构体如下(aof/aof.go):
type Persister struct { // 用于执行持久化的执行者
ctx context.Context //持久化协程的上下文
cancel context.CancelFunc // 持久化协程的取消函数
db database.DBEngine // 数据库引擎
tmpDBMaker func() database.DBEngine // 临时的数据库创建函数,用于AOF重写的时候创建临时数据库
aofChan chan *payload // 用于在持久化协程和Redis主协程之间传递任务的通道,一般用于在AOF重写的时候作为临时的重写缓冲区
aofFile *os.File // AOF文件。
aofFilename string // AOF名。
aofFsync string // AOF写入策略(always/everysec/no)
// 当aof任务完成并准备关闭时,aof goroutine将通过此通道向main goroutine发送消息。
aofFinished chan struct{} // 持久化协程完成后通知Redis主协程的通道
pausingAof sync.Mutex // 锁,用于在AOF重写期间暂停AOF持久化
currentDB int // 当前正在使用的数据库编号
listeners map[Listener]struct{} // Redis事件监听器
// reuse cmdLine buffer
buffer []CmdLine // 命令缓冲区
}
执行AOF持久化操作
执行AOF持久化操作:
- 开启listenCmd方法并开始监听命令
- 利用管道实现协程之间的通信传递命令
- 当有一个命令需要被写入到AOF文件的时候,执行SaveCmdLine方法,传递进入当前的数据库以及命令内容
listenCmd()监听命令 (aof/aof.go)
func (persister *Persister) listenCmd() {
// 监听命令
for p := range persister.aofChan {
persister.writeAof(p)
}
persister.aofFinished <- struct{}{}
}
SaveCmdLine()写入命令 (aof/aof.go)
func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {
if persister.aofChan == nil {
return
}
// FsyncAlways 策略表示每个命令都要进行AOF操作
if persister.aofFsync == FsyncAlways {
p := &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
persister.writeAof(p)
return
}
persister.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
执行写入到AOF文件的过程:
- 清空缓冲区
- 开启锁避免其他协程暂停了AOF操作
- 修改需要执行AOF操作的数据库
- 判断当前命令是否需要被写入AOF文件,不需要则直接返回
- 创建一个命令格式的data,并将其存放入buffer中
- 将buffer中的数据写入到AOF文件中
- 执行监听器中的回调函数
- 同步刷新磁盘内容(如果持久化策略是Always)
writeAof()执行写入命令 (aof/aof.go)
func (persister *Persister) writeAof(p *payload) {
persister.buffer = persister.buffer[:0] // 清空缓冲区以便后续复用
persister.pausingAof.Lock() // 防止其他协程暂停AOF操作
defer persister.pausingAof.Unlock()
// 确保当前所在的数据库是正确的
if p.dbIndex != persister.currentDB {
// 修改数据库
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
persister.buffer = append(persister.buffer, selectCmd)
data := protocol.MakeMultiBulkReply(selectCmd).ToBytes()
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
return
}
persister.currentDB = p.dbIndex
}
// 保存命令
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
persister.buffer = append(persister.buffer, p.cmdLine)
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
for listener := range persister.listeners {
listener.Callback(persister.buffer)
}
if persister.aofFsync == FsyncAlways {
_ = persister.aofFile.Sync() // 同步刷新到磁盘
}
}
同步策略是每秒 (aof/aof.go)
func (persister *Persister) fsyncEverySecond() {
// 创建一个计时器
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case <-ticker.C:
persister.pausingAof.Lock()
// 每秒执行一次同步操作
if err := persister.aofFile.Sync(); err != nil {
logger.Errorf("fsync failed: %v", err)
}
persister.pausingAof.Unlock()
case <-persister.ctx.Done():
return
}
}
}()
}
加载AOF文件
从AOF文件中加载数据到内存中的流程:
- 从文件中读取maxBytes长度的数据内容
- 创建一个新的客户端连接用于写入数据
- 将得到的命令进行判断,只写入需要的内容
- 如果存在修改数据库,则当前的AOF持久化数据库也要跟着执行
loadAof() (aof/aof.go)
// 将aof文件加载到内存
func (persister *Persister) LoadAof(maxBytes int) {
aofChan := persister.aofChan // 备份aofChan
persister.aofChan = nil
defer func(aofChan chan *payload) {
persister.aofChan = aofChan // 恢复aofchan
}(aofChan)
file, err := os.Open(persister.aofFilename)
if err != nil {
if _, ok := err.(*os.PathError); ok {
return
}
logger.Warn(err)
return
}
defer file.Close()
var reader io.Reader
if maxBytes > 0 { // 如果大于0说明只需要读取一部分
reader = io.LimitReader(file, int64(maxBytes))
} else {
reader = file
}
// 返回的ch是Payload类型,存储服务器解析到的数据
ch := parser.ParseStream(reader)
fakeConn := connection.NewFakeConn() // 创建一个虚拟连接,只用于保存当前的 dbIndex
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF { // 如果是读到了文件末尾则退出
break
}
logger.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil { // 读到了空命令
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*protocol.MultiBulkReply) // 解析到MultiBulkReply
if !ok {
logger.Error("require multi bulk protocol")
continue
}
ret := persister.db.Exec(fakeConn, r.Args)
if protocol.IsErrorReply(ret) {
logger.Error("exec err", string(ret.ToBytes()))
}
if strings.ToLower(string(r.Args[0])) == "select" {
// execSelect success, here must be no error
dbIndex, err := strconv.Atoi(string(r.Args[1]))
if err == nil {
persister.currentDB = dbIndex
}
}
}
}
实现AOF重写
若我们对键a赋值100次会在AOF文件中产生100条指令但只有最后一条指令是有效的,为了减少持久化文件的大小需要进行AOF重写以删除无用的指令。
RewriteCtx:用于保存AOF重写的上下文
type RewriteCtx struct {
tmpFile *os.File // 重写过程中创建的临时文件指针,用于写入重写后的AOF文件
fileSize int64 // 当前AOF文件的大小,用于限制重写的最大长度
dbIdx int // 重写过程中选择的数据库索引
}
执行重写前操作
- 使用锁暂停AOF持久化
- 刷新AOF文件以确保持久化完成
- 获取AOF文件大小并创建一个大小一样的临时AOF文件
startRewrite() (aof/rewrite.go)
func (persister *Persister) StartRewrite() (*RewriteCtx, error) {
persister.pausingAof.Lock()
defer persister.pausingAof.Unlock()
err := persister.aofFile.Sync() // 刷新之前的AOF文件使之确定持久化成功
if err != nil {
logger.Warn("fsync failed")
return nil, err
}
// 获取当前的AOF文件大小
fileInfo, _ := os.Stat(persister.aofFilename) // 获取文件信息
filesize := fileInfo.Size()
// 创建一个临时AOF文件
file, err := ioutil.TempFile("", "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
dbIdx: persister.currentDB,
}, nil
}
执行重写操作
重写必须在固定不变的数据集上进行,不能直接使用内存中的数据。Redis 重写的实现方式是进行 fork 并在子进程中遍历数据库内的数据重新生成AOF文件。由于 golang 不支持 fork 操作,我们只能采用读取AOF文件生成副本的方式来代替fork。
在这过程中我们使用一个新的AOF文件,操作都是在新的AOF文件中进行,所以若 AOF 重写失败或被中断,AOF 文件仍然保持重写之前的状态。
- 将原有的AOF文件写入到Redis中,保证数据一致性
- 从Redis中读取内容,并将其写入到临时AOF文件中
DoRewrite() (aof/rewrite.go)
func (persister *Persister) DoRewrite(ctx *RewriteCtx) error {
tmpFile := ctx.tmpFile
// load aof tmpFile
tmpAof := persister.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize))
// rewrite aof tmpFile
for i := 0; i < config.Properties.Databases; i++ {
// select db
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
_, err := tmpFile.Write(data)
if err != nil {
return err
}
// dump db
tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
cmd := EntityToCmd(key, entity)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
if expiration != nil {
cmd := MakeExpireCmd(key, *expiration)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
}
return true
})
}
return nil
}
执行重写后操作
该步骤最为复杂:
- 确保重写期间没有其他写操作,保证数据一致性。通过获取 persister.pausingAof 的锁来实现。
- 打开当前 AOF 文件(persister.aofFilename)以便复制数据。如果打开文件失败,则记录错误日志并返回。
- 通过 seek 函数将文件指针移到之前复制完成的位置。之前复制完成后,要在文件结尾加上 SELECT 语句。
- 修改 tmpFile 的 db index 为重写期间选择的数据库。将 SELECT 命令写入 tmpFile。
- 使用 io.Copy 函数将 AOF 文件中剩余的数据复制到 tmpFile 中。如果复制失败,则记录错误日志并返回。
- 关闭 tmpFile,获取其文件名(tmpFileName)并删除文件句柄。将重写完成的 AOF 文件(tmpFile)替换为当前 AOF 文件(persister.aofFilename)。如果替换失败,则记录错误日志并返回。
- 关闭当前 AOF 文件句柄(persister.aofFile),并使用 OpenFile 函数重新打开 AOF 文件以便进行进一步的写操作。
- 确保 AOF 文件与 persister.currentDB 具有相同的数据库索引。将 SELECT 命令写入 persister.aofFile。如果写入失败,则抛出 panic 异常。
这段代码实现了 AOF 重写过程的结束操作,确保了重写后的 AOF 文件与原始 AOF 文件中的所有数据都被正确地复制到了新的 AOF 文件中。
FinishRewrite() (aof/rewrite.go)
func (persister *Persister) FinishRewrite(ctx *RewriteCtx) {
persister.pausingAof.Lock() // 确保重写期间没有其他写操作,保证数据一致性
defer persister.pausingAof.Unlock()
tmpFile := ctx.tmpFile
src, err := os.Open(persister.aofFilename)
if err != nil {
logger.Error("open aofFilename failed: " + err.Error())
return
}
defer func() {
_ = src.Close()
}()
_, err = src.Seek(ctx.fileSize, 0) // seek函数用于修改文件指针的位置,在前面复制完成之后,要在结尾加上SELECT语句
if err != nil {
logger.Error("seek failed: " + err.Error())
return
}
// 修改为之前未选择的数据库
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil {
logger.Error("tmp file rewrite failed: " + err.Error())
return
}
// 将原有的AOF文件的剩余内容复制到tmpFIle中
_, err = io.Copy(tmpFile, src)
if err != nil {
logger.Error("copy aof filed failed: " + err.Error())
return
}
tmpFileName := tmpFile.Name()
_ = tmpFile.Close()
// 使用临时的文件代替AOF文件
_ = persister.aofFile.Close()
_ = os.Rename(tmpFileName, persister.aofFilename)
// 修改当前的AOF文件并打开以便后续的操作
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
persister.aofFile = aofFile
// 确保当前的数据库和AOF文件中的是同一个数据库索引
data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
_, err = persister.aofFile.Write(data)
if err != nil {
panic(err)
}
}
转载自:https://juejin.cn/post/7229159531895963709