// Author: simon (ynwdlxm@163.com) // Date: 2025/10/17 14:53 // Desc: 客户端 package client_ import ( "errors" "net" "runtime/debug" "sync" "time" "github.com/runningwater/go-redis/interface/resp" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/lib/sync/wait" "github.com/runningwater/go-redis/resp/parser" "github.com/runningwater/go-redis/resp/reply" ) // Client 表示一个 Redis 客户端,用于与 Redis 服务器通信 type Client struct { conn net.Conn // 与 Redis 服务器的网络连接 pendingReqs chan *request // 存放待发送的请求队列 waitingReqs chan *request // 存放已发送但等待响应的请求队列 ticker *time.Ticker // 心跳定时器 addr string // Redis 服务器地址 working *sync.WaitGroup // 同步机制,跟踪正在进行的请求 mu sync.RWMutex // 保护连接的读写锁 closed bool // 客户端是否已关闭 } // NewClient 创建一个新的 Redis 客户端实例 // addr: Redis 服务器地址(例如 "localhost:6379") // 返回创建的客户端实例和可能的错误 func NewClient(addr string) (*Client, error) { // 建立与 Redis 服务器的 TCP 连接 dial, err := net.Dial("tcp", addr) if err != nil { return nil, err } // 初始化客户端结构体 return &Client{ conn: dial, pendingReqs: make(chan *request, chanSize), // 设置缓冲区大小为 chanSize waitingReqs: make(chan *request, chanSize), addr: addr, working: &sync.WaitGroup{}, closed: false, }, nil } // Start 启动客户端,开始处理请求和心跳 func (c *Client) Start() { c.mu.RLock() if c.closed { c.mu.RUnlock() return } c.mu.RUnlock() // 创建每10秒触发一次的心跳定时器 c.ticker = time.NewTicker(10 * time.Second) // 启动写入协程,处理发送请求 go c.handleWrite() // 启动读取协程,处理服务器响应 go func() { err := c.handleRead() if err != nil { logger.Error("handle read error: ", err) } }() // 启动心跳协程 go c.heartbeat() } // Stop 停止客户端,释放资源 func (c *Client) Stop() { c.mu.Lock() if c.closed { c.mu.Unlock() return } c.closed = true c.mu.Unlock() // 停止心跳定时器 if c.ticker != nil { c.ticker.Stop() } // 关闭待处理请求通道 close(c.pendingReqs) // 等待所有正在进行的请求完成 c.working.Wait() // 关闭网络连接和等待响应的通道 c.mu.Lock() if c.conn != nil { _ = c.conn.Close() } c.mu.Unlock() // 关闭等待响应的通道 close(c.waitingReqs) logger.Info("client stopped") } // Send 向 Redis 服务器发送命令并等待响应 // // args: 要发送的命令及其参数 // // 返回服务器响应或错误信息 func (c *Client) Send(args [][]byte) resp.Reply { c.mu.Lock() if c.closed { c.mu.Unlock() return reply.NewErrReply("client is closed") } c.mu.Unlock() // 创建新的请求对象 req := &request{ id: uint64(time.Now().Unix()), // 使用时间戳作为请求ID args: args, heartbeat: false, waiting: &wait.Wait{}, // 用于等待响应的同步机制 } // 增加等待计数器 req.waiting.Add(1) c.working.Add(1) defer c.working.Done() // 将请求放入待处理队列 select { case c.pendingReqs <- req: case <-time.After(maxWait): req.waiting.Done() return reply.NewErrReply("待处理队列已满") } // 等待服务器响应,设置最大等待时间为 maxWait timeout := req.waiting.WaitWithTimeout(maxWait) if timeout { return reply.NewErrReply("send server timeout") } // 检查请求是否有错误 if req.err != nil { return reply.NewErrReply("request failed") } // 返回服务器响应 return req.reply } // handleWrite 处理待发送的请求队列 func (c *Client) handleWrite() { // 不断从待处理队列中取出请求并发送 for req := range c.pendingReqs { c.doRequest(req) } } // doRequest 执行实际的请求发送操作 // req: 要发送的请求对象 func (c *Client) doRequest(req *request) { // 检查请求有效性 if req == nil || len(req.args) == 0 { return } c.mu.RLock() if c.closed { c.mu.RUnlock() if req.waiting != nil { req.err = errors.New("client is closed for doRequest") req.waiting.Done() } return } conn := c.conn c.mu.RUnlock() // 将命令参数封装成 RESP 协议格式 re := reply.NewMultiBulkReply(req.args) bytes := re.ToBytes() // 发送数据到服务器 _, err := conn.Write(bytes) i := 0 // 如果发送失败,尝试最多3次重连 for err != nil && i < 3 { logger.Warn("write to server error:", err) logger.Info("reconnecting 第", i+1, " 次...") err = c.handleConnectionError(err) if err == nil { c.mu.RLock() if !c.closed { _, err = c.conn.Write(bytes) } else { c.mu.RUnlock() req.err = errors.New("client is closed for doRequest") req.waiting.Done() return } c.mu.RUnlock() } i++ } // 如果发送成功,将请求移到等待响应队列 if err == nil { select { case c.waitingReqs <- req: default: req.err = errors.New("waiting queue is full") req.waiting.Done() } } else { // 如果发送失败,标记错误并通知等待方 req.err = err req.waiting.Done() } } // handleConnectionError 处理连接错误并尝试重新连接 // err: 原始错误信息 // 返回重连结果 func (c *Client) handleConnectionError(err error) error { c.mu.Lock() defer c.mu.Unlock() if c.closed { return errors.New("client is closed for handleConnectionError") } // 关闭当前连接 if c.conn != nil { err1 := c.conn.Close() if err1 != nil { var opErr *net.OpError if errors.As(err1, &opErr) { // 如果是连接已关闭的正常错误,则忽略 if opErr.Err.Error() != "use of closed network connection" { return err1 } } else { return err1 } } } // 建立新连接 dial, err1 := net.Dial("tcp", c.addr) if err1 != nil { logger.Error(err1) return err1 } // 更新连接 c.conn = dial logger.Info("reconnected") return nil } // handleRead 处理从服务器读取的响应消息 // 返回处理过程中可能发生的错误 func (c *Client) handleRead() error { c.mu.RLock() if c.closed || c.conn == nil { c.mu.RUnlock() return nil } conn := c.conn c.mu.RUnlock() // 使用解析器解析从连接中读取的数据流 ch := parser.ParseStream(conn) // 不断处理解析出来的响应数据 for payload := range ch { if payload.Err != nil { // 如果解析出错,记录错误并结束对应请求 logger.Error("parse message error:", payload.Err) c.finishRequest(reply.NewErrReply(payload.Err.Error())) continue } // 正常处理响应数据 c.finishRequest(payload.Data) } return nil } // finishRequest 完成请求处理,将响应返回给调用方 // data: 从服务器收到的响应数据 func (c *Client) finishRequest(data resp.Reply) { // 使用 defer 捕获可能出现的 panic defer func() { if err := recover(); err != nil { debug.PrintStack() logger.Error("panic in finish request:", err) } }() // 检查客户端是否已关闭 c.mu.RLock() if c.closed { c.mu.RUnlock() return } c.mu.RUnlock() // 从等待响应队列中取出对应的请求 select { case req := <-c.waitingReqs: if req == nil { return } // 设置响应数据 req.reply = data // 通知等待方请求已完成 if req.waiting != nil { req.waiting.Done() } case <-time.After(100 * time.Millisecond): // 防止在通道为空时阻塞过久 logger.Warn("no request waiting for response") return } } // heartbeat 发送心跳包维持连接 func (c *Client) heartbeat() { c.mu.RLock() if c.closed { c.mu.RUnlock() return } c.mu.RUnlock() // 每当定时器触发时发送心跳 for range c.ticker.C { c.mu.RLock() if c.closed { c.mu.RUnlock() return } c.mu.RUnlock() // 发送心跳包 c.doHeartbeat() } } // doHeartbeat 执行心跳操作 func (c *Client) doHeartbeat() { // 创建心跳请求(PING 命令) req := &request{ id: uint64(time.Now().UnixNano()), heartbeat: true, waiting: &wait.Wait{}, args: [][]byte{[]byte("ping")}, } // 设置等待机制 req.waiting.Add(1) c.working.Add(1) defer c.working.Done() // 发送心跳请求 select { case c.pendingReqs <- req: case <-time.After(maxWait): req.waiting.Done() logger.Warn("send heartbeat timeout") return } // 等待响应(不关心结果) req.waiting.WaitWithTimeout(maxWait) } // request 表示一个客户端请求 type request struct { id uint64 // 请求唯一标识符 args [][]byte // 请求参数 reply resp.Reply // 服务器响应 heartbeat bool // 是否为心跳请求 waiting *wait.Wait // 等待同步机制 err error // 请求过程中的错误 } // 常量定义 const ( chanSize = 256 // 通道缓冲区大小 maxWait = 3 * time.Second // 最大等待时间 )