aof.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/9/29 09:28
  3. // Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化
  4. package aof
  5. import (
  6. "os"
  7. "strconv"
  8. "github.com/runningwater/go-redis/config"
  9. "github.com/runningwater/go-redis/interface/database"
  10. "github.com/runningwater/go-redis/lib/logger"
  11. "github.com/runningwater/go-redis/lib/utils"
  12. "github.com/runningwater/go-redis/resp/reply"
  13. )
  14. // aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量
  15. const aofBufferSize = 1 << 16
  16. // payload AOF操作载荷,包含命令行和数据库索引
  17. type payload struct {
  18. cmdLine database.CmdLine
  19. dbIndex int
  20. }
  21. // Handler AOF处理器结构体
  22. type Handler struct {
  23. database database.Database // 数据库实例,用于执行AOF重放
  24. aofChan chan *payload // AOF操作通道,用于异步处理写入请求
  25. aofFile *os.File // AOF文件句柄
  26. aofFileName string // AOF文件名
  27. currentDB int // 当前数据库索引,用于判断是否需要切换数据库
  28. }
  29. // NewAofHandler 创建一个新的AOF处理器实例
  30. //
  31. // 参数:
  32. //
  33. // database - 数据库实例,用于执行AOF重放的命令
  34. //
  35. // 返回值:
  36. //
  37. // *Handler - 成功创建的AOF处理器实例
  38. // error - 创建过程中可能发生的错误
  39. func NewAofHandler(database database.Database) (*Handler, error) {
  40. h := &Handler{}
  41. h.database = database
  42. h.aofFileName = config.Properties.AppendFilename
  43. // 加载现有的AOF文件内容
  44. h.LoadAof()
  45. aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
  46. if err != nil {
  47. return nil, err
  48. }
  49. h.aofFile = aofFile
  50. // 创建AOF操作通道,用于异步处理AOF写入请求
  51. h.aofChan = make(chan *payload, aofBufferSize)
  52. // 启动后台协程处理AOF写入操作
  53. go func() {
  54. h.handleAof()
  55. }()
  56. return h, nil
  57. }
  58. func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
  59. if config.Properties.AppendOnly && h.aofChan != nil {
  60. p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
  61. h.aofChan <- &p
  62. }
  63. }
  64. func (h *Handler) handleAof() {
  65. h.currentDB = 0
  66. for p := range h.aofChan {
  67. // 判断是否切换了 DB,如果切换使用 SELECT index
  68. if p.dbIndex != h.currentDB {
  69. data := reply.NewMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
  70. _, err := h.aofFile.Write(data)
  71. if err != nil {
  72. logger.Error("write aof file error:", err)
  73. continue
  74. }
  75. h.currentDB = p.dbIndex
  76. }
  77. data := reply.NewMultiBulkReply(p.cmdLine).ToBytes()
  78. _, err := h.aofFile.Write(data)
  79. if err != nil {
  80. logger.Error("write aof file error:", err)
  81. continue
  82. }
  83. } // end for
  84. }
  85. func (h *Handler) LoadAof() {
  86. }