handler.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/9/12 13:24
  3. // Desc: 处理 RESP 协议
  4. package handler
  5. import (
  6. "context"
  7. "errors"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "github.com/runningwater/go-redis/database"
  13. dbface "github.com/runningwater/go-redis/interface/database"
  14. "github.com/runningwater/go-redis/lib/logger"
  15. "github.com/runningwater/go-redis/lib/sync/atomic"
  16. "github.com/runningwater/go-redis/resp/connection"
  17. "github.com/runningwater/go-redis/resp/parser"
  18. "github.com/runningwater/go-redis/resp/reply"
  19. )
  20. type RespHandler struct {
  21. activeConn sync.Map
  22. db dbface.Database
  23. closing atomic.Boolean
  24. }
  25. func NewHandler() *RespHandler {
  26. var db dbface.Database
  27. db = database.NewEchoDatabase()
  28. return &RespHandler{
  29. db: db,
  30. }
  31. }
  32. func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  33. if r.closing.Get() {
  34. _ = conn.Close()
  35. }
  36. logger.Info("new connection from ", conn.RemoteAddr().String())
  37. client := connection.NewConnection(conn)
  38. r.activeConn.Store(client, struct{}{})
  39. // 解析数据
  40. ch := parser.ParseStream(conn)
  41. for payload := range ch {
  42. logger.Info("收到 通道数据: ", payload)
  43. // error
  44. if payload.Err != nil {
  45. if errors.Is(payload.Err, io.EOF) ||
  46. errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
  47. strings.Contains(payload.Err.Error(), "use of closed network connection") {
  48. r.closeClient(client)
  49. logger.Info("connection closed: ", client.RemoteAddr().String())
  50. return
  51. }
  52. // 协议错误
  53. logger.Error("协议错误: ", payload.Err.Error())
  54. errReplay := reply.NewErrReply(payload.Err.Error())
  55. err := client.Write(errReplay.ToBytes())
  56. if err != nil {
  57. r.closeClient(client)
  58. logger.Error("client.Write:", err.Error())
  59. return
  60. }
  61. continue
  62. }
  63. // exec
  64. if payload.Data == nil {
  65. logger.Error("empty payload")
  66. continue
  67. }
  68. bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
  69. if !ok {
  70. logger.Error("require multi bulk reply")
  71. continue
  72. }
  73. // 命令执行
  74. execResult := r.db.Exec(client, bulkReply.Args)
  75. if execResult == nil {
  76. execResult = reply.NewUnknownErrReply()
  77. }
  78. // 返回结果
  79. logger.Info("exec result: ", execResult)
  80. _ = client.Write(execResult.ToBytes())
  81. } // for end
  82. }
  83. // Close 关闭RespHandler处理器,释放相关资源
  84. // 该函数会关闭所有活跃连接并关闭数据库连接
  85. // 返回值: error - 关闭操作的错误信息,目前始终返回nil
  86. func (r *RespHandler) Close() error {
  87. logger.Info("handler shutting down...")
  88. r.closing.Set(true)
  89. // 遍历所有活跃连接并关闭它们
  90. r.activeConn.Range(
  91. func(key, value any) bool {
  92. client := key.(*connection.Connection)
  93. _ = client.Close()
  94. return true
  95. })
  96. // 关闭数据库连接
  97. r.db.Close()
  98. return nil
  99. }
  100. // closeClient 关闭客户端连接
  101. // 参数:
  102. //
  103. // client *connection.Connection - 需要关闭的客户端连接对象
  104. func (r *RespHandler) closeClient(client *connection.Connection) {
  105. _ = client.Close()
  106. r.db.AfterClientClose(client)
  107. r.activeConn.Delete(client)
  108. }