communication.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/10/20 14:13
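// Desc: Peer-node communication for cluster mode: borrowing and returning pooled peer clients, relaying a command to a single node, and broadcasting a command to every node.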
  4. package cluster
  5. import (
  6. "context"
  7. "errors"
  8. "strconv"
  9. "github.com/runningwater/go-redis/interface/resp"
  10. "github.com/runningwater/go-redis/lib/utils"
  11. client_ "github.com/runningwater/go-redis/resp/client"
  12. "github.com/runningwater/go-redis/resp/reply"
  13. )
  14. const (
  15. ErrNodeNotFound = "node connection not found"
  16. )
  17. func (c *Database) getPeerClient(peer string) (*client_.Client, error) {
  18. pool, ok := c.peerConnection[peer]
  19. if !ok {
  20. return nil, errors.New(ErrNodeNotFound)
  21. }
  22. object, err := pool.BorrowObject(context.Background())
  23. if err != nil {
  24. return nil, err
  25. }
  26. client, ok := object.(*client_.Client)
  27. if !ok {
  28. return nil, errors.New("type mismatch")
  29. }
  30. return client, nil
  31. }
  32. func (c *Database) returnPeerClient(peer string, peerClient *client_.Client) error {
  33. pool, ok := c.peerConnection[peer]
  34. if !ok {
  35. return errors.New(ErrNodeNotFound)
  36. }
  37. return pool.ReturnObject(context.Background(), peerClient)
  38. }
  39. // relay 将命令转发给指定节点
  40. func (c *Database) relay(peer string, conn resp.Connection, args [][]byte) resp.Reply {
  41. // 自己节点
  42. if peer == c.self {
  43. return c.db.Exec(conn, args)
  44. }
  45. // 操作其它节点
  46. peerClient, err := c.getPeerClient(peer)
  47. if err != nil {
  48. return reply.NewErrReply(err.Error())
  49. }
  50. defer func() {
  51. // 归还连接
  52. _ = c.returnPeerClient(peer, peerClient)
  53. }()
  54. selectCmd := utils.ToCmdLine("select", strconv.Itoa(conn.GetDBIndex()))
  55. _ = peerClient.Send(selectCmd)
  56. return peerClient.Send(args)
  57. }
  58. func (c *Database) broadcast(conn resp.Connection, args [][]byte) map[string]resp.Reply {
  59. results := make(map[string]resp.Reply)
  60. for _, node := range c.nodes {
  61. result := c.relay(node, conn, args)
  62. results[node] = result
  63. }
  64. return results
  65. }