| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/9/29 09:28
- // Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化
- package aof
- import (
- "errors"
- "io"
- "os"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "github.com/runningwater/go-redis/config"
- "github.com/runningwater/go-redis/interface/database"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/lib/utils"
- "github.com/runningwater/go-redis/resp/connection"
- "github.com/runningwater/go-redis/resp/parser"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量
- const aofBufferSize = 1 << 16
- // payload AOF操作载荷,包含命令行和数据库索引
- type payload struct {
- cmdLine database.CmdLine
- dbIndex int
- }
- // Handler AOF处理器结构体
- type Handler struct {
- database database.Database // 数据库实例,用于执行AOF重放
- aofChan chan *payload // AOF操作通道,用于异步处理写入请求
- aofFile *os.File // AOF文件句柄
- aofFileName string // AOF文件名
- currentDB atomic.Int64 // 当前数据库索引,用于判断是否需要切换数据库
- }
- // payloadPool 负载对象池
- var payloadPool = sync.Pool{
- New: func() any {
- return &payload{}
- },
- }
- // NewAofHandler 创建一个新的AOF处理器实例
- //
- // 参数:
- //
- // database - 数据库实例,用于执行AOF重放的命令
- //
- // 返回值:
- //
- // *Handler - 成功创建的AOF处理器实例
- // error - 创建过程中可能发生的错误
- func NewAofHandler(database database.Database) (*Handler, error) {
- h := &Handler{}
- h.database = database
- h.aofFileName = config.Properties.AppendFilename
- // 加载现有的AOF文件内容
- h.LoadAof()
- aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
- if err != nil {
- return nil, err
- }
- h.aofFile = aofFile
- // 创建AOF操作通道,用于异步处理AOF写入请求
- h.aofChan = make(chan *payload, aofBufferSize)
- // 启动后台协程处理AOF写入操作
- go func() {
- h.handleAof()
- }()
- // 启动AOF刷新定时器
- go h.startFlushTicker()
- return h, nil
- }
- // Close 关闭AOF处理器
- func (h *Handler) Close() error {
- logger.Info("aof shutting down...")
- if h.aofChan != nil {
- close(h.aofChan)
- }
- if h.aofFile != nil {
- return h.aofFile.Close()
- }
- return nil
- }
- // AddAof 将命令写入AOF持久化日志
- //
- // dbIndex: 数据库索引
- // cmdLine: 要执行的命令行
- func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
- // 检查是否启用了AOF持久化并且AOF通道不为空
- if !config.Properties.AppendOnly || h.aofChan == nil {
- return
- }
- // 创建负载对象并发送到AOF通道
- p := payloadPool.Get().(*payload)
- p.cmdLine = cmdLine
- p.dbIndex = dbIndex
- select {
- case h.aofChan <- p:
- default:
- logger.Warn("aof channel is full, drop the command")
- payloadPool.Put(p) // 回收负载对象
- // TODO: 触发紧急刷盘或者报警
- // h.startFlushTicker()
- }
- }
- // startFlushTicker 启动AOF刷新定时器
- func (h *Handler) startFlushTicker() {
- if config.Properties.AppendFsyncInterval == 0 {
- config.Properties.AppendFsyncInterval = 5
- } // in seconds
- ticker := time.NewTicker(time.Duration(config.Properties.AppendFsyncInterval) * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- logger.Debug("flush aof file")
- // 每隔AppendFsyncInterval秒,将AOF文件进行同步
- if err := h.aofFile.Sync(); err != nil {
- logger.Error("sync aof file error:", err)
- }
- }
- }
- }
- // handleAof 处理AOF(Append Only File)持久化操作
- //
- // 该函数从aofChan通道中读取命令数据,并将其写入AOF文件
- // 当数据库索引发生变化时,会先写入SELECT命令切换数据库
- // 通过协程方式持续监听通道,实现异步AOF写入
- func (h *Handler) handleAof() {
- defer func() {
- if r := recover(); r != nil {
- logger.Error("handle aof panic:", r)
- }
- }()
- h.currentDB.Store(0)
- for p := range h.aofChan {
- // 判断是否切换了 DB,如果切换使用 SELECT index
- err := h.switchDatabaseIfNeed(p)
- if err != nil {
- logger.Error("write aof file error:", err)
- continue
- }
- err = h.writeAofData(p.cmdLine)
- if err != nil {
- logger.Error("write aof file error:", err)
- continue
- }
- // 回收负载对象
- payloadPool.Put(p)
- } // end for
- }
- // 封装AOF写入逻辑
- func (h *Handler) writeAofData(cmdLine database.CmdLine) error {
- data := reply.NewMultiBulkReply(cmdLine).ToBytes()
- _, err := h.aofFile.Write(data)
- return err
- }
- func (h *Handler) switchDatabaseIfNeed(p *payload) error {
- if p.dbIndex != int(h.currentDB.Load()) {
- data := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
- err := h.writeAofData(data)
- if err != nil {
- logger.Error("write aof file error:", err)
- return err
- }
- h.currentDB.Store(int64(p.dbIndex))
- }
- return nil
- }
- // LoadAof 从AOF文件中加载数据并执行其中的命令
- //
- // 该函数会打开AOF文件,解析其中的命令并依次执行,用于恢复数据库状态
- // 参数: 无
- // 返回值: 无
- //
- // TODO: 一次性加载整个文件可能导致内存占用过高,需要分批加载
- func (h *Handler) LoadAof() {
- // 打开AOF文件
- file, err := os.Open(h.aofFileName)
- if err != nil {
- logger.Error("open aof file error:", err)
- return
- }
- defer func() {
- if err := file.Close(); err != nil {
- logger.Warn("close aof file error:", err)
- }
- }()
- // 解析AOF文件流并逐条执行命令
- ch := parser.ParseStream(file)
- fakeConn := &connection.Connection{}
- for p := range ch {
- if p.Err != nil {
- if p.Err == io.EOF {
- // 文件读取完毕
- break
- }
- logger.Error("parse aof file error:", p.Err)
- continue
- }
- if p.Data == nil {
- logger.Error("parse aof file error:", "data is nil")
- continue
- }
- // 检查解析后的数据类型是否正确
- var cmd *reply.MultiBulkReply
- if !errors.As(p.Data, &cmd) {
- logger.Error("parse aof file error:", "data type error")
- continue
- }
- // 执行解析出的命令
- rep := h.database.Exec(fakeConn, cmd.Args)
- if reply.IsErrReply(rep) {
- logger.Error("parse aof file error:", rep)
- }
- }
- }
|