// Author: simon (ynwdlxm@163.com) // Date: 2025/10/20 11:12 // Desc: package cluster import ( "context" "strings" pool "github.com/jolestar/go-commons-pool/v2" "github.com/runningwater/go-redis/config" "github.com/runningwater/go-redis/database" idatabase "github.com/runningwater/go-redis/interface/database" "github.com/runningwater/go-redis/interface/resp" "github.com/runningwater/go-redis/lib/consistenthash" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/resp/reply" ) // Database 集群数据库结构体 // 集群数据库包含自身节点、集群节点、节点连接池、数据库实例等信息 // 集群数据库实现了数据库接口,提供Exec和Close方法 type Database struct { self string // 自身节点 nodes []string // 集群节点 peerPicker *consistenthash.NodeMap // 一致性哈希 peerConnection map[string]*pool.ObjectPool // 节点连接池 db idatabase.Database // 数据库 } func NewClusterDatabase() *Database { cluster := &Database{ self: config.Properties.Self, db: database.NewStandaloneDatabase(), peerPicker: consistenthash.NewNodeMap(nil), peerConnection: make(map[string]*pool.ObjectPool), } nodes := make([]string, 0, len(config.Properties.Peers)+1) for _, peer := range config.Properties.Peers { nodes = append(nodes, peer) } nodes = append(nodes, config.Properties.Self) cluster.peerPicker.AddNode(nodes...) cluster.nodes = nodes ctx := context.Background() for _, peer := range config.Properties.Peers { cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{ Peer: peer, }) } return cluster } func (c *Database) Exec(client resp.Connection, args [][]byte) (result resp.Reply) { defer func() { if err := recover(); err != nil { // 忽略错误 logger.Error(err) result = reply.NewUnknownErrReply("recover err") } }() cmdName := strings.ToLower(string(args[0])) cmdFunc, ok := defaultFuncMap[cmdName] if !ok { return reply.NewErrReply("unsupported command") } result = cmdFunc(c, client, args) return } func (c *Database) Close() { c.db.Close() } func (c *Database) AfterClientClose() { c.db.AfterClientClose() }