Kaynağa Gözat

fix: fix som bugs

runningwater 1 ay önce
ebeveyn
işleme
8cdedbe3b0

+ 106 - 23
aof/aof.go

@@ -5,9 +5,13 @@
 package aof
 
 import (
+	"errors"
 	"io"
 	"os"
 	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/runningwater/go-redis/config"
 	"github.com/runningwater/go-redis/interface/database"
@@ -33,7 +37,15 @@ type Handler struct {
 	aofChan     chan *payload     // AOF操作通道,用于异步处理写入请求
 	aofFile     *os.File          // AOF文件句柄
 	aofFileName string            // AOF文件名
-	currentDB   int               // 当前数据库索引,用于判断是否需要切换数据库
+
+	currentDB atomic.Int64 // 当前数据库索引,用于判断是否需要切换数据库
+}
+
+// payloadPool 负载对象池
+var payloadPool = sync.Pool{
+	New: func() any {
+		return &payload{}
+	},
 }
 
 // NewAofHandler 创建一个新的AOF处理器实例
@@ -65,20 +77,65 @@ func NewAofHandler(database database.Database) (*Handler, error) {
 	go func() {
 		h.handleAof()
 	}()
+	// 启动AOF刷新定时器
+	go h.startFlushTicker()
 
 	return h, nil
 }
 
+// Close 关闭AOF处理器
+func (h *Handler) Close() error {
+	logger.Info("aof shutting down...")
+	if h.aofChan != nil {
+		close(h.aofChan)
+	}
+	if h.aofFile != nil {
+		return h.aofFile.Close()
+	}
+	return nil
+}
+
 // AddAof 将命令写入AOF持久化日志
 //
 //	dbIndex: 数据库索引
 //	cmdLine: 要执行的命令行
 func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
 	// 检查是否启用了AOF持久化并且AOF通道不为空
-	if config.Properties.AppendOnly && h.aofChan != nil {
-		// 创建负载对象并发送到AOF通道
-		p := payload{cmdLine: cmdLine, dbIndex: dbIndex}
-		h.aofChan <- &p
+	if !config.Properties.AppendOnly || h.aofChan == nil {
+		return
+	}
+	// 创建负载对象并发送到AOF通道
+	p := payloadPool.Get().(*payload)
+	p.cmdLine = cmdLine
+	p.dbIndex = dbIndex
+
+	select {
+	case h.aofChan <- p:
+	default:
+		logger.Warn("aof channel is full, drop the command")
+		payloadPool.Put(p) // 回收负载对象
+		// TODO: 触发紧急刷盘或者报警
+		// h.startFlushTicker()
+	}
+}
+
+// startFlushTicker 启动AOF刷新定时器
+func (h *Handler) startFlushTicker() {
+	if config.Properties.AppendFsyncInterval == 0 {
+		config.Properties.AppendFsyncInterval = 5
+	} // in seconds
+	ticker := time.NewTicker(time.Duration(config.Properties.AppendFsyncInterval) * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			logger.Debug("flush aof file")
+			// 每隔AppendFsyncInterval秒,将AOF文件进行同步
+			if err := h.aofFile.Sync(); err != nil {
+				logger.Error("sync aof file error:", err)
+			}
+		}
 	}
 }
 
