handler.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/9/12 13:24
  3. // Desc: Handling RESP protocol
  4. package handler
  5. import (
  6. "context"
  7. "errors"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "github.com/runningwater/go-redis/database"
  13. dbface "github.com/runningwater/go-redis/interface/database"
  14. "github.com/runningwater/go-redis/lib/logger"
  15. "github.com/runningwater/go-redis/lib/sync/atomic"
  16. "github.com/runningwater/go-redis/resp/connection"
  17. "github.com/runningwater/go-redis/resp/parser"
  18. "github.com/runningwater/go-redis/resp/reply"
  19. )
  20. // RespHandler handles client connections and RESP protocol
  21. type RespHandler struct {
  22. activeConn sync.Map
  23. db dbface.Database
  24. closing atomic.Boolean
  25. }
  26. // NewHandler creates a new RespHandler
  27. func NewHandler() *RespHandler {
  28. return &RespHandler{
  29. db: database.NewEchoDatabase(),
  30. }
  31. }
  32. // Handle processes client connections
  33. func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  34. if r.closing.Get() {
  35. _ = conn.Close()
  36. return
  37. }
  38. logger.Info("new connection from ", conn.RemoteAddr().String())
  39. client := connection.NewConnection(conn)
  40. r.activeConn.Store(client, struct{}{})
  41. // Parse data
  42. ch := parser.ParseStream(conn)
  43. for payload := range ch {
  44. logger.Info("received channel data: ", payload)
  45. // error
  46. if payload.Err != nil {
  47. if errors.Is(payload.Err, io.EOF) ||
  48. errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
  49. strings.Contains(payload.Err.Error(), "use of closed network connection") {
  50. r.closeClient(client)
  51. logger.Info("connection closed: ", client.RemoteAddr().String())
  52. return
  53. }
  54. // Protocol error
  55. logger.Error("protocol error: ", payload.Err.Error())
  56. errReply := reply.NewErrReply(payload.Err.Error())
  57. err := client.Write(errReply.ToBytes())
  58. if err != nil {
  59. r.closeClient(client)
  60. logger.Error("client.Write:", err.Error())
  61. return
  62. }
  63. continue
  64. }
  65. // exec
  66. if payload.Data == nil {
  67. logger.Error("empty payload")
  68. continue
  69. }
  70. bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
  71. if !ok {
  72. logger.Error("require multi bulk reply")
  73. continue
  74. }
  75. // Command execution
  76. execResult := r.db.Exec(client, bulkReply.Args)
  77. if execResult == nil {
  78. execResult = reply.NewUnknownErrReply("aa")
  79. }
  80. // Return result
  81. logger.Info("exec result: ", execResult)
  82. _ = client.Write(execResult.ToBytes())
  83. } // for end
  84. }
  85. // Close shuts down the RespHandler and releases resources
  86. // This function closes all active connections and the database connection
  87. // Return value: error - Error information for the close operation, currently always returns nil
  88. func (r *RespHandler) Close() error {
  89. logger.Info("handler shutting down...")
  90. r.closing.Set(true)
  91. // Iterate through all active connections and close them
  92. r.activeConn.Range(
  93. func(key, value any) bool {
  94. client := key.(*connection.Connection)
  95. _ = client.Close()
  96. return true
  97. })
  98. // Close database connection
  99. r.db.Close()
  100. return nil
  101. }
  102. // closeClient closes a client connection
  103. // Parameters:
  104. //
  105. // client *connection.Connection - The client connection object to close
  106. func (r *RespHandler) closeClient(client *connection.Connection) {
  107. _ = client.Close()
  108. r.db.AfterClientClose(client)
  109. r.activeConn.Delete(client)
  110. }