handler.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/9/12 13:24
  3. // Desc: Handling RESP protocol
  4. package handler
  5. import (
  6. "context"
  7. "errors"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "github.com/runningwater/go-redis/cluster"
  13. "github.com/runningwater/go-redis/config"
  14. "github.com/runningwater/go-redis/database"
  15. dbface "github.com/runningwater/go-redis/interface/database"
  16. "github.com/runningwater/go-redis/lib/logger"
  17. "github.com/runningwater/go-redis/lib/sync/atomic"
  18. "github.com/runningwater/go-redis/resp/connection"
  19. "github.com/runningwater/go-redis/resp/parser"
  20. "github.com/runningwater/go-redis/resp/reply"
  21. )
  22. // RespHandler handles client connections and RESP protocol
  23. type RespHandler struct {
  24. activeConn sync.Map
  25. db dbface.Database
  26. closing atomic.Boolean
  27. }
  28. // NewHandler creates a new RespHandler
  29. func NewHandler() *RespHandler {
  30. var db dbface.Database
  31. // db = database.NewEchoDatabase()
  32. if config.Properties.Self != "" && len(config.Properties.Peers) > 0 {
  33. logger.Info("cluster mode")
  34. db = cluster.NewClusterDatabase()
  35. } else {
  36. logger.Info("standalone mode")
  37. db = database.NewStandaloneDatabase()
  38. }
  39. return &RespHandler{
  40. db: db,
  41. }
  42. }
  43. // Handle processes client connections
  44. func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  45. if r.closing.Get() {
  46. _ = conn.Close()
  47. return
  48. }
  49. logger.Info("new connection from ", conn.RemoteAddr().String())
  50. client := connection.NewConnection(conn)
  51. r.activeConn.Store(client, struct{}{})
  52. // Parse data
  53. ch := parser.ParseStream(conn)
  54. for payload := range ch {
  55. logger.Info("received channel data: ", payload)
  56. // error
  57. if payload.Err != nil {
  58. if errors.Is(payload.Err, io.EOF) ||
  59. errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
  60. strings.Contains(payload.Err.Error(), "use of closed network connection") {
  61. r.closeClient(client)
  62. logger.Info("connection closed: ", client.RemoteAddr().String())
  63. return
  64. }
  65. // Protocol error
  66. logger.Error("protocol error: ", payload.Err.Error())
  67. errReply := reply.NewErrReply(payload.Err.Error())
  68. err := client.Write(errReply.ToBytes())
  69. if err != nil {
  70. r.closeClient(client)
  71. logger.Error("client.Write:", err.Error())
  72. return
  73. }
  74. continue
  75. }
  76. // exec
  77. if payload.Data == nil {
  78. logger.Error("empty payload")
  79. continue
  80. }
  81. var bulkReply *reply.MultiBulkReply
  82. ok := errors.As(payload.Data, &bulkReply)
  83. if !ok {
  84. logger.Error("require multi bulk reply")
  85. continue
  86. }
  87. // Command execution
  88. execResult := r.db.Exec(client, bulkReply.Args)
  89. if execResult == nil {
  90. execResult = reply.NewUnknownErrReply("aa")
  91. }
  92. // Return result
  93. logger.Info("exec result: ", execResult)
  94. _ = client.Write(execResult.ToBytes())
  95. } // for end
  96. }
  97. // Close shuts down the RespHandler and releases resources
  98. // This function closes all active connections and the database connection
  99. // Return value: error - Error information for the close operation, currently always returns nil
  100. func (r *RespHandler) Close() error {
  101. logger.Info("handler shutting down...")
  102. r.closing.Set(true)
  103. // Iterate through all active connections and close them
  104. r.activeConn.Range(
  105. func(key, value any) bool {
  106. client := key.(*connection.Connection)
  107. _ = client.Close()
  108. return true
  109. })
  110. // Close database connection
  111. r.db.Close()
  112. return nil
  113. }
  114. // closeClient closes a client connection
  115. // Parameters:
  116. //
  117. // client *connection.Connection - The client connection object to close
  118. func (r *RespHandler) closeClient(client *connection.Connection) {
  119. _ = client.Close()
  120. r.db.AfterClientClose(client)
  121. r.activeConn.Delete(client)
  122. }