// Author: simon (ynwdlxm@163.com) // Date: 2025/9/12 13:24 // Desc: 处理 RESP 协议 package handler import ( "context" "errors" "io" "net" "strings" "sync" "github.com/runningwater/go-redis/database" dbface "github.com/runningwater/go-redis/interface/database" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/lib/sync/atomic" "github.com/runningwater/go-redis/resp/connection" "github.com/runningwater/go-redis/resp/parser" "github.com/runningwater/go-redis/resp/reply" ) type RespHandler struct { activeConn sync.Map db dbface.Database closing atomic.Boolean } func NewHandler() *RespHandler { var db dbface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) { if r.closing.Get() { _ = conn.Close() } logger.Info("new connection from ", conn.RemoteAddr().String()) client := connection.NewConnection(conn) r.activeConn.Store(client, struct{}{}) // 解析数据 ch := parser.ParseStream(conn) for payload := range ch { logger.Info("收到 通道数据: ", payload) // error if payload.Err != nil { if errors.Is(payload.Err, io.EOF) || errors.Is(payload.Err, io.ErrUnexpectedEOF) || strings.Contains(payload.Err.Error(), "use of closed network connection") { r.closeClient(client) logger.Info("connection closed: ", client.RemoteAddr().String()) return } // 协议错误 logger.Error("协议错误: ", payload.Err.Error()) errReplay := reply.NewErrReply(payload.Err.Error()) err := client.Write(errReplay.ToBytes()) if err != nil { r.closeClient(client) logger.Error("client.Write:", err.Error()) return } continue } // exec if payload.Data == nil { logger.Error("empty payload") continue } bulkReply, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } // 命令执行 execResult := r.db.Exec(client, bulkReply.Args) if execResult == nil { execResult = reply.NewUnknownErrReply() } // 返回结果 logger.Info("exec result: ", execResult) _ = client.Write(execResult.ToBytes()) } // for end } // Close 关闭RespHandler处理器,释放相关资源 // 该函数会关闭所有活跃连接并关闭数据库连接 // 返回值: error - 关闭操作的错误信息,目前始终返回nil func (r *RespHandler) Close() error { logger.Info("handler shutting down...") r.closing.Set(true) // 遍历所有活跃连接并关闭它们 r.activeConn.Range( func(key, value any) bool { client := key.(*connection.Connection) _ = client.Close() return true }) // 关闭数据库连接 r.db.Close() return nil } // closeClient 关闭客户端连接 // 参数: // // client *connection.Connection - 需要关闭的客户端连接对象 func (r *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() r.db.AfterClientClose(client) r.activeConn.Delete(client) }