| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/9/12 13:24
- // Desc: Handling RESP protocol
- package handler
- import (
- "context"
- "errors"
- "io"
- "net"
- "strings"
- "sync"
- "github.com/runningwater/go-redis/cluster"
- "github.com/runningwater/go-redis/config"
- "github.com/runningwater/go-redis/database"
- dbface "github.com/runningwater/go-redis/interface/database"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/lib/sync/atomic"
- "github.com/runningwater/go-redis/resp/connection"
- "github.com/runningwater/go-redis/resp/parser"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // RespHandler handles client connections and RESP protocol
- type RespHandler struct {
- activeConn sync.Map
- db dbface.Database
- closing atomic.Boolean
- }
- // NewHandler creates a new RespHandler
- func NewHandler() *RespHandler {
- var db dbface.Database
- // db = database.NewEchoDatabase()
- if config.Properties.Self != "" && len(config.Properties.Peers) > 0 {
- logger.Info("cluster mode")
- db = cluster.NewClusterDatabase()
- } else {
- logger.Info("standalone mode")
- db = database.NewStandaloneDatabase()
- }
- return &RespHandler{
- db: db,
- }
- }
- // Handle processes client connections
- func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
- if r.closing.Get() {
- _ = conn.Close()
- return
- }
- logger.Info("new connection from ", conn.RemoteAddr().String())
- client := connection.NewConnection(conn)
- r.activeConn.Store(client, struct{}{})
- // Parse data
- ch := parser.ParseStream(conn)
- for payload := range ch {
- logger.Info("received channel data: ", payload)
- // error
- if payload.Err != nil {
- if errors.Is(payload.Err, io.EOF) ||
- errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
- strings.Contains(payload.Err.Error(), "use of closed network connection") {
- r.closeClient(client)
- logger.Info("connection closed: ", client.RemoteAddr().String())
- return
- }
- // Protocol error
- logger.Error("protocol error: ", payload.Err.Error())
- errReply := reply.NewErrReply(payload.Err.Error())
- err := client.Write(errReply.ToBytes())
- if err != nil {
- r.closeClient(client)
- logger.Error("client.Write:", err.Error())
- return
- }
- continue
- }
- // exec
- if payload.Data == nil {
- logger.Error("empty payload")
- continue
- }
- var bulkReply *reply.MultiBulkReply
- ok := errors.As(payload.Data, &bulkReply)
- if !ok {
- logger.Error("require multi bulk reply")
- continue
- }
- // Command execution
- execResult := r.db.Exec(client, bulkReply.Args)
- if execResult == nil {
- execResult = reply.NewUnknownErrReply("aa")
- }
- // Return result
- logger.Info("exec result: ", execResult)
- _ = client.Write(execResult.ToBytes())
- } // for end
- }
- // Close shuts down the RespHandler and releases resources
- // This function closes all active connections and the database connection
- // Return value: error - Error information for the close operation, currently always returns nil
- func (r *RespHandler) Close() error {
- logger.Info("handler shutting down...")
- r.closing.Set(true)
- // Iterate through all active connections and close them
- r.activeConn.Range(
- func(key, value any) bool {
- client := key.(*connection.Connection)
- _ = client.Close()
- return true
- })
- // Close database connection
- r.db.Close()
- return nil
- }
- // closeClient closes a client connection
- // Parameters:
- //
- // client *connection.Connection - The client connection object to close
- func (r *RespHandler) closeClient(client *connection.Connection) {
- _ = client.Close()
- r.db.AfterClientClose(client)
- r.activeConn.Delete(client)
- }
|