|
|
@@ -5,6 +5,7 @@
|
|
|
package aof
|
|
|
|
|
|
import (
|
|
|
+ "io"
|
|
|
"os"
|
|
|
"strconv"
|
|
|
|
|
|
@@ -12,6 +13,8 @@ import (
|
|
|
"github.com/runningwater/go-redis/interface/database"
|
|
|
"github.com/runningwater/go-redis/lib/logger"
|
|
|
"github.com/runningwater/go-redis/lib/utils"
|
|
|
+ "github.com/runningwater/go-redis/resp/connection"
|
|
|
+ "github.com/runningwater/go-redis/resp/parser"
|
|
|
"github.com/runningwater/go-redis/resp/reply"
|
|
|
)
|
|
|
|
|
|
@@ -66,12 +69,24 @@ func NewAofHandler(database database.Database) (*Handler, error) {
|
|
|
return h, nil
|
|
|
}
|
|
|
|
|
|
+// AddAof 将命令写入AOF持久化日志
|
|
|
+//
|
|
|
+// dbIndex: 数据库索引
|
|
|
+// cmdLine: 要执行的命令行
|
|
|
func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
|
|
|
+ // 检查是否启用了AOF持久化并且AOF通道不为空
|
|
|
if config.Properties.AppendOnly && h.aofChan != nil {
|
|
|
+ // 创建负载对象并发送到AOF通道
|
|
|
p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
|
|
|
h.aofChan <- &p
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// handleAof 处理AOF(Append Only File)持久化操作
|
|
|
+//
|
|
|
+// 该函数从aofChan通道中读取命令数据,并将其写入AOF文件
|
|
|
+// 当数据库索引发生变化时,会先写入SELECT命令切换数据库
|
|
|
+// 通过协程方式持续监听通道,实现异步AOF写入
|
|
|
func (h *Handler) handleAof() {
|
|
|
h.currentDB = 0
|
|
|
for p := range h.aofChan {
|
|
|
@@ -95,6 +110,51 @@ func (h *Handler) handleAof() {
|
|
|
} // end for
|
|
|
}
|
|
|
|
|
|
+// LoadAof 从AOF文件中加载数据并执行其中的命令
|
|
|
+//
|
|
|
+// 该函数会打开AOF文件,解析其中的命令并依次执行,用于恢复数据库状态
|
|
|
+// 参数: 无
|
|
|
+// 返回值: 无
|
|
|
func (h *Handler) LoadAof() {
|
|
|
+ // 打开AOF文件
|
|
|
+ file, err := os.Open(h.aofFileName)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("open aof file error:", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer func(file *os.File) {
|
|
|
+ err := file.Close()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }(file)
|
|
|
|
|
|
+ // 解析AOF文件流并逐条执行命令
|
|
|
+ ch := parser.ParseStream(file)
|
|
|
+ fakeConn := &connection.Connection{}
|
|
|
+ for p := range ch {
|
|
|
+ if p.Err != nil {
|
|
|
+ if p.Err == io.EOF {
|
|
|
+ // 文件读取完毕
|
|
|
+ break
|
|
|
+ }
|
|
|
+ logger.Error("parse aof file error:", p.Err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if p.Data == nil {
|
|
|
+ logger.Error("parse aof file error:", "data is nil")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 检查解析后的数据类型是否正确
|
|
|
+ cmd, ok := p.Data.(*reply.MultiBulkReply)
|
|
|
+ if !ok {
|
|
|
+ logger.Error("parse aof file error:", "data type error")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 执行解析出的命令
|
|
|
+ rep := h.database.Exec(fakeConn, cmd.Args)
|
|
|
+ if reply.IsErrReply(rep) {
|
|
|
+ logger.Error("parse aof file error:", rep)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|