2 次代码提交 5dfe94ca40 ... f0dc4c2f84

作者 SHA1 备注 提交日期
  runningwater f0dc4c2f84 feat: AOF 持久化功能 4 周之前
  runningwater 7d7f2b9338 feat: .gitignore 1 月之前
共有 5 个文件被更改,包括 64 次插入3 次删除
  1. 1 1
      .gitignore
  2. 0 0
      aof.data
  3. 60 0
      aof/aof.go
  4. 2 1
      resp/parser/parser.go
  5. 1 1
      resp/reply/consts.go

+ 1 - 1
.gitignore

@@ -156,7 +156,7 @@ bh_unicode_properties.cache
 # https://packagecontrol.io/packages/sublime-github
 GitHub.sublime-settings
 
-
+*.data
 
 logs/
 tmp/

+ 0 - 0
aof.data


+ 60 - 0
aof/aof.go

@@ -5,6 +5,7 @@
 package aof
 
 import (
+	"io"
 	"os"
 	"strconv"
 
@@ -12,6 +13,8 @@ import (
 	"github.com/runningwater/go-redis/interface/database"
 	"github.com/runningwater/go-redis/lib/logger"
 	"github.com/runningwater/go-redis/lib/utils"
+	"github.com/runningwater/go-redis/resp/connection"
+	"github.com/runningwater/go-redis/resp/parser"
 	"github.com/runningwater/go-redis/resp/reply"
 )
 
@@ -66,12 +69,24 @@ func NewAofHandler(database database.Database) (*Handler, error) {
 	return h, nil
 }
 
+// AddAof 将命令写入AOF持久化日志
+//
+//	dbIndex: 数据库索引
+//	cmdLine: 要执行的命令行
 func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
+	// 检查是否启用了AOF持久化并且AOF通道不为空
 	if config.Properties.AppendOnly && h.aofChan != nil {
+		// 创建负载对象并发送到AOF通道
 		p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
 		h.aofChan <- &p
 	}
 }
+
+// handleAof 处理AOF(Append Only File)持久化操作
+//
+//	该函数从aofChan通道中读取命令数据,并将其写入AOF文件
+//	当数据库索引发生变化时,会先写入SELECT命令切换数据库
+//	通过协程方式持续监听通道,实现异步AOF写入
 func (h *Handler) handleAof() {
 	h.currentDB = 0
 	for p := range h.aofChan {
@@ -95,6 +110,51 @@ func (h *Handler) handleAof() {
 	} // end for
 }
 
+// LoadAof 从AOF文件中加载数据并执行其中的命令
+//
+//	该函数会打开AOF文件,解析其中的命令并依次执行,用于恢复数据库状态
+//	参数: 无
+//	返回值: 无
 func (h *Handler) LoadAof() {
+	// 打开AOF文件
+	file, err := os.Open(h.aofFileName)
+	if err != nil {
+		logger.Error("open aof file error:", err)
+		return
+	}
+	defer func(file *os.File) {
+		err := file.Close()
+		if err != nil {
+			return
+		}
+	}(file)
 
+	// 解析AOF文件流并逐条执行命令
+	ch := parser.ParseStream(file)
+	fakeConn := &connection.Connection{}
+	for p := range ch {
+		if p.Err != nil {
+			if p.Err == io.EOF {
+				// 文件读取完毕
+				break
+			}
+			logger.Error("parse aof file error:", p.Err)
+			continue
+		}
+		if p.Data == nil {
+			logger.Error("parse aof file error:", "data is nil")
+			continue
+		}
+		// 检查解析后的数据类型是否正确
+		cmd, ok := p.Data.(*reply.MultiBulkReply)
+		if !ok {
+			logger.Error("parse aof file error:", "data type error")
+			continue
+		}
+		// 执行解析出的命令
+		rep := h.database.Exec(fakeConn, cmd.Args)
+		if reply.IsErrReply(rep) {
+			logger.Error("parse aof file error:", rep)
+		}
+	}
 }

+ 2 - 1
resp/parser/parser.go

@@ -21,7 +21,8 @@ import (
 )
 
 // Payload is the payload of the parser
-// Used to store the state information of the parser
+//
+//	Used to store the state information of the parser
 type Payload struct {
 	Data resp.Reply // Parsed response object, consistent with the return data structure
 	Err  error      // Errors that occurred during parsing

+ 1 - 1
resp/reply/consts.go

@@ -94,7 +94,7 @@ func NewEmptyMultiBulkReply() *EmptyMultiBulkReply {
 }
 
 // IsOkReply 判断回复是否为OK响应
-func IsOkReply(reply interface{}) bool {
+func IsOkReply(reply any) bool {
 	_, ok := reply.(*OkReply)
 	return ok
 }