| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/9/12 13:24
- // Desc: 处理 RESP 协议
- package handler
- import (
- "context"
- "errors"
- "io"
- "net"
- "strings"
- "sync"
- "github.com/runningwater/go-redis/database"
- dbface "github.com/runningwater/go-redis/interface/database"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/lib/sync/atomic"
- "github.com/runningwater/go-redis/resp/connection"
- "github.com/runningwater/go-redis/resp/parser"
- "github.com/runningwater/go-redis/resp/reply"
- )
- type RespHandler struct {
- activeConn sync.Map
- db dbface.Database
- closing atomic.Boolean
- }
- func NewHandler() *RespHandler {
- var db dbface.Database
- db = database.NewEchoDatabase()
- return &RespHandler{
- db: db,
- }
- }
- func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
- if r.closing.Get() {
- _ = conn.Close()
- }
- logger.Info("new connection from ", conn.RemoteAddr().String())
- client := connection.NewConnection(conn)
- r.activeConn.Store(client, struct{}{})
- // 解析数据
- ch := parser.ParseStream(conn)
- for payload := range ch {
- logger.Info("收到 通道数据: ", payload)
- // error
- if payload.Err != nil {
- if errors.Is(payload.Err, io.EOF) ||
- errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
- strings.Contains(payload.Err.Error(), "use of closed network connection") {
- r.closeClient(client)
- logger.Info("connection closed: ", client.RemoteAddr().String())
- return
- }
- // 协议错误
- logger.Error("协议错误: ", payload.Err.Error())
- errReplay := reply.NewErrReply(payload.Err.Error())
- err := client.Write(errReplay.ToBytes())
- if err != nil {
- r.closeClient(client)
- logger.Error("client.Write:", err.Error())
- return
- }
- continue
- }
- // exec
- if payload.Data == nil {
- logger.Error("empty payload")
- continue
- }
- bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
- if !ok {
- logger.Error("require multi bulk reply")
- continue
- }
- // 命令执行
- execResult := r.db.Exec(client, bulkReply.Args)
- if execResult == nil {
- execResult = reply.NewUnknownErrReply()
- }
- // 返回结果
- logger.Info("exec result: ", execResult)
- _ = client.Write(execResult.ToBytes())
- } // for end
- }
- // Close 关闭RespHandler处理器,释放相关资源
- // 该函数会关闭所有活跃连接并关闭数据库连接
- // 返回值: error - 关闭操作的错误信息,目前始终返回nil
- func (r *RespHandler) Close() error {
- logger.Info("handler shutting down...")
- r.closing.Set(true)
- // 遍历所有活跃连接并关闭它们
- r.activeConn.Range(
- func(key, value any) bool {
- client := key.(*connection.Connection)
- _ = client.Close()
- return true
- })
- // 关闭数据库连接
- r.db.Close()
- return nil
- }
- // closeClient 关闭客户端连接
- // 参数:
- //
- // client *connection.Connection - 需要关闭的客户端连接对象
- func (r *RespHandler) closeClient(client *connection.Connection) {
- _ = client.Close()
- r.db.AfterClientClose(client)
- r.activeConn.Delete(client)
- }
|