| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/9/29 09:28
- // Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化
- package aof
- import (
- "os"
- "strconv"
- "github.com/runningwater/go-redis/config"
- "github.com/runningwater/go-redis/interface/database"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/lib/utils"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量
- const aofBufferSize = 1 << 16
- // payload AOF操作载荷,包含命令行和数据库索引
- type payload struct {
- cmdLine database.CmdLine
- dbIndex int
- }
- // Handler AOF处理器结构体
- type Handler struct {
- database database.Database // 数据库实例,用于执行AOF重放
- aofChan chan *payload // AOF操作通道,用于异步处理写入请求
- aofFile *os.File // AOF文件句柄
- aofFileName string // AOF文件名
- currentDB int // 当前数据库索引,用于判断是否需要切换数据库
- }
- // NewAofHandler 创建一个新的AOF处理器实例
- //
- // 参数:
- //
- // database - 数据库实例,用于执行AOF重放的命令
- //
- // 返回值:
- //
- // *Handler - 成功创建的AOF处理器实例
- // error - 创建过程中可能发生的错误
- func NewAofHandler(database database.Database) (*Handler, error) {
- h := &Handler{}
- h.database = database
- h.aofFileName = config.Properties.AppendFilename
- // 加载现有的AOF文件内容
- h.LoadAof()
- aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
- if err != nil {
- return nil, err
- }
- h.aofFile = aofFile
- // 创建AOF操作通道,用于异步处理AOF写入请求
- h.aofChan = make(chan *payload, aofBufferSize)
- // 启动后台协程处理AOF写入操作
- go func() {
- h.handleAof()
- }()
- return h, nil
- }
- func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
- if config.Properties.AppendOnly && h.aofChan != nil {
- p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
- h.aofChan <- &p
- }
- }
- func (h *Handler) handleAof() {
- h.currentDB = 0
- for p := range h.aofChan {
- // 判断是否切换了 DB,如果切换使用 SELECT index
- if p.dbIndex != h.currentDB {
- data := reply.NewMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
- _, err := h.aofFile.Write(data)
- if err != nil {
- logger.Error("write aof file error:", err)
- continue
- }
- h.currentDB = p.dbIndex
- }
- data := reply.NewMultiBulkReply(p.cmdLine).ToBytes()
- _, err := h.aofFile.Write(data)
- if err != nil {
- logger.Error("write aof file error:", err)
- continue
- }
- } // end for
- }
- func (h *Handler) LoadAof() {
- }
|