aof.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/9/29 09:28
  3. // Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化
  4. package aof
  5. import (
  6. "errors"
  7. "io"
  8. "os"
  9. "strconv"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/runningwater/go-redis/config"
  14. "github.com/runningwater/go-redis/interface/database"
  15. "github.com/runningwater/go-redis/lib/logger"
  16. "github.com/runningwater/go-redis/lib/utils"
  17. "github.com/runningwater/go-redis/resp/connection"
  18. "github.com/runningwater/go-redis/resp/parser"
  19. "github.com/runningwater/go-redis/resp/reply"
  20. )
  21. // aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量
  22. const aofBufferSize = 1 << 16
  23. // payload AOF操作载荷,包含命令行和数据库索引
  24. type payload struct {
  25. cmdLine database.CmdLine
  26. dbIndex int
  27. }
  28. // Handler AOF处理器结构体
  29. type Handler struct {
  30. database database.Database // 数据库实例,用于执行AOF重放
  31. aofChan chan *payload // AOF操作通道,用于异步处理写入请求
  32. aofFile *os.File // AOF文件句柄
  33. aofFileName string // AOF文件名
  34. currentDB atomic.Int64 // 当前数据库索引,用于判断是否需要切换数据库
  35. }
  36. // payloadPool 负载对象池
  37. var payloadPool = sync.Pool{
  38. New: func() any {
  39. return &payload{}
  40. },
  41. }
  42. // NewAofHandler 创建一个新的AOF处理器实例
  43. //
  44. // 参数:
  45. //
  46. // database - 数据库实例,用于执行AOF重放的命令
  47. //
  48. // 返回值:
  49. //
  50. // *Handler - 成功创建的AOF处理器实例
  51. // error - 创建过程中可能发生的错误
  52. func NewAofHandler(database database.Database) (*Handler, error) {
  53. h := &Handler{}
  54. h.database = database
  55. h.aofFileName = config.Properties.AppendFilename
  56. // 加载现有的AOF文件内容
  57. h.LoadAof()
  58. aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
  59. if err != nil {
  60. return nil, err
  61. }
  62. h.aofFile = aofFile
  63. // 创建AOF操作通道,用于异步处理AOF写入请求
  64. h.aofChan = make(chan *payload, aofBufferSize)
  65. // 启动后台协程处理AOF写入操作
  66. go func() {
  67. h.handleAof()
  68. }()
  69. // 启动AOF刷新定时器
  70. go h.startFlushTicker()
  71. return h, nil
  72. }
  73. // Close 关闭AOF处理器
  74. func (h *Handler) Close() error {
  75. logger.Info("aof shutting down...")
  76. if h.aofChan != nil {
  77. close(h.aofChan)
  78. }
  79. if h.aofFile != nil {
  80. return h.aofFile.Close()
  81. }
  82. return nil
  83. }
  84. // AddAof 将命令写入AOF持久化日志
  85. //
  86. // dbIndex: 数据库索引
  87. // cmdLine: 要执行的命令行
  88. func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
  89. // 检查是否启用了AOF持久化并且AOF通道不为空
  90. if !config.Properties.AppendOnly || h.aofChan == nil {
  91. return
  92. }
  93. // 创建负载对象并发送到AOF通道
  94. p := payloadPool.Get().(*payload)
  95. p.cmdLine = cmdLine
  96. p.dbIndex = dbIndex
  97. select {
  98. case h.aofChan <- p:
  99. default:
  100. logger.Warn("aof channel is full, drop the command")
  101. payloadPool.Put(p) // 回收负载对象
  102. // TODO: 触发紧急刷盘或者报警
  103. // h.startFlushTicker()
  104. }
  105. }
  106. // startFlushTicker 启动AOF刷新定时器
  107. func (h *Handler) startFlushTicker() {
  108. if config.Properties.AppendFsyncInterval == 0 {
  109. config.Properties.AppendFsyncInterval = 5
  110. } // in seconds
  111. ticker := time.NewTicker(time.Duration(config.Properties.AppendFsyncInterval) * time.Second)
  112. defer ticker.Stop()
  113. for {
  114. select {
  115. case <-ticker.C:
  116. logger.Debug("flush aof file")
  117. // 每隔AppendFsyncInterval秒,将AOF文件进行同步
  118. if err := h.aofFile.Sync(); err != nil {
  119. logger.Error("sync aof file error:", err)
  120. }
  121. }
  122. }
  123. }
  124. // handleAof 处理AOF(Append Only File)持久化操作
  125. //
  126. // 该函数从aofChan通道中读取命令数据,并将其写入AOF文件
  127. // 当数据库索引发生变化时,会先写入SELECT命令切换数据库
  128. // 通过协程方式持续监听通道,实现异步AOF写入
  129. func (h *Handler) handleAof() {
  130. defer func() {
  131. if r := recover(); r != nil {
  132. logger.Error("handle aof panic:", r)
  133. }
  134. }()
  135. h.currentDB.Store(0)
  136. for p := range h.aofChan {
  137. // 判断是否切换了 DB,如果切换使用 SELECT index
  138. err := h.switchDatabaseIfNeed(p)
  139. if err != nil {
  140. logger.Error("write aof file error:", err)
  141. continue
  142. }
  143. err = h.writeAofData(p.cmdLine)
  144. if err != nil {
  145. logger.Error("write aof file error:", err)
  146. continue
  147. }
  148. // 回收负载对象
  149. payloadPool.Put(p)
  150. } // end for
  151. }
  152. // 封装AOF写入逻辑
  153. func (h *Handler) writeAofData(cmdLine database.CmdLine) error {
  154. data := reply.NewMultiBulkReply(cmdLine).ToBytes()
  155. _, err := h.aofFile.Write(data)
  156. return err
  157. }
  158. func (h *Handler) switchDatabaseIfNeed(p *payload) error {
  159. if p.dbIndex != int(h.currentDB.Load()) {
  160. data := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
  161. err := h.writeAofData(data)
  162. if err != nil {
  163. logger.Error("write aof file error:", err)
  164. return err
  165. }
  166. h.currentDB.Store(int64(p.dbIndex))
  167. }
  168. return nil
  169. }
  170. // LoadAof 从AOF文件中加载数据并执行其中的命令
  171. //
  172. // 该函数会打开AOF文件,解析其中的命令并依次执行,用于恢复数据库状态
  173. // 参数: 无
  174. // 返回值: 无
  175. //
  176. // TODO: 一次性加载整个文件可能导致内存占用过高,需要分批加载
  177. func (h *Handler) LoadAof() {
  178. // 打开AOF文件
  179. file, err := os.Open(h.aofFileName)
  180. if err != nil {
  181. logger.Error("open aof file error:", err)
  182. return
  183. }
  184. defer func() {
  185. if err := file.Close(); err != nil {
  186. logger.Warn("close aof file error:", err)
  187. }
  188. }()
  189. // 解析AOF文件流并逐条执行命令
  190. ch := parser.ParseStream(file)
  191. fakeConn := &connection.Connection{}
  192. for p := range ch {
  193. if p.Err != nil {
  194. if p.Err == io.EOF {
  195. // 文件读取完毕
  196. break
  197. }
  198. logger.Error("parse aof file error:", p.Err)
  199. continue
  200. }
  201. if p.Data == nil {
  202. logger.Error("parse aof file error:", "data is nil")
  203. continue
  204. }
  205. // 检查解析后的数据类型是否正确
  206. var cmd *reply.MultiBulkReply
  207. if !errors.As(p.Data, &cmd) {
  208. logger.Error("parse aof file error:", "data type error")
  209. continue
  210. }
  211. // 执行解析出的命令
  212. rep := h.database.Exec(fakeConn, cmd.Args)
  213. if reply.IsErrReply(rep) {
  214. logger.Error("parse aof file error:", rep)
  215. }
  216. }
  217. }