Prechádzať zdrojové kódy

feat: aof 存储功能

runningwater 2 mesiacov pred
rodič
commit
5dfe94ca40
8 zmenil súbory, kde vykonal 184 pridanie a 18 odobranie
  1. 0 0
      aof.data
  2. 100 0
      aof/aof.go
  3. 6 4
      database/db.go
  4. 10 1
      database/generic.go
  5. 39 7
      database/resp_database.go
  6. 4 0
      database/string.go
  7. 20 0
      lib/utils/utils.go
  8. 5 6
      redis.conf

+ 0 - 0
aof.data


+ 100 - 0
aof/aof.go

@@ -0,0 +1,100 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/29 09:28
+// Desc: AOF持久化处理模块,负责将命令写入AOF文件实现数据持久化
+
+package aof
+
+import (
+	"os"
+	"strconv"
+
+	"github.com/runningwater/go-redis/config"
+	"github.com/runningwater/go-redis/interface/database"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/lib/utils"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// aofBufferSize AOF缓冲区大小,用于控制异步写入通道的容量
+const aofBufferSize = 1 << 16
+
+// payload AOF操作载荷,包含命令行和数据库索引
+type payload struct {
+	cmdLine database.CmdLine
+	dbIndex int
+}
+
+// Handler AOF处理器结构体
+type Handler struct {
+	database    database.Database // 数据库实例,用于执行AOF重放
+	aofChan     chan *payload     // AOF操作通道,用于异步处理写入请求
+	aofFile     *os.File          // AOF文件句柄
+	aofFileName string            // AOF文件名
+	currentDB   int               // 当前数据库索引,用于判断是否需要切换数据库
+}
+
+// NewAofHandler 创建一个新的AOF处理器实例
+//
+//	 参数:
+//
+//		database - 数据库实例,用于执行AOF重放的命令
+//
+// 返回值:
+//
+//	*Handler - 成功创建的AOF处理器实例
+//	error - 创建过程中可能发生的错误
+func NewAofHandler(database database.Database) (*Handler, error) {
+	h := &Handler{}
+	h.database = database
+	h.aofFileName = config.Properties.AppendFilename
+	// 加载现有的AOF文件内容
+	h.LoadAof()
+	aofFile, err := os.OpenFile(config.Properties.AppendFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
+	if err != nil {
+		return nil, err
+	}
+	h.aofFile = aofFile
+
+	// 创建AOF操作通道,用于异步处理AOF写入请求
+	h.aofChan = make(chan *payload, aofBufferSize)
+
+	// 启动后台协程处理AOF写入操作
+	go func() {
+		h.handleAof()
+	}()
+
+	return h, nil
+}
+
+func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
+	if config.Properties.AppendOnly && h.aofChan != nil {
+		p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
+		h.aofChan <- &p
+	}
+}
+func (h *Handler) handleAof() {
+	h.currentDB = 0
+	for p := range h.aofChan {
+		// 判断是否切换了 DB,如果切换使用 SELECT index
+		if p.dbIndex != h.currentDB {
+			data := reply.NewMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
+			_, err := h.aofFile.Write(data)
+			if err != nil {
+				logger.Error("write aof file error:", err)
+				continue
+			}
+			h.currentDB = p.dbIndex
+		}
+
+		data := reply.NewMultiBulkReply(p.cmdLine).ToBytes()
+		_, err := h.aofFile.Write(data)
+		if err != nil {
+			logger.Error("write aof file error:", err)
+			continue
+		}
+	} // end for
+}
+
+func (h *Handler) LoadAof() {
+
+}

+ 6 - 4
database/db.go

@@ -13,10 +13,11 @@ import (
 	"github.com/runningwater/go-redis/resp/reply"
 )
 
-// DB 表示一个 Redis 数据库实例
+// DB 表示一个 Redis 数据库实例(Database 里面的 dbSet)
 type DB struct {
-	index int       // 数据库索引
-	data  dict.Dict // 存储键值对的数据结构
+	index  int           // 数据库索引
+	data   dict.Dict     // 存储键值对的数据结构
+	addAof func(CmdLine) // 添加AOF记录的函数指针
 }
 
 // ExecFunc 定义命令执行函数的类型
@@ -36,7 +37,8 @@ type CmdLine = [][]byte
 //   - *DB: 新创建的数据库实例
 func NewDB() *DB {
 	return &DB{
-		data: dict.NewSyncDict(),
+		data:   dict.NewSyncDict(),
+		addAof: func(cmd CmdLine) {}, // 默认的添加AOF记录函数
 	}
 }
 

+ 10 - 1
database/generic.go

@@ -6,6 +6,7 @@ package database
 
 import (
 	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/utils"
 	"github.com/runningwater/go-redis/lib/wildcard"
 	"github.com/runningwater/go-redis/resp/reply"
 )
@@ -22,7 +23,12 @@ func execDel(db *DB, args [][]byte) resp.Reply {
 	for i, v := range args {
 		keys[i] = string(v)
 	}
-	return reply.NewIntReply(int64(db.Removes(keys...)))
+	deleted := db.Removes(keys...)
+	if deleted > 0 {
+		db.addAof(utils.ToCmdLine2("DEL", args...))
+	}
+
+	return reply.NewIntReply(int64(deleted))
 }
 
 // 语法:
@@ -51,6 +57,7 @@ func execExists(db *DB, args [][]byte) resp.Reply {
 //	Simple string reply: OK.
 func execFlushDb(db *DB, args [][]byte) resp.Reply {
 	db.Flush()
+	db.addAof(utils.ToCmdLine2("FLUSHDB", args...))
 	return reply.NewOkReply()
 }
 
@@ -129,6 +136,7 @@ func execRename(db *DB, args [][]byte) resp.Reply {
 	}
 	db.PutEntity(dest, entity)
 	db.Remove(src)
+	db.addAof(utils.ToCmdLine2("RENAME", args...))
 	return reply.NewOkReply()
 }
 
@@ -155,6 +163,7 @@ func execRenamenx(db *DB, args [][]byte) resp.Reply {
 	}
 	db.PutEntity(dest, entity)
 	db.Remove(src)
+	db.addAof(utils.ToCmdLine2("RENAMENX", args...))
 	return reply.NewIntReply(1)
 }
 

