// Author: simon (ynwdlxm@163.com) // Date: 2025/10/20 14:13 // Desc: package cluster import ( "context" "errors" "strconv" "github.com/runningwater/go-redis/interface/resp" "github.com/runningwater/go-redis/lib/utils" client_ "github.com/runningwater/go-redis/resp/client" "github.com/runningwater/go-redis/resp/reply" ) const ( ErrNodeNotFound = "node connection not found" ) func (c *Database) getPeerClient(peer string) (*client_.Client, error) { pool, ok := c.peerConnection[peer] if !ok { return nil, errors.New(ErrNodeNotFound) } object, err := pool.BorrowObject(context.Background()) if err != nil { return nil, err } client, ok := object.(*client_.Client) if !ok { return nil, errors.New("type mismatch") } return client, nil } func (c *Database) returnPeerClient(peer string, peerClient *client_.Client) error { pool, ok := c.peerConnection[peer] if !ok { return errors.New(ErrNodeNotFound) } return pool.ReturnObject(context.Background(), peerClient) } // relay 将命令转发给指定节点 func (c *Database) relay(peer string, conn resp.Connection, args [][]byte) resp.Reply { // 自己节点 if peer == c.self { return c.db.Exec(conn, args) } // 操作其它节点 peerClient, err := c.getPeerClient(peer) if err != nil { return reply.NewErrReply(err.Error()) } defer func() { // 归还连接 _ = c.returnPeerClient(peer, peerClient) }() selectCmd := utils.ToCmdLine("select", strconv.Itoa(conn.GetDBIndex())) _ = peerClient.Send(selectCmd) return peerClient.Send(args) } func (c *Database) broadcast(conn resp.Connection, args [][]byte) map[string]resp.Reply { results := make(map[string]resp.Reply) for _, node := range c.nodes { result := c.relay(node, conn, args) results[node] = result } return results }