cluster_database.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/10/20 11:12
  3. // Desc:
  4. package cluster
  5. import (
  6. "context"
  7. "strings"
  8. pool "github.com/jolestar/go-commons-pool/v2"
  9. "github.com/runningwater/go-redis/config"
  10. "github.com/runningwater/go-redis/database"
  11. idatabase "github.com/runningwater/go-redis/interface/database"
  12. "github.com/runningwater/go-redis/interface/resp"
  13. "github.com/runningwater/go-redis/lib/consistenthash"
  14. "github.com/runningwater/go-redis/lib/logger"
  15. "github.com/runningwater/go-redis/resp/reply"
  16. )
  // Database is the cluster-mode database.
  // It holds this node's own address, the list of cluster nodes, a
  // consistent-hash node map, per-node connection pools, and a database
  // instance.
  // It implements the database interface, providing the Exec and Close methods.
  type Database struct {
  	self           string                      // this node
  	nodes          []string                    // cluster nodes
  	peerPicker     *consistenthash.NodeMap     // consistent hash
  	peerConnection map[string]*pool.ObjectPool // node connection pools
  	db             idatabase.Database          // database
  }
  27. func NewClusterDatabase() *Database {
  28. cluster := &Database{
  29. self: config.Properties.Self,
  30. db: database.NewStandaloneDatabase(),
  31. peerPicker: consistenthash.NewNodeMap(nil),
  32. peerConnection: make(map[string]*pool.ObjectPool),
  33. }
  34. nodes := make([]string, 0, len(config.Properties.Peers)+1)
  35. for _, peer := range config.Properties.Peers {
  36. nodes = append(nodes, peer)
  37. }
  38. nodes = append(nodes, config.Properties.Self)
  39. cluster.peerPicker.AddNode(nodes...)
  40. cluster.nodes = nodes
  41. ctx := context.Background()
  42. for _, peer := range config.Properties.Peers {
  43. cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
  44. Peer: peer,
  45. })
  46. }
  47. return cluster
  48. }
  49. func (c *Database) Exec(client resp.Connection, args [][]byte) (result resp.Reply) {
  50. defer func() {
  51. if err := recover(); err != nil {
  52. // 忽略错误
  53. logger.Error(err)
  54. result = reply.NewUnknownErrReply("recover err")
  55. }
  56. }()
  57. cmdName := strings.ToLower(string(args[0]))
  58. cmdFunc, ok := defaultFuncMap[cmdName]
  59. if !ok {
  60. return reply.NewErrReply("unsupported command")
  61. }
  62. result = cmdFunc(c, client, args)
  63. return
  64. }
  65. func (c *Database) Close() {
  66. c.db.Close()
  67. }
  68. func (c *Database) AfterClientClose() {
  69. c.db.AfterClientClose()
  70. }