// Author: simon (ynwdlxm@163.com) // Date: 2025/9/29 09:28 // Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化 package aof import ( "os" "strconv" "github.com/runningwater/go-redis/config" "github.com/runningwater/go-redis/interface/database" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/lib/utils" "github.com/runningwater/go-redis/resp/reply" ) // aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量 const aofBufferSize = 1 << 16 // payload AOF操作载荷,包含命令行和数据库索引 type payload struct { cmdLine database.CmdLine dbIndex int } // Handler AOF处理器结构体 type Handler struct { database database.Database // 数据库实例,用于执行AOF重放 aofChan chan *payload // AOF操作通道,用于异步处理写入请求 aofFile *os.File // AOF文件句柄 aofFileName string // AOF文件名 currentDB int // 当前数据库索引,用于判断是否需要切换数据库 } // NewAofHandler 创建一个新的AOF处理器实例 // // 参数: // // database - 数据库实例,用于执行AOF重放的命令 // // 返回值: // // *Handler - 成功创建的AOF处理器实例 // error - 创建过程中可能发生的错误 func NewAofHandler(database database.Database) (*Handler, error) { h := &Handler{} h.database = database h.aofFileName = config.Properties.AppendFilename // 加载现有的AOF文件内容 h.LoadAof() aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } h.aofFile = aofFile // 创建AOF操作通道,用于异步处理AOF写入请求 h.aofChan = make(chan *payload, aofBufferSize) // 启动后台协程处理AOF写入操作 go func() { h.handleAof() }() return h, nil } func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) { if config.Properties.AppendOnly && h.aofChan != nil { p := payload{cmdLine: cmdLine, dbIndex: dbIndex} h.aofChan <- &p } } func (h *Handler) handleAof() { h.currentDB = 0 for p := range h.aofChan { // 判断是否切换了 DB,如果切换使用 SELECT index if p.dbIndex != h.currentDB { data := reply.NewMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := h.aofFile.Write(data) if err != nil { logger.Error("write aof file error:", err) continue } h.currentDB = p.dbIndex } data := reply.NewMultiBulkReply(p.cmdLine).ToBytes() _, err := h.aofFile.Write(data) if err != nil { logger.Error("write aof file error:", err) continue } } // end for } func (h *Handler) LoadAof() { }