handler.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  // Author: simon (ynwdlxm@163.com)
  // Date: 2025/9/12 13:24
  // Desc: Handling RESP protocol
  4. package handler
  5. import (
  6. "context"
  7. "errors"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "github.com/runningwater/go-redis/database"
  13. dbface "github.com/runningwater/go-redis/interface/database"
  14. "github.com/runningwater/go-redis/lib/logger"
  15. "github.com/runningwater/go-redis/lib/sync/atomic"
  16. "github.com/runningwater/go-redis/resp/connection"
  17. "github.com/runningwater/go-redis/resp/parser"
  18. "github.com/runningwater/go-redis/resp/reply"
  19. )
  20. // RespHandler handles client connections and RESP protocol
  21. type RespHandler struct {
  22. activeConn sync.Map
  23. db dbface.Database
  24. closing atomic.Boolean
  25. }
  26. // NewHandler creates a new RespHandler
  27. func NewHandler() *RespHandler {
  28. return &RespHandler{
  29. // db: database.NewEchoDatabase(),
  30. db: database.NewDatabase(),
  31. }
  32. }
  33. // Handle processes client connections
  34. func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  35. if r.closing.Get() {
  36. _ = conn.Close()
  37. return
  38. }
  39. logger.Info("new connection from ", conn.RemoteAddr().String())
  40. client := connection.NewConnection(conn)
  41. r.activeConn.Store(client, struct{}{})
  42. // Parse data
  43. ch := parser.ParseStream(conn)
  44. for payload := range ch {
  45. logger.Info("received channel data: ", payload)
  46. // error
  47. if payload.Err != nil {
  48. if errors.Is(payload.Err, io.EOF) ||
  49. errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
  50. strings.Contains(payload.Err.Error(), "use of closed network connection") {
  51. r.closeClient(client)
  52. logger.Info("connection closed: ", client.RemoteAddr().String())
  53. return
  54. }
  55. // Protocol error
  56. logger.Error("protocol error: ", payload.Err.Error())
  57. errReply := reply.NewErrReply(payload.Err.Error())
  58. err := client.Write(errReply.ToBytes())
  59. if err != nil {
  60. r.closeClient(client)
  61. logger.Error("client.Write:", err.Error())
  62. return
  63. }
  64. continue
  65. }
  66. // exec
  67. if payload.Data == nil {
  68. logger.Error("empty payload")
  69. continue
  70. }
  71. bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
  72. if !ok {
  73. logger.Error("require multi bulk reply")
  74. continue
  75. }
  76. // Command execution
  77. execResult := r.db.Exec(client, bulkReply.Args)
  78. if execResult == nil {
  79. execResult = reply.NewUnknownErrReply("aa")
  80. }
  81. // Return result
  82. logger.Info("exec result: ", execResult)
  83. _ = client.Write(execResult.ToBytes())
  84. } // for end
  85. }
  86. // Close shuts down the RespHandler and releases resources
  87. // This function closes all active connections and the database connection
  88. // Return value: error - Error information for the close operation, currently always returns nil
  89. func (r *RespHandler) Close() error {
  90. logger.Info("handler shutting down...")
  91. r.closing.Set(true)
  92. // Iterate through all active connections and close them
  93. r.activeConn.Range(
  94. func(key, value any) bool {
  95. client := key.(*connection.Connection)
  96. _ = client.Close()
  97. return true
  98. })
  99. // Close database connection
  100. r.db.Close()
  101. return nil
  102. }
  103. // closeClient closes a client connection
  104. // Parameters:
  105. //
  106. // client *connection.Connection - The client connection object to close
  107. func (r *RespHandler) closeClient(client *connection.Connection) {
  108. _ = client.Close()
  109. r.db.AfterClientClose(client)
  110. r.activeConn.Delete(client)
  111. }