@@ -88,33 +145,60 @@ func (h *Handler) AddAof(dbIndex int, cmdLine database.CmdLine) {
 //	当数据库索引发生变化时,会先写入SELECT命令切换数据库
 //	通过协程方式持续监听通道,实现异步AOF写入
 func (h *Handler) handleAof() {
-	h.currentDB = 0
+	defer func() {
+		if r := recover(); r != nil {
+			logger.Error("handle aof panic:", r)
+		}
+	}()
+
+	h.currentDB.Store(0)
 	for p := range h.aofChan {
+
 		// 判断是否切换了 DB,如果切换使用 SELECT index
-		if p.dbIndex != h.currentDB {
-			data := reply.NewMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
-			_, err := h.aofFile.Write(data)
-			if err != nil {
-				logger.Error("write aof file error:", err)
-				continue
-			}
-			h.currentDB = p.dbIndex
+		err := h.switchDatabaseIfNeed(p)
+		if err != nil {
+			logger.Error("write aof file error:", err)
+			continue
 		}
 
-		data := reply.NewMultiBulkReply(p.cmdLine).ToBytes()
-		_, err := h.aofFile.Write(data)
+		err = h.writeAofData(p.cmdLine)
 		if err != nil {
 			logger.Error("write aof file error:", err)
 			continue
 		}
+
+		// 回收负载对象
+		payloadPool.Put(p)
 	} // end for
 }
 
+// 封装AOF写入逻辑
+func (h *Handler) writeAofData(cmdLine database.CmdLine) error {
+	data := reply.NewMultiBulkReply(cmdLine).ToBytes()
+	_, err := h.aofFile.Write(data)
+	return err
+}
+
+func (h *Handler) switchDatabaseIfNeed(p *payload) error {
+	if p.dbIndex != int(h.currentDB.Load()) {
+		data := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
+		err := h.writeAofData(data)
+		if err != nil {
+			logger.Error("write aof file error:", err)
+			return err
+		}
+		h.currentDB.Store(int64(p.dbIndex))
+	}
+	return nil
+}
+
 // LoadAof 从AOF文件中加载数据并执行其中的命令
 //
 //	该函数会打开AOF文件,解析其中的命令并依次执行,用于恢复数据库状态
 //	参数: 无
 //	返回值: 无
+//
+// TODO: 一次性加载整个文件可能导致内存占用过高,需要分批加载
 func (h *Handler) LoadAof() {
 	// 打开AOF文件
 	file, err := os.Open(h.aofFileName)
@@ -122,12 +206,11 @@ func (h *Handler) LoadAof() {
 		logger.Error("open aof file error:", err)
 		return
 	}
-	defer func(file *os.File) {
-		err := file.Close()
-		if err != nil {
-			return
+	defer func() {
+		if err := file.Close(); err != nil {
+			logger.Warn("close aof file error:", err)
 		}
-	}(file)
+	}()
 
 	// 解析AOF文件流并逐条执行命令
 	ch := parser.ParseStream(file)
@@ -146,8 +229,8 @@ func (h *Handler) LoadAof() {
 			continue
 		}
 		// 检查解析后的数据类型是否正确
-		cmd, ok := p.Data.(*reply.MultiBulkReply)
-		if !ok {
+		var cmd *reply.MultiBulkReply
+		if !errors.As(p.Data, &cmd) {
 			logger.Error("parse aof file error:", "data type error")
 			continue
 		}

+ 2 - 2
cluster/cluster_database.go

@@ -80,6 +80,6 @@ func (c *Database) Close() {
 	c.db.Close()
 }
 
-func (c *Database) AfterClientClose(client resp.Connection) {
-	c.db.AfterClientClose(client)
+func (c *Database) AfterClientClose() {
+	c.db.AfterClientClose()
 }

+ 3 - 0
config/config.go

@@ -21,6 +21,9 @@ type ServerProperties struct {
 	AppendOnly bool `cfg:"appendOnly"`
 	// AppendFilename AOF文件名
 	AppendFilename string `cfg:"appendFilename"`
+	// AppendFsyncInterval AOF文件同步间隔
+	AppendFsyncInterval int `cfg:"appendFsyncInterval"`
+
 	// MaxClients 最大客户端连接数
 	MaxClients int `cfg:"maxclients"`
 	// RequirePass 访问服务器所需的密码

+ 1 - 1
database/echo_database.go

@@ -24,6 +24,6 @@ func (e *EchoDatabase) Close() {
 	logger.Info("database closed")
 }
 
-func (e *EchoDatabase) AfterClientClose(_ resp.Connection) {
+func (e *EchoDatabase) AfterClientClose() {
 	logger.Info("client closed, AfterClientClose")
 }

+ 8 - 3
database/standalone_database.go

@@ -105,13 +105,18 @@ func (d *StandaloneDatabase) Exec(client resp.Connection, args [][]byte) resp.Re
 // Close 关闭数据库连接
 // TODO 实现数据库关闭逻辑
 func (d *StandaloneDatabase) Close() {
-	// TODO implement me
+	if d.aofHandler != nil {
+		err := d.aofHandler.Close()
+		if err != nil {
+			logger.Error(err)
+		}
+	}
 }
 
 // AfterClientClose 处理客户端连接关闭后的清理工作
 // TODO 实现客户端关闭后的处理逻辑
-func (d *StandaloneDatabase) AfterClientClose(client resp.Connection) {
-	// TODO implement me
+func (d *StandaloneDatabase) AfterClientClose() {
+
 }
 
 // execSelect 处理数据库选择命令

+ 1 - 1
interface/database/database.go

@@ -17,5 +17,5 @@ type DataEntity struct {
 type Database interface {
 	Exec(client resp.Connection, args [][]byte) resp.Reply
 	Close()
-	AfterClientClose(client resp.Connection)
+	AfterClientClose()
 }


+ 3 - 2
redis.conf

@@ -7,7 +7,8 @@ databases 16
 
 appendonly yes
 appendfilename aof.data
+appendFsyncInterval 20
 
 # cluster
-self  127.0.0.1:6379
-peers 127.0.0.1:6380
+# self  127.0.0.1:6379
+# peers 127.0.0.1:6380

+ 4 - 2
resp/handler/handler.go

@@ -132,7 +132,9 @@ func (r *RespHandler) Close() error {
 //
 //	client *connection.Connection - The client connection object to close
 func (r *RespHandler) closeClient(client *connection.Connection) {
-	_ = client.Close()
-	r.db.AfterClientClose(client)
+	if client != nil {
+		_ = client.Close()
+	}
+	r.db.AfterClientClose()
 	r.activeConn.Delete(client)
 }

+ 3 - 4
tcp/server.go

@@ -43,7 +43,7 @@ func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan
 
 	go func() {
 		<-closeChan
-		logger.Info("shutting down...")
+		logger.Info("server shutting down...")
 		_ = listener.Close()
 		_ = handler.Close()
 	}()
@@ -65,9 +65,8 @@ func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan
 		logger.Info("accept link")
 		waitDone.Add(1)
 		go func() {
-			defer func() {
-				waitDone.Done()
-			}()
+			defer waitDone.Done()
+
 			handler.Handle(ctx, conn)
 		}()
 	} // end for