+ 39 - 7
database/resp_database.go

@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/runningwater/go-redis/aof"
 	"github.com/runningwater/go-redis/config"
 	"github.com/runningwater/go-redis/interface/resp"
 	"github.com/runningwater/go-redis/lib/logger"
@@ -15,22 +16,38 @@ import (
 )
 
 type Database struct {
-	dbSet []*DB
+	dbSet      []*DB
+	aofHandler *aof.Handler
 }
 
 func NewDatabase() *Database {
 	if config.Properties.Databases == 0 {
 		config.Properties.Databases = 16
 	}
-	dbs := &Database{}
-	dbs.dbSet = make([]*DB, config.Properties.Databases)
-	for i := range dbs.dbSet {
+	database := &Database{}
+	database.dbSet = make([]*DB, config.Properties.Databases)
+	// 初始化数据库集合
+	for i := range database.dbSet {
 		db := NewDB()
 		db.index = i
-		dbs.dbSet[i] = db
+		database.dbSet[i] = db
 	}
 
-	return dbs
+	if config.Properties.AppendOnly {
+		aofHandler, err := aof.NewAofHandler(database)
+		if err != nil {
+			panic(err)
+		}
+		database.aofHandler = aofHandler
+	}
+
+	for _, db := range database.dbSet {
+		d := db
+		d.addAof = func(line CmdLine) {
+			database.aofHandler.AddAof(d.index, line)
+		}
+	}
+	return database
 }
 
 func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {
@@ -64,15 +81,30 @@ func (d *Database) AfterClientClose(client resp.Connection) {
 
 }
 
-// select db 命令
+// execSelect 处理数据库选择命令
+//
+//	参数:
+//
+//			c: 客户端连接对象,用于执行数据库切换操作
+//			database: 数据库实例,包含所有可用的数据库集合
+//			args: 命令参数,第一个参数为要选择的数据库索引
+//
+//	 返回值:
+//
+//		resp.Reply: 执行结果回复,成功返回OK,失败返回错误信息
 func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
+	// 解析数据库索引参数
 	dbIndex, err := strconv.Atoi(string(args[0]))
 	if err != nil {
 		return reply.NewErrReply("ERR invalid DB index")
 	}
+
+	// 验证数据库索引范围
 	if dbIndex >= len(database.dbSet) {
 		return reply.NewErrReply("ERR DB index is out of range")
 	}
+
+	// 执行数据库切换操作
 	c.SelectDB(dbIndex)
 	return reply.NewOkReply()
 }

+ 4 - 0
database/string.go

@@ -7,6 +7,7 @@ package database
 import (
 	"github.com/runningwater/go-redis/interface/database"
 	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/utils"
 	"github.com/runningwater/go-redis/resp/reply"
 )
 
@@ -49,6 +50,7 @@ func execSet(db *DB, args [][]byte) resp.Reply {
 		Data: value,
 	}
 	_ = db.PutEntity(key, entity)
+	db.addAof(utils.ToCmdLine2("SET", args...))
 	return reply.NewOkReply()
 }
 
@@ -66,6 +68,7 @@ func execSetNX(db *DB, args [][]byte) resp.Reply {
 		Data: value,
 	}
 	absent := db.PutIfAbsent(key, entity)
+	db.addAof(utils.ToCmdLine2("SETNX", args...))
 	return reply.NewIntReply(int64(absent))
 }
 
@@ -81,6 +84,7 @@ func execGetSet(db *DB, args [][]byte) resp.Reply {
 	value := args[1]
 	entity, exists := db.GetEntity(key)
 	_ = db.PutEntity(key, &database.DataEntity{Data: value})
+	db.addAof(utils.ToCmdLine2("GETSET", args...))
 	if !exists {
 		return reply.NewNullBulkReply()
 	}

+ 20 - 0
lib/utils/utils.go

@@ -0,0 +1,20 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/29 18:23
+// Desc:
+
+package utils
+
+func ToCmdLine(cmd ...string) [][]byte {
+	args := make([][]byte, len(cmd))
+	for i, s := range cmd {
+		args[i] = []byte(s)
+	}
+	return args
+}
+
+func ToCmdLine2(commandName string, args ...[]byte) [][]byte {
+	result := make([][]byte, len(args)+1)
+	result[0] = []byte(commandName)
+	copy(result[1:], args)
+	return result
+}

+ 5 - 6
redis.conf

@@ -1,10 +1,9 @@
 bind 0.0.0.0
 protected-mode no
 port 6379
+
 databases 16
-daemonize yes
-pidfile /var/run/redis_6379.pid
-logfile "/var/log/redis_6379.log"
-dir "/var/lib/redis/6379"
-maxmemory 1gb
-maxmemory-policy allkeys-lru
+
+
+appendonly yes
+appendfilename aof.data