| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/10/20 14:51
- // Desc:
- package cluster
- import (
- "errors"
- "github.com/runningwater/go-redis/interface/resp"
- "github.com/runningwater/go-redis/resp/reply"
- )
- type CmdFunc func(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply
- var defaultFuncMap map[string]CmdFunc
- func init() {
- defaultFuncMap = map[string]CmdFunc{
- "get": defaultFunc,
- "getset": defaultFunc,
- "set": defaultFunc,
- "setnx": defaultFunc,
- "exists": defaultFunc,
- "del": delFunc,
- "type": defaultFunc,
- "ping": pingFunc,
- "select": selectFunc,
- "rename": renameFunc,
- "renamenx": renameFunc,
- "flushdb": flushdbFunc,
- }
- }
- // 默认命令处理函数,转发到对应的节点执行
- // GET key
- // SEt key value
- func defaultFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- key := string(args[1])
- // 获取 key 所在的节点
- peer := cdb.peerPicker.PickNode(key)
- return cdb.relay(peer, conn, args)
- }
- // PING 命令处理函数,不转发,本节点执行
- func pingFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- // 不能转发,只能执行
- return cdb.db.Exec(conn, args)
- }
- func selectFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- return pingFunc(cdb, conn, args)
- }
- // rename k1 k2
- func renameFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- if len(args) != 3 {
- return reply.NewErrReply("ERR wrong number of arguments for 'rename' command")
- }
- src := string(args[1])
- dest := string(args[2])
- peerSrc := cdb.peerPicker.PickNode(src)
- peerDest := cdb.peerPicker.PickNode(dest)
- if peerSrc != peerDest {
- return reply.NewErrReply("ERR rename command not supported in cluster mode")
- }
- return cdb.relay(peerSrc, conn, args)
- }
- func flushdbFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- replies := cdb.broadcast(conn, args)
- var errReply resp.Reply
- for _, r := range replies {
- if reply.IsErrReply(r) {
- errors.As(r, &errReply)
- break
- }
- }
- if errReply == nil {
- return reply.NewOkReply()
- }
- return reply.NewErrReply("err: " + errReply.Error())
- }
- func delFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
- replies := cdb.broadcast(conn, args)
- var errReply resp.Reply
- var deleted int64 = 0
- for _, r := range replies {
- if reply.IsErrReply(r) {
- errors.As(r, &errReply)
- break
- }
- var intReply *reply.IntReply
- ok := errors.As(r, &intReply)
- if !ok {
- errReply = reply.NewErrReply("err: convert to IntReply not ok")
- }
- deleted += intReply.Code
- }
- if errReply == nil {
- return reply.NewIntReply(deleted)
- }
- return reply.NewErrReply("err: " + errReply.Error())
- }
|