client.go 8.9 KB


  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/10/17 14:53
  3. // Desc: 客户端
  4. package client_
  5. import (
  6. "errors"
  7. "net"
  8. "runtime/debug"
  9. "sync"
  10. "time"
  11. "github.com/runningwater/go-redis/interface/resp"
  12. "github.com/runningwater/go-redis/lib/logger"
  13. "github.com/runningwater/go-redis/lib/sync/wait"
  14. "github.com/runningwater/go-redis/resp/parser"
  15. "github.com/runningwater/go-redis/resp/reply"
  16. )
  17. // Client 表示一个 Redis 客户端,用于与 Redis 服务器通信
  18. type Client struct {
  19. conn net.Conn // 与 Redis 服务器的网络连接
  20. pendingReqs chan *request // 存放待发送的请求队列
  21. waitingReqs chan *request // 存放已发送但等待响应的请求队列
  22. ticker *time.Ticker // 心跳定时器
  23. addr string // Redis 服务器地址
  24. working *sync.WaitGroup // 同步机制,跟踪正在进行的请求
  25. mu sync.RWMutex // 保护连接的读写锁
  26. closed bool // 客户端是否已关闭
  27. }
  28. // NewClient 创建一个新的 Redis 客户端实例
  29. // addr: Redis 服务器地址(例如 "localhost:6379")
  30. // 返回创建的客户端实例和可能的错误
  31. func NewClient(addr string) (*Client, error) {
  32. // 建立与 Redis 服务器的 TCP 连接
  33. dial, err := net.Dial("tcp", addr)
  34. if err != nil {
  35. return nil, err
  36. }
  37. // 初始化客户端结构体
  38. return &Client{
  39. conn: dial,
  40. pendingReqs: make(chan *request, chanSize), // 设置缓冲区大小为 chanSize
  41. waitingReqs: make(chan *request, chanSize),
  42. addr: addr,
  43. working: &sync.WaitGroup{},
  44. closed: false,
  45. }, nil
  46. }
  47. // Start 启动客户端,开始处理请求和心跳
  48. func (c *Client) Start() {
  49. c.mu.RLock()
  50. if c.closed {
  51. c.mu.RUnlock()
  52. return
  53. }
  54. c.mu.RUnlock()
  55. // 创建每10秒触发一次的心跳定时器
  56. c.ticker = time.NewTicker(10 * time.Second)
  57. // 启动写入协程,处理发送请求
  58. go c.handleWrite()
  59. // 启动读取协程,处理服务器响应
  60. go func() {
  61. err := c.handleRead()
  62. if err != nil {
  63. logger.Error("handle read error: ", err)
  64. }
  65. }()
  66. // 启动心跳协程
  67. go c.heartbeat()
  68. }
  69. // Stop 停止客户端,释放资源
  70. func (c *Client) Stop() {
  71. c.mu.Lock()
  72. if c.closed {
  73. c.mu.Unlock()
  74. return
  75. }
  76. c.closed = true
  77. c.mu.Unlock()
  78. // 停止心跳定时器
  79. if c.ticker != nil {
  80. c.ticker.Stop()
  81. }
  82. // 关闭待处理请求通道
  83. close(c.pendingReqs)
  84. // 等待所有正在进行的请求完成
  85. c.working.Wait()
  86. // 关闭网络连接和等待响应的通道
  87. c.mu.Lock()
  88. if c.conn != nil {
  89. _ = c.conn.Close()
  90. }
  91. c.mu.Unlock()
  92. // 关闭等待响应的通道
  93. close(c.waitingReqs)
  94. logger.Info("client stopped")
  95. }
  96. // Send 向 Redis 服务器发送命令并等待响应
  97. //
  98. // args: 要发送的命令及其参数
  99. //
  100. // 返回服务器响应或错误信息
  101. func (c *Client) Send(args [][]byte) resp.Reply {
  102. c.mu.Lock()
  103. if c.closed {
  104. c.mu.Unlock()
  105. return reply.NewErrReply("client is closed")
  106. }
  107. c.mu.Unlock()
  108. // 创建新的请求对象
  109. req := &request{
  110. id: uint64(time.Now().Unix()), // 使用时间戳作为请求ID
  111. args: args,
  112. heartbeat: false,
  113. waiting: &wait.Wait{}, // 用于等待响应的同步机制
  114. }
  115. // 增加等待计数器
  116. req.waiting.Add(1)
  117. c.working.Add(1)
  118. defer c.working.Done()
  119. // 将请求放入待处理队列
  120. select {
  121. case c.pendingReqs <- req:
  122. case <-time.After(maxWait):
  123. req.waiting.Done()
  124. return reply.NewErrReply("待处理队列已满")
  125. }
  126. // 等待服务器响应,设置最大等待时间为 maxWait
  127. timeout := req.waiting.WaitWithTimeout(maxWait)
  128. if timeout {
  129. return reply.NewErrReply("send server timeout")
  130. }
  131. // 检查请求是否有错误
  132. if req.err != nil {
  133. return reply.NewErrReply("request failed")
  134. }
  135. // 返回服务器响应
  136. return req.reply
  137. }
  138. // handleWrite 处理待发送的请求队列
  139. func (c *Client) handleWrite() {
  140. // 不断从待处理队列中取出请求并发送
  141. for req := range c.pendingReqs {
  142. c.doRequest(req)
  143. }
  144. }
  145. // doRequest 执行实际的请求发送操作
  146. // req: 要发送的请求对象
  147. func (c *Client) doRequest(req *request) {
  148. // 检查请求有效性
  149. if req == nil || len(req.args) == 0 {
  150. return
  151. }
  152. c.mu.RLock()
  153. if c.closed {
  154. c.mu.RUnlock()
  155. if req.waiting != nil {
  156. req.err = errors.New("client is closed for doRequest")
  157. req.waiting.Done()
  158. }
  159. return
  160. }
  161. conn := c.conn
  162. c.mu.RUnlock()
  163. // 将命令参数封装成 RESP 协议格式
  164. re := reply.NewMultiBulkReply(req.args)
  165. bytes := re.ToBytes()
  166. // 发送数据到服务器
  167. _, err := conn.Write(bytes)
  168. i := 0
  169. // 如果发送失败,尝试最多3次重连
  170. for err != nil && i < 3 {
  171. logger.Warn("write to server error:", err)
  172. logger.Info("reconnecting 第", i+1, " 次...")
  173. err = c.handleConnectionError(err)
  174. if err == nil {
  175. c.mu.RLock()
  176. if !c.closed {
  177. _, err = c.conn.Write(bytes)
  178. } else {
  179. c.mu.RUnlock()
  180. req.err = errors.New("client is closed for doRequest")
  181. req.waiting.Done()
  182. return
  183. }
  184. c.mu.RUnlock()
  185. }
  186. i++
  187. }
  188. // 如果发送成功,将请求移到等待响应队列
  189. if err == nil {
  190. select {
  191. case c.waitingReqs <- req:
  192. default:
  193. req.err = errors.New("waiting queue is full")
  194. req.waiting.Done()
  195. }
  196. } else {
  197. // 如果发送失败,标记错误并通知等待方
  198. req.err = err
  199. req.waiting.Done()
  200. }
  201. }
  202. // handleConnectionError 处理连接错误并尝试重新连接
  203. // err: 原始错误信息
  204. // 返回重连结果
  205. func (c *Client) handleConnectionError(err error) error {
  206. c.mu.Lock()
  207. defer c.mu.Unlock()
  208. if c.closed {
  209. return errors.New("client is closed for handleConnectionError")
  210. }
  211. // 关闭当前连接
  212. if c.conn != nil {
  213. err1 := c.conn.Close()
  214. if err1 != nil {
  215. var opErr *net.OpError
  216. if errors.As(err1, &opErr) {
  217. // 如果是连接已关闭的正常错误,则忽略
  218. if opErr.Err.Error() != "use of closed network connection" {
  219. return err1
  220. }
  221. } else {
  222. return err1
  223. }
  224. }
  225. }
  226. // 建立新连接
  227. dial, err1 := net.Dial("tcp", c.addr)
  228. if err1 != nil {
  229. logger.Error(err1)
  230. return err1
  231. }
  232. // 更新连接
  233. c.conn = dial
  234. logger.Info("reconnected")
  235. return nil
  236. }
  237. // handleRead 处理从服务器读取的响应消息
  238. // 返回处理过程中可能发生的错误
  239. func (c *Client) handleRead() error {
  240. c.mu.RLock()
  241. if c.closed || c.conn == nil {
  242. c.mu.RUnlock()
  243. return nil
  244. }
  245. conn := c.conn
  246. c.mu.RUnlock()
  247. // 使用解析器解析从连接中读取的数据流
  248. ch := parser.ParseStream(conn)
  249. // 不断处理解析出来的响应数据
  250. for payload := range ch {
  251. if payload.Err != nil {
  252. // 如果解析出错,记录错误并结束对应请求
  253. logger.Error("parse message error:", payload.Err)
  254. c.finishRequest(reply.NewErrReply(payload.Err.Error()))
  255. continue
  256. }
  257. // 正常处理响应数据
  258. c.finishRequest(payload.Data)
  259. }
  260. return nil
  261. }
  262. // finishRequest 完成请求处理,将响应返回给调用方
  263. // data: 从服务器收到的响应数据
  264. func (c *Client) finishRequest(data resp.Reply) {
  265. // 使用 defer 捕获可能出现的 panic
  266. defer func() {
  267. if err := recover(); err != nil {
  268. debug.PrintStack()
  269. logger.Error("panic in finish request:", err)
  270. }
  271. }()
  272. // 检查客户端是否已关闭
  273. c.mu.RLock()
  274. if c.closed {
  275. c.mu.RUnlock()
  276. return
  277. }
  278. c.mu.RUnlock()
  279. // 从等待响应队列中取出对应的请求
  280. select {
  281. case req := <-c.waitingReqs:
  282. if req == nil {
  283. return
  284. }
  285. // 设置响应数据
  286. req.reply = data
  287. // 通知等待方请求已完成
  288. if req.waiting != nil {
  289. req.waiting.Done()
  290. }
  291. case <-time.After(100 * time.Millisecond):
  292. // 防止在通道为空时阻塞过久
  293. logger.Warn("no request waiting for response")
  294. return
  295. }
  296. }
  297. // heartbeat 发送心跳包维持连接
  298. func (c *Client) heartbeat() {
  299. c.mu.RLock()
  300. if c.closed {
  301. c.mu.RUnlock()
  302. return
  303. }
  304. c.mu.RUnlock()
  305. // 每当定时器触发时发送心跳
  306. for range c.ticker.C {
  307. c.mu.RLock()
  308. if c.closed {
  309. c.mu.RUnlock()
  310. return
  311. }
  312. c.mu.RUnlock()
  313. // 发送心跳包
  314. c.doHeartbeat()
  315. }
  316. }
  317. // doHeartbeat 执行心跳操作
  318. func (c *Client) doHeartbeat() {
  319. // 创建心跳请求(PING 命令)
  320. req := &request{
  321. id: uint64(time.Now().UnixNano()),
  322. heartbeat: true,
  323. waiting: &wait.Wait{},
  324. args: [][]byte{[]byte("ping")},
  325. }
  326. // 设置等待机制
  327. req.waiting.Add(1)
  328. c.working.Add(1)
  329. defer c.working.Done()
  330. // 发送心跳请求
  331. select {
  332. case c.pendingReqs <- req:
  333. case <-time.After(maxWait):
  334. req.waiting.Done()
  335. logger.Warn("send heartbeat timeout")
  336. return
  337. }
  338. // 等待响应(不关心结果)
  339. req.waiting.WaitWithTimeout(maxWait)
  340. }
  341. // request 表示一个客户端请求
  342. type request struct {
  343. id uint64 // 请求唯一标识符
  344. args [][]byte // 请求参数
  345. reply resp.Reply // 服务器响应
  346. heartbeat bool // 是否为心跳请求
  347. waiting *wait.Wait // 等待同步机制
  348. err error // 请求过程中的错误
  349. }
  350. // 常量定义
  351. const (
  352. chanSize = 256 // 通道缓冲区大小
  353. maxWait = 3 * time.Second // 最大等待时间
  354. )