| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/10/20 11:12
- // Desc:
- package cluster
- import (
- "context"
- "strings"
- pool "github.com/jolestar/go-commons-pool/v2"
- "github.com/runningwater/go-redis/config"
- "github.com/runningwater/go-redis/database"
- idatabase "github.com/runningwater/go-redis/interface/database"
- "github.com/runningwater/go-redis/interface/resp"
- "github.com/runningwater/go-redis/lib/consistenthash"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // Database 集群数据库结构体
- // 集群数据库包含自身节点、集群节点、节点连接池、数据库实例等信息
- // 集群数据库实现了数据库接口,提供Exec和Close方法
- type Database struct {
- self string // 自身节点
- nodes []string // 集群节点
- peerPicker *consistenthash.NodeMap // 一致性哈希
- peerConnection map[string]*pool.ObjectPool // 节点连接池
- db idatabase.Database // 数据库
- }
- func NewClusterDatabase() *Database {
- cluster := &Database{
- self: config.Properties.Self,
- db: database.NewStandaloneDatabase(),
- peerPicker: consistenthash.NewNodeMap(nil),
- peerConnection: make(map[string]*pool.ObjectPool),
- }
- nodes := make([]string, 0, len(config.Properties.Peers)+1)
- for _, peer := range config.Properties.Peers {
- nodes = append(nodes, peer)
- }
- nodes = append(nodes, config.Properties.Self)
- cluster.peerPicker.AddNode(nodes...)
- cluster.nodes = nodes
- ctx := context.Background()
- for _, peer := range config.Properties.Peers {
- cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
- Peer: peer,
- })
- }
- return cluster
- }
- func (c *Database) Exec(client resp.Connection, args [][]byte) (result resp.Reply) {
- defer func() {
- if err := recover(); err != nil {
- // 忽略错误
- logger.Error(err)
- result = reply.NewUnknownErrReply("recover err")
- }
- }()
- cmdName := strings.ToLower(string(args[0]))
- cmdFunc, ok := defaultFuncMap[cmdName]
- if !ok {
- return reply.NewErrReply("unsupported command")
- }
- result = cmdFunc(c, client, args)
- return
- }
- func (c *Database) Close() {
- c.db.Close()
- }
- func (c *Database) AfterClientClose() {
- c.db.AfterClientClose()
- }
|