// Author: simon (ynwdlxm@163.com) // Date: 2025/9/12 13:24 // Desc: Handling RESP protocol package handler import ( "context" "errors" "io" "net" "strings" "sync" "github.com/runningwater/go-redis/database" dbface "github.com/runningwater/go-redis/interface/database" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/lib/sync/atomic" "github.com/runningwater/go-redis/resp/connection" "github.com/runningwater/go-redis/resp/parser" "github.com/runningwater/go-redis/resp/reply" ) // RespHandler handles client connections and RESP protocol type RespHandler struct { activeConn sync.Map db dbface.Database closing atomic.Boolean } // NewHandler creates a new RespHandler func NewHandler() *RespHandler { return &RespHandler{ // db: database.NewEchoDatabase(), db: database.NewDatabase(), } } // Handle processes client connections func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) { if r.closing.Get() { _ = conn.Close() return } logger.Info("new connection from ", conn.RemoteAddr().String()) client := connection.NewConnection(conn) r.activeConn.Store(client, struct{}{}) // Parse data ch := parser.ParseStream(conn) for payload := range ch { logger.Info("received channel data: ", payload) // error if payload.Err != nil { if errors.Is(payload.Err, io.EOF) || errors.Is(payload.Err, io.ErrUnexpectedEOF) || strings.Contains(payload.Err.Error(), "use of closed network connection") { r.closeClient(client) logger.Info("connection closed: ", client.RemoteAddr().String()) return } // Protocol error logger.Error("protocol error: ", payload.Err.Error()) errReply := reply.NewErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { r.closeClient(client) logger.Error("client.Write:", err.Error()) return } continue } // exec if payload.Data == nil { logger.Error("empty payload") continue } bulkReply, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } // Command execution execResult := r.db.Exec(client, bulkReply.Args) if execResult == nil { execResult = reply.NewUnknownErrReply("aa") } // Return result logger.Info("exec result: ", execResult) _ = client.Write(execResult.ToBytes()) } // for end } // Close shuts down the RespHandler and releases resources // This function closes all active connections and the database connection // Return value: error - Error information for the close operation, currently always returns nil func (r *RespHandler) Close() error { logger.Info("handler shutting down...") r.closing.Set(true) // Iterate through all active connections and close them r.activeConn.Range( func(key, value any) bool { client := key.(*connection.Connection) _ = client.Close() return true }) // Close database connection r.db.Close() return nil } // closeClient closes a client connection // Parameters: // // client *connection.Connection - The client connection object to close func (r *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() r.db.AfterClientClose(client) r.activeConn.Delete(client) }