// Author: simon (ynwdlxm@163.com) // Date: 2025/10/20 14:51 // Desc: package cluster import ( "errors" "github.com/runningwater/go-redis/interface/resp" "github.com/runningwater/go-redis/resp/reply" ) type CmdFunc func(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply var defaultFuncMap map[string]CmdFunc func init() { defaultFuncMap = map[string]CmdFunc{ "get": defaultFunc, "getset": defaultFunc, "set": defaultFunc, "setnx": defaultFunc, "exists": defaultFunc, "del": delFunc, "type": defaultFunc, "ping": pingFunc, "select": selectFunc, "rename": renameFunc, "renamenx": renameFunc, "flushdb": flushdbFunc, } } // 默认命令处理函数,转发到对应的节点执行 // GET key // SEt key value func defaultFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { key := string(args[1]) // 获取 key 所在的节点 peer := cdb.peerPicker.PickNode(key) return cdb.relay(peer, conn, args) } // PING 命令处理函数,不转发,本节点执行 func pingFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { // 不能转发,只能执行 return cdb.db.Exec(conn, args) } func selectFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { return pingFunc(cdb, conn, args) } // rename k1 k2 func renameFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { if len(args) != 3 { return reply.NewErrReply("ERR wrong number of arguments for 'rename' command") } src := string(args[1]) dest := string(args[2]) peerSrc := cdb.peerPicker.PickNode(src) peerDest := cdb.peerPicker.PickNode(dest) if peerSrc != peerDest { return reply.NewErrReply("ERR rename command not supported in cluster mode") } return cdb.relay(peerSrc, conn, args) } func flushdbFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { replies := cdb.broadcast(conn, args) var errReply resp.Reply for _, r := range replies { if reply.IsErrReply(r) { errors.As(r, &errReply) break } } if errReply == nil { return reply.NewOkReply() } return reply.NewErrReply("err: " + errReply.Error()) } func delFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply { replies := cdb.broadcast(conn, args) var errReply resp.Reply var deleted int64 = 0 for _, r := range replies { if reply.IsErrReply(r) { errors.As(r, &errReply) break } var intReply *reply.IntReply ok := errors.As(r, &intReply) if !ok { errReply = reply.NewErrReply("err: convert to IntReply not ok") } deleted += intReply.Code } if errReply == nil { return reply.NewIntReply(deleted) } return reply.NewErrReply("err: " + errReply.Error()) }