| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/10/17 14:53
- // Desc: 客户端
- package client_
- import (
- "errors"
- "net"
- "runtime/debug"
- "sync"
- "time"
- "github.com/runningwater/go-redis/interface/resp"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/lib/sync/wait"
- "github.com/runningwater/go-redis/resp/parser"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // Client 表示一个 Redis 客户端,用于与 Redis 服务器通信
- type Client struct {
- conn net.Conn // 与 Redis 服务器的网络连接
- pendingReqs chan *request // 存放待发送的请求队列
- waitingReqs chan *request // 存放已发送但等待响应的请求队列
- ticker *time.Ticker // 心跳定时器
- addr string // Redis 服务器地址
- working *sync.WaitGroup // 同步机制,跟踪正在进行的请求
- mu sync.RWMutex // 保护连接的读写锁
- closed bool // 客户端是否已关闭
- }
- // NewClient 创建一个新的 Redis 客户端实例
- // addr: Redis 服务器地址(例如 "localhost:6379")
- // 返回创建的客户端实例和可能的错误
- func NewClient(addr string) (*Client, error) {
- // 建立与 Redis 服务器的 TCP 连接
- dial, err := net.Dial("tcp", addr)
- if err != nil {
- return nil, err
- }
- // 初始化客户端结构体
- return &Client{
- conn: dial,
- pendingReqs: make(chan *request, chanSize), // 设置缓冲区大小为 chanSize
- waitingReqs: make(chan *request, chanSize),
- addr: addr,
- working: &sync.WaitGroup{},
- closed: false,
- }, nil
- }
- // Start 启动客户端,开始处理请求和心跳
- func (c *Client) Start() {
- c.mu.RLock()
- if c.closed {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- // 创建每10秒触发一次的心跳定时器
- c.ticker = time.NewTicker(10 * time.Second)
- // 启动写入协程,处理发送请求
- go c.handleWrite()
- // 启动读取协程,处理服务器响应
- go func() {
- err := c.handleRead()
- if err != nil {
- logger.Error("handle read error: ", err)
- }
- }()
- // 启动心跳协程
- go c.heartbeat()
- }
- // Stop 停止客户端,释放资源
- func (c *Client) Stop() {
- c.mu.Lock()
- if c.closed {
- c.mu.Unlock()
- return
- }
- c.closed = true
- c.mu.Unlock()
- // 停止心跳定时器
- if c.ticker != nil {
- c.ticker.Stop()
- }
- // 关闭待处理请求通道
- close(c.pendingReqs)
- // 等待所有正在进行的请求完成
- c.working.Wait()
- // 关闭网络连接和等待响应的通道
- c.mu.Lock()
- if c.conn != nil {
- _ = c.conn.Close()
- }
- c.mu.Unlock()
- // 关闭等待响应的通道
- close(c.waitingReqs)
- logger.Info("client stopped")
- }
- // Send 向 Redis 服务器发送命令并等待响应
- //
- // args: 要发送的命令及其参数
- //
- // 返回服务器响应或错误信息
- func (c *Client) Send(args [][]byte) resp.Reply {
- c.mu.Lock()
- if c.closed {
- c.mu.Unlock()
- return reply.NewErrReply("client is closed")
- }
- c.mu.Unlock()
- // 创建新的请求对象
- req := &request{
- id: uint64(time.Now().Unix()), // 使用时间戳作为请求ID
- args: args,
- heartbeat: false,
- waiting: &wait.Wait{}, // 用于等待响应的同步机制
- }
- // 增加等待计数器
- req.waiting.Add(1)
- c.working.Add(1)
- defer c.working.Done()
- // 将请求放入待处理队列
- select {
- case c.pendingReqs <- req:
- case <-time.After(maxWait):
- req.waiting.Done()
- return reply.NewErrReply("待处理队列已满")
- }
- // 等待服务器响应,设置最大等待时间为 maxWait
- timeout := req.waiting.WaitWithTimeout(maxWait)
- if timeout {
- return reply.NewErrReply("send server timeout")
- }
- // 检查请求是否有错误
- if req.err != nil {
- return reply.NewErrReply("request failed")
- }
- // 返回服务器响应
- return req.reply
- }
- // handleWrite 处理待发送的请求队列
- func (c *Client) handleWrite() {
- // 不断从待处理队列中取出请求并发送
- for req := range c.pendingReqs {
- c.doRequest(req)
- }
- }
- // doRequest 执行实际的请求发送操作
- // req: 要发送的请求对象
- func (c *Client) doRequest(req *request) {
- // 检查请求有效性
- if req == nil || len(req.args) == 0 {
- return
- }
- c.mu.RLock()
- if c.closed {
- c.mu.RUnlock()
- if req.waiting != nil {
- req.err = errors.New("client is closed for doRequest")
- req.waiting.Done()
- }
- return
- }
- conn := c.conn
- c.mu.RUnlock()
- // 将命令参数封装成 RESP 协议格式
- re := reply.NewMultiBulkReply(req.args)
- bytes := re.ToBytes()
- // 发送数据到服务器
- _, err := conn.Write(bytes)
- i := 0
- // 如果发送失败,尝试最多3次重连
- for err != nil && i < 3 {
- logger.Warn("write to server error:", err)
- logger.Info("reconnecting 第", i+1, " 次...")
- err = c.handleConnectionError(err)
- if err == nil {
- c.mu.RLock()
- if !c.closed {
- _, err = c.conn.Write(bytes)
- } else {
- c.mu.RUnlock()
- req.err = errors.New("client is closed for doRequest")
- req.waiting.Done()
- return
- }
- c.mu.RUnlock()
- }
- i++
- }
- // 如果发送成功,将请求移到等待响应队列
- if err == nil {
- select {
- case c.waitingReqs <- req:
- default:
- req.err = errors.New("waiting queue is full")
- req.waiting.Done()
- }
- } else {
- // 如果发送失败,标记错误并通知等待方
- req.err = err
- req.waiting.Done()
- }
- }
- // handleConnectionError 处理连接错误并尝试重新连接
- // err: 原始错误信息
- // 返回重连结果
- func (c *Client) handleConnectionError(err error) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.closed {
- return errors.New("client is closed for handleConnectionError")
- }
- // 关闭当前连接
- if c.conn != nil {
- err1 := c.conn.Close()
- if err1 != nil {
- var opErr *net.OpError
- if errors.As(err1, &opErr) {
- // 如果是连接已关闭的正常错误,则忽略
- if opErr.Err.Error() != "use of closed network connection" {
- return err1
- }
- } else {
- return err1
- }
- }
- }
- // 建立新连接
- dial, err1 := net.Dial("tcp", c.addr)
- if err1 != nil {
- logger.Error(err1)
- return err1
- }
- // 更新连接
- c.conn = dial
- logger.Info("reconnected")
- return nil
- }
- // handleRead 处理从服务器读取的响应消息
- // 返回处理过程中可能发生的错误
- func (c *Client) handleRead() error {
- c.mu.RLock()
- if c.closed || c.conn == nil {
- c.mu.RUnlock()
- return nil
- }
- conn := c.conn
- c.mu.RUnlock()
- // 使用解析器解析从连接中读取的数据流
- ch := parser.ParseStream(conn)
- // 不断处理解析出来的响应数据
- for payload := range ch {
- if payload.Err != nil {
- // 如果解析出错,记录错误并结束对应请求
- logger.Error("parse message error:", payload.Err)
- c.finishRequest(reply.NewErrReply(payload.Err.Error()))
- continue
- }
- // 正常处理响应数据
- c.finishRequest(payload.Data)
- }
- return nil
- }
- // finishRequest 完成请求处理,将响应返回给调用方
- // data: 从服务器收到的响应数据
- func (c *Client) finishRequest(data resp.Reply) {
- // 使用 defer 捕获可能出现的 panic
- defer func() {
- if err := recover(); err != nil {
- debug.PrintStack()
- logger.Error("panic in finish request:", err)
- }
- }()
- // 检查客户端是否已关闭
- c.mu.RLock()
- if c.closed {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- // 从等待响应队列中取出对应的请求
- select {
- case req := <-c.waitingReqs:
- if req == nil {
- return
- }
- // 设置响应数据
- req.reply = data
- // 通知等待方请求已完成
- if req.waiting != nil {
- req.waiting.Done()
- }
- case <-time.After(100 * time.Millisecond):
- // 防止在通道为空时阻塞过久
- logger.Warn("no request waiting for response")
- return
- }
- }
- // heartbeat 发送心跳包维持连接
- func (c *Client) heartbeat() {
- c.mu.RLock()
- if c.closed {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- // 每当定时器触发时发送心跳
- for range c.ticker.C {
- c.mu.RLock()
- if c.closed {
- c.mu.RUnlock()
- return
- }
- c.mu.RUnlock()
- // 发送心跳包
- c.doHeartbeat()
- }
- }
- // doHeartbeat 执行心跳操作
- func (c *Client) doHeartbeat() {
- // 创建心跳请求(PING 命令)
- req := &request{
- id: uint64(time.Now().UnixNano()),
- heartbeat: true,
- waiting: &wait.Wait{},
- args: [][]byte{[]byte("ping")},
- }
- // 设置等待机制
- req.waiting.Add(1)
- c.working.Add(1)
- defer c.working.Done()
- // 发送心跳请求
- select {
- case c.pendingReqs <- req:
- case <-time.After(maxWait):
- req.waiting.Done()
- logger.Warn("send heartbeat timeout")
- return
- }
- // 等待响应(不关心结果)
- req.waiting.WaitWithTimeout(maxWait)
- }
- // request 表示一个客户端请求
- type request struct {
- id uint64 // 请求唯一标识符
- args [][]byte // 请求参数
- reply resp.Reply // 服务器响应
- heartbeat bool // 是否为心跳请求
- waiting *wait.Wait // 等待同步机制
- err error // 请求过程中的错误
- }
- // 常量定义
- const (
- chanSize = 256 // 通道缓冲区大小
- maxWait = 3 * time.Second // 最大等待时间
- )
|