router.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/10/20 14:51
  3. // Desc:
  4. package cluster
  5. import (
  6. "errors"
  7. "github.com/runningwater/go-redis/interface/resp"
  8. "github.com/runningwater/go-redis/resp/reply"
  9. )
  10. type CmdFunc func(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply
  11. var defaultFuncMap map[string]CmdFunc
  12. func init() {
  13. defaultFuncMap = map[string]CmdFunc{
  14. "get": defaultFunc,
  15. "getset": defaultFunc,
  16. "set": defaultFunc,
  17. "setnx": defaultFunc,
  18. "exists": defaultFunc,
  19. "del": delFunc,
  20. "type": defaultFunc,
  21. "ping": pingFunc,
  22. "select": selectFunc,
  23. "rename": renameFunc,
  24. "renamenx": renameFunc,
  25. "flushdb": flushdbFunc,
  26. }
  27. }
  28. // 默认命令处理函数,转发到对应的节点执行
  29. // GET key
  30. // SEt key value
  31. func defaultFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  32. key := string(args[1])
  33. // 获取 key 所在的节点
  34. peer := cdb.peerPicker.PickNode(key)
  35. return cdb.relay(peer, conn, args)
  36. }
  37. // PING 命令处理函数,不转发,本节点执行
  38. func pingFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  39. // 不能转发,只能执行
  40. return cdb.db.Exec(conn, args)
  41. }
  42. func selectFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  43. return pingFunc(cdb, conn, args)
  44. }
  45. // rename k1 k2
  46. func renameFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  47. if len(args) != 3 {
  48. return reply.NewErrReply("ERR wrong number of arguments for 'rename' command")
  49. }
  50. src := string(args[1])
  51. dest := string(args[2])
  52. peerSrc := cdb.peerPicker.PickNode(src)
  53. peerDest := cdb.peerPicker.PickNode(dest)
  54. if peerSrc != peerDest {
  55. return reply.NewErrReply("ERR rename command not supported in cluster mode")
  56. }
  57. return cdb.relay(peerSrc, conn, args)
  58. }
  59. func flushdbFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  60. replies := cdb.broadcast(conn, args)
  61. var errReply resp.Reply
  62. for _, r := range replies {
  63. if reply.IsErrReply(r) {
  64. errors.As(r, &errReply)
  65. break
  66. }
  67. }
  68. if errReply == nil {
  69. return reply.NewOkReply()
  70. }
  71. return reply.NewErrReply("err: " + errReply.Error())
  72. }
  73. func delFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
  74. replies := cdb.broadcast(conn, args)
  75. var errReply resp.Reply
  76. var deleted int64 = 0
  77. for _, r := range replies {
  78. if reply.IsErrReply(r) {
  79. errors.As(r, &errReply)
  80. break
  81. }
  82. var intReply *reply.IntReply
  83. ok := errors.As(r, &intReply)
  84. if !ok {
  85. errReply = reply.NewErrReply("err: convert to IntReply not ok")
  86. }
  87. deleted += intReply.Code
  88. }
  89. if errReply == nil {
  90. return reply.NewIntReply(deleted)
  91. }
  92. return reply.NewErrReply("err: " + errReply.Error())
  93. }