| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/10/20 14:13
- // Desc:
- package cluster
- import (
- "context"
- "errors"
- "strconv"
- "github.com/runningwater/go-redis/interface/resp"
- "github.com/runningwater/go-redis/lib/utils"
- client_ "github.com/runningwater/go-redis/resp/client"
- "github.com/runningwater/go-redis/resp/reply"
- )
// ErrNodeNotFound is the error message used when no connection pool is
// registered for the requested peer node.
const ErrNodeNotFound = "node connection not found"
- func (c *Database) getPeerClient(peer string) (*client_.Client, error) {
- pool, ok := c.peerConnection[peer]
- if !ok {
- return nil, errors.New(ErrNodeNotFound)
- }
- object, err := pool.BorrowObject(context.Background())
- if err != nil {
- return nil, err
- }
- client, ok := object.(*client_.Client)
- if !ok {
- return nil, errors.New("type mismatch")
- }
- return client, nil
- }
- func (c *Database) returnPeerClient(peer string, peerClient *client_.Client) error {
- pool, ok := c.peerConnection[peer]
- if !ok {
- return errors.New(ErrNodeNotFound)
- }
- return pool.ReturnObject(context.Background(), peerClient)
- }
- // relay 将命令转发给指定节点
- func (c *Database) relay(peer string, conn resp.Connection, args [][]byte) resp.Reply {
- // 自己节点
- if peer == c.self {
- return c.db.Exec(conn, args)
- }
- // 操作其它节点
- peerClient, err := c.getPeerClient(peer)
- if err != nil {
- return reply.NewErrReply(err.Error())
- }
- defer func() {
- // 归还连接
- _ = c.returnPeerClient(peer, peerClient)
- }()
- selectCmd := utils.ToCmdLine("select", strconv.Itoa(conn.GetDBIndex()))
- _ = peerClient.Send(selectCmd)
- return peerClient.Send(args)
- }
- func (c *Database) broadcast(conn resp.Connection, args [][]byte) map[string]resp.Reply {
- results := make(map[string]resp.Reply)
- for _, node := range c.nodes {
- result := c.relay(node, conn, args)
- results[node] = result
- }
- return results
- }
|