Browse Source

feat: 集群功能

runningwater 2 tháng trước cách đây
mục cha
commit
686cf5274a

+ 50 - 0
cluster/client_pool.go

@@ -0,0 +1,50 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/20 11:32
+// Desc: 客户端连接工厂
+
+package cluster
+
+import (
+	"context"
+	"errors"
+
+	pool "github.com/jolestar/go-commons-pool/v2"
+
+	client_ "github.com/runningwater/go-redis/resp/client"
+)
+
+type connectionFactory struct {
+	Peer string // 节点地址
+}
+
+func (c connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
+	client, err := client_.NewClient(c.Peer)
+	if err != nil {
+		return nil, err
+	}
+
+	client.Start()
+
+	return pool.NewPooledObject(client), nil
+}
+
+func (c connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
+	client, ok := object.Object.(*client_.Client)
+	if !ok {
+		return errors.New("type mismatch")
+	}
+	client.Stop()
+	return nil
+}
+
+func (c connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
+	return true
+}
+
+func (c connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
+	return nil
+}
+
+func (c connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
+	return nil
+}

+ 85 - 0
cluster/cluster_database.go

@@ -0,0 +1,85 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/20 11:12
+// Desc:
+
+package cluster
+
+import (
+	"context"
+	"strings"
+
+	pool "github.com/jolestar/go-commons-pool/v2"
+
+	"github.com/runningwater/go-redis/config"
+	"github.com/runningwater/go-redis/database"
+	idatabase "github.com/runningwater/go-redis/interface/database"
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/consistenthash"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// Database 集群数据库结构体
+// 集群数据库包含自身节点、集群节点、节点连接池、数据库实例等信息
+// 集群数据库实现了数据库接口,提供Exec和Close方法
+type Database struct {
+	self           string                      // 自身节点
+	nodes          []string                    // 集群节点
+	peerPicker     *consistenthash.NodeMap     // 一致性哈希
+	peerConnection map[string]*pool.ObjectPool // 节点连接池
+	db             idatabase.Database          // 数据库
+}
+
+func NewClusterDatabase() *Database {
+	cluster := &Database{
+		self:           config.Properties.Self,
+		db:             database.NewStandaloneDatabase(),
+		peerPicker:     consistenthash.NewNodeMap(nil),
+		peerConnection: make(map[string]*pool.ObjectPool),
+	}
+
+	nodes := make([]string, 0, len(config.Properties.Peers)+1)
+	for _, peer := range config.Properties.Peers {
+		nodes = append(nodes, peer)
+	}
+	nodes = append(nodes, config.Properties.Self)
+	cluster.peerPicker.AddNode(nodes...)
+	cluster.nodes = nodes
+
+	ctx := context.Background()
+	for _, peer := range config.Properties.Peers {
+
+		cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
+			Peer: peer,
+		})
+	}
+
+	return cluster
+}
+
+func (c *Database) Exec(client resp.Connection, args [][]byte) (result resp.Reply) {
+	defer func() {
+		if err := recover(); err != nil {
+			// 忽略错误
+			logger.Error(err)
+			result = reply.NewUnknownErrReply("recover err")
+		}
+	}()
+
+	cmdName := strings.ToLower(string(args[0]))
+	cmdFunc, ok := defaultFuncMap[cmdName]
+	if !ok {
+		return reply.NewErrReply("unsupported command")
+	}
+	result = cmdFunc(c, client, args)
+
+	return
+}
+
+func (c *Database) Close() {
+	c.db.Close()
+}
+
+func (c *Database) AfterClientClose(client resp.Connection) {
+	c.db.AfterClientClose(client)
+}

+ 74 - 0
cluster/communication.go

@@ -0,0 +1,74 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/20 14:13
+// Desc:
+
+package cluster
+
+import (
+	"context"
+	"errors"
+	"strconv"
+
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/utils"
+	client_ "github.com/runningwater/go-redis/resp/client"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+const (
+	ErrNodeNotFound = "node connection not found"
+)
+
+func (c *Database) getPeerClient(peer string) (*client_.Client, error) {
+	pool, ok := c.peerConnection[peer]
+	if !ok {
+		return nil, errors.New(ErrNodeNotFound)
+	}
+	object, err := pool.BorrowObject(context.Background())
+	if err != nil {
+		return nil, err
+	}
+	client, ok := object.(*client_.Client)
+	if !ok {
+		return nil, errors.New("type mismatch")
+	}
+	return client, nil
+}
+func (c *Database) returnPeerClient(peer string, peerClient *client_.Client) error {
+	pool, ok := c.peerConnection[peer]
+	if !ok {
+		return errors.New(ErrNodeNotFound)
+	}
+	return pool.ReturnObject(context.Background(), peerClient)
+}
+
+// relay 将命令转发给指定节点
+func (c *Database) relay(peer string, conn resp.Connection, args [][]byte) resp.Reply {
+	// 自己节点
+	if peer == c.self {
+		return c.db.Exec(conn, args)
+	}
+	// 操作其它节点
+	peerClient, err := c.getPeerClient(peer)
+	if err != nil {
+		return reply.NewErrReply(err.Error())
+	}
+	defer func() {
+		// 归还连接
+		_ = c.returnPeerClient(peer, peerClient)
+	}()
+
+	selectCmd := utils.ToCmdLine("select", strconv.Itoa(conn.GetDBIndex()))
+	_ = peerClient.Send(selectCmd)
+
+	return peerClient.Send(args)
+}
+
+func (c *Database) broadcast(conn resp.Connection, args [][]byte) map[string]resp.Reply {
+	results := make(map[string]resp.Reply)
+	for _, node := range c.nodes {
+		result := c.relay(node, conn, args)
+		results[node] = result
+	}
+	return results
+}

+ 110 - 0
cluster/router.go

@@ -0,0 +1,110 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/20 14:51
+// Desc:
+
+package cluster
+
+import (
+	"errors"
+
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+type CmdFunc func(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply
+
+var defaultFuncMap map[string]CmdFunc
+
+func init() {
+	defaultFuncMap = map[string]CmdFunc{
+		"get":      defaultFunc,
+		"getset":   defaultFunc,
+		"set":      defaultFunc,
+		"setnx":    defaultFunc,
+		"exists":   defaultFunc,
+		"del":      delFunc,
+		"type":     defaultFunc,
+		"ping":     pingFunc,
+		"select":   selectFunc,
+		"rename":   renameFunc,
+		"renamenx": renameFunc,
+		"flushdb":  flushdbFunc,
+	}
+}
+
+// 默认命令处理函数,转发到对应的节点执行
+// GET key
+// SEt key value
+func defaultFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	key := string(args[1])
+	// 获取 key 所在的节点
+	peer := cdb.peerPicker.PickNode(key)
+
+	return cdb.relay(peer, conn, args)
+}
+
+// PING 命令处理函数,不转发,本节点执行
+func pingFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	// 不能转发,只能执行
+	return cdb.db.Exec(conn, args)
+}
+func selectFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	return pingFunc(cdb, conn, args)
+}
+
+// rename k1 k2
+func renameFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	if len(args) != 3 {
+		return reply.NewErrReply("ERR wrong number of arguments for 'rename' command")
+	}
+	src := string(args[1])
+	dest := string(args[2])
+
+	peerSrc := cdb.peerPicker.PickNode(src)
+	peerDest := cdb.peerPicker.PickNode(dest)
+
+	if peerSrc != peerDest {
+		return reply.NewErrReply("ERR rename command not supported in cluster mode")
+	}
+
+	return cdb.relay(peerSrc, conn, args)
+}
+
+func flushdbFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	replies := cdb.broadcast(conn, args)
+	var errReply resp.Reply
+	for _, r := range replies {
+		if reply.IsErrReply(r) {
+			errors.As(r, &errReply)
+			break
+		}
+	}
+	if errReply == nil {
+		return reply.NewOkReply()
+	}
+
+	return reply.NewErrReply("err: " + errReply.Error())
+}
+
+func delFunc(cdb *Database, conn resp.Connection, args [][]byte) resp.Reply {
+	replies := cdb.broadcast(conn, args)
+	var errReply resp.Reply
+	var deleted int64 = 0
+	for _, r := range replies {
+		if reply.IsErrReply(r) {
+			errors.As(r, &errReply)
+			break
+		}
+		var intReply *reply.IntReply
+		ok := errors.As(r, &intReply)
+		if !ok {
+			errReply = reply.NewErrReply("err:  convert to IntReply not ok")
+		}
+		deleted += intReply.Code
+	}
+	if errReply == nil {
+		return reply.NewIntReply(deleted)
+	}
+
+	return reply.NewErrReply("err: " + errReply.Error())
+}

database/generic.go → database/cmd_generic.go


database/ping.go → database/cmd_ping.go


database/string.go → database/cmd_string.go


+ 1 - 1
database/db.go

@@ -13,7 +13,7 @@ import (
 	"github.com/runningwater/go-redis/resp/reply"
 )
 
-// DB 表示一个 Redis 数据库实例(Database 里面的 dbSet)
+// DB 表示一个 Redis 数据库实例(StandaloneDatabase 里面的 dbSet)
 type DB struct {
 	index  int           // 数据库索引
 	data   dict.Dict     // 存储键值对的数据结构

+ 0 - 110
database/resp_database.go

@@ -1,110 +0,0 @@
-// Author: simon (ynwdlxm@163.com)
-// Date: 2025/9/25 11:27
-// Desc: 处理 RESP-SPEC 命令
-
-package database
-
-import (
-	"strconv"
-	"strings"
-
-	"github.com/runningwater/go-redis/aof"
-	"github.com/runningwater/go-redis/config"
-	"github.com/runningwater/go-redis/interface/resp"
-	"github.com/runningwater/go-redis/lib/logger"
-	"github.com/runningwater/go-redis/resp/reply"
-)
-
-type Database struct {
-	dbSet      []*DB
-	aofHandler *aof.Handler
-}
-
-func NewDatabase() *Database {
-	if config.Properties.Databases == 0 {
-		config.Properties.Databases = 16
-	}
-	database := &Database{}
-	database.dbSet = make([]*DB, config.Properties.Databases)
-	// 初始化数据库集合
-	for i := range database.dbSet {
-		db := NewDB()
-		db.index = i
-		database.dbSet[i] = db
-	}
-
-	if config.Properties.AppendOnly {
-		aofHandler, err := aof.NewAofHandler(database)
-		if err != nil {
-			panic(err)
-		}
-		database.aofHandler = aofHandler
-	}
-
-	for _, db := range database.dbSet {
-		d := db
-		d.addAof = func(line CmdLine) {
-			database.aofHandler.AddAof(d.index, line)
-		}
-	}
-	return database
-}
-
-func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {
-	defer func() {
-		if err := recover(); err != nil {
-			logger.Error(err)
-		}
-	}()
-
-	cmdName := strings.ToLower(string(args[0]))
-	if cmdName == "select" {
-		if len(args) != 2 {
-			return reply.NewErrReply("ERR wrong number of arguments for 'select' command")
-		}
-		return execSelect(client, d, args[1:])
-	} else if cmdName == "command" {
-		return reply.NewOkReply()
-	}
-
-	index := client.GetDBIndex()
-	return d.dbSet[index].Exec(client, args)
-}
-
-func (d *Database) Close() {
-	// TODO implement me
-
-}
-
-func (d *Database) AfterClientClose(client resp.Connection) {
-	// TODO implement me
-
-}
-
-// execSelect 处理数据库选择命令
-//
-//	参数:
-//
-//			c: 客户端连接对象,用于执行数据库切换操作
-//			database: 数据库实例,包含所有可用的数据库集合
-//			args: 命令参数,第一个参数为要选择的数据库索引
-//
-//	 返回值:
-//
-//		resp.Reply: 执行结果回复,成功返回OK,失败返回错误信息
-func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
-	// 解析数据库索引参数
-	dbIndex, err := strconv.Atoi(string(args[0]))
-	if err != nil {
-		return reply.NewErrReply("ERR invalid DB index")
-	}
-
-	// 验证数据库索引范围
-	if dbIndex >= len(database.dbSet) {
-		return reply.NewErrReply("ERR DB index is out of range")
-	}
-
-	// 执行数据库切换操作
-	c.SelectDB(dbIndex)
-	return reply.NewOkReply()
-}

+ 143 - 0
database/standalone_database.go

@@ -0,0 +1,143 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/25 11:27
+// Desc: 处理 RESP-SPEC 命令
+
+package database
+
+import (
+	"strconv"
+	"strings"
+
+	"github.com/runningwater/go-redis/aof"
+	"github.com/runningwater/go-redis/config"
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// StandaloneDatabase 表示独立运行的Redis数据库实例
+//
+//	包含多个数据库以及AOF持久化处理器
+type StandaloneDatabase struct {
+	dbSet      []*DB        // 数据库集合,每个元素代表一个独立的数据库
+	aofHandler *aof.Handler // AOF持久化处理器,负责处理AOF相关操作
+}
+
+// NewStandaloneDatabase 创建并初始化一个新的独立数据库实例
+//
+//	根据配置决定数据库数量和是否启用AOF持久化
+func NewStandaloneDatabase() *StandaloneDatabase {
+	// 如果未设置数据库数量,默认创建16个数据库
+	if config.Properties.Databases == 0 {
+		config.Properties.Databases = 16
+	}
+
+	// 创建数据库实例
+	database := &StandaloneDatabase{}
+
+	// 初始化数据库集合
+	database.dbSet = make([]*DB, config.Properties.Databases)
+
+	// 为每个数据库索引创建对应的DB实例
+	for i := range database.dbSet {
+		db := NewDB()
+		db.index = i
+		database.dbSet[i] = db
+	}
+
+	// 如果启用了AOF持久化,则初始化AOF处理器
+	if config.Properties.AppendOnly {
+		aofHandler, err := aof.NewAofHandler(database)
+		if err != nil {
+			panic(err)
+		}
+		database.aofHandler = aofHandler
+	}
+
+	// 为每个数据库设置AOF记录函数
+	for _, db := range database.dbSet {
+		d := db
+		// addAof 函数用于向AOF日志中添加命令记录
+		d.addAof = func(line CmdLine) {
+			database.aofHandler.AddAof(d.index, line)
+		}
+	}
+
+	return database
+}
+
+// Exec 执行客户端发送的命令
+//
+//	client: 客户端连接对象
+//	args: 命令参数列表,第一个参数为命令名称
+//	返回命令执行结果
+func (d *StandaloneDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
+	// 使用defer捕获可能发生的panic,防止程序崩溃
+	defer func() {
+		if err := recover(); err != nil {
+			logger.Error(err)
+		}
+	}()
+
+	// 获取命令名称并转换为小写
+	cmdName := strings.ToLower(string(args[0]))
+
+	// 处理SELECT命令 - 切换数据库
+	if cmdName == "select" {
+		// 检查参数数量是否正确
+		if len(args) != 2 {
+			return reply.NewErrReply("ERR wrong number of arguments for 'select' command")
+		}
+		// 执行数据库选择操作
+		return execSelect(client, d, args[1:])
+	} else if cmdName == "command" {
+		// COMMAND命令返回OK
+		return reply.NewOkReply()
+	}
+
+	// 获取客户端当前选择的数据库索引
+	index := client.GetDBIndex()
+
+	// 在对应的数据库中执行命令
+	return d.dbSet[index].Exec(client, args)
+}
+
+// Close 关闭数据库连接
+// TODO 实现数据库关闭逻辑
+func (d *StandaloneDatabase) Close() {
+	// TODO implement me
+}
+
+// AfterClientClose 处理客户端连接关闭后的清理工作
+// TODO 实现客户端关闭后的处理逻辑
+func (d *StandaloneDatabase) AfterClientClose(client resp.Connection) {
+	// TODO implement me
+}
+
+// execSelect 处理数据库选择命令
+//
+// 参数:
+//
+//	c: 客户端连接对象,用于执行数据库切换操作
+//	database: 数据库实例,包含所有可用的数据库集合
+//	args: 命令参数,第一个参数为要选择的数据库索引
+//
+// 返回值:
+//
+//	resp.Reply: 执行结果回复,成功返回OK,失败返回错误信息
+func execSelect(c resp.Connection, database *StandaloneDatabase, args [][]byte) resp.Reply {
+	// 解析数据库索引参数
+	dbIndex, err := strconv.Atoi(string(args[0]))
+	if err != nil {
+		return reply.NewErrReply("ERR invalid DB index")
+	}
+
+	// 验证数据库索引范围是否有效
+	if dbIndex >= len(database.dbSet) {
+		return reply.NewErrReply("ERR DB index is out of range")
+	}
+
+	// 执行数据库切换操作
+	c.SelectDB(dbIndex)
+	return reply.NewOkReply()
+}

+ 11 - 0
go.mod

@@ -1,3 +1,14 @@
 module github.com/runningwater/go-redis
 
 go 1.24.2
+
+require (
+	github.com/jolestar/go-commons-pool/v2 v2.1.2
+	github.com/stretchr/testify v1.11.1
+)
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 18 - 0
go.sum

@@ -0,0 +1,18 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM=
+github.com/jolestar/go-commons-pool/v2 v2.1.2/go.mod h1:r4NYccrkS5UqP1YQI1COyTZ9UjPJAAGTUxzcsK1kqhY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 2 - 0
interface/resp/reply.go

@@ -10,4 +10,6 @@ type Reply interface {
 	ToBytes() []byte
 	// String 返回响应对象的字符串表示
 	String() string
+	// Error 返回响应对象的错误信息
+	Error() string
 }

+ 77 - 0
lib/consistenthash/consistenthash.go

@@ -0,0 +1,77 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/17 10:50
+// Desc: 一致性哈希算法
+
+package consistenthash
+
+import (
+	"hash/crc32"
+	"sort"
+)
+
+// HashFunc 定义哈希函数类型,接收字节切片并返回32位无符号整数
+type HashFunc func(data []byte) uint32
+
+// NodeMap 一致性哈希节点映射结构体
+// 用于维护虚拟节点到真实节点的映射关系
+type NodeMap struct {
+	hashFunc HashFunc       // 哈希函数
+	nodeHash []int          // 节点哈希值数组
+	nodeMap  map[int]string // 哈希值到节点名称的映射
+}
+
+// NewNodeMap 创建新的NodeMap实例
+// 如果传入的哈希函数为空,则使用默认的crc32.ChecksumIEEE函数
+func NewNodeMap(fn HashFunc) *NodeMap {
+	m := &NodeMap{
+		hashFunc: fn,
+		nodeMap:  make(map[int]string),
+	}
+	if m.hashFunc == nil {
+		m.hashFunc = crc32.ChecksumIEEE
+	}
+	return m
+}
+
+// IsEmpty 判断节点映射是否为空
+// 返回true表示没有节点,false表示有节点
+func (m *NodeMap) IsEmpty() bool {
+	return len(m.nodeHash) == 0
+}
+
+// AddNode 添加节点到一致性哈希环中
+// 支持同时添加多个节点,会为每个节点计算哈希值并维护映射关系
+// 空节点会被忽略
+func (m *NodeMap) AddNode(keys ...string) {
+	for _, key := range keys {
+		if len(key) == 0 {
+			// key is empty
+			continue
+		}
+		hash := int(m.hashFunc([]byte(key)))
+		m.nodeHash = append(m.nodeHash, hash)
+		m.nodeMap[hash] = key
+	}
+	// 排序节点哈希值,保证一致性哈希环的有序性
+	sort.Ints(m.nodeHash)
+}
+
+// PickNode 根据键值选择对应的节点
+// 通过计算键值的哈希值,在一致性哈希环上找到对应的节点
+// 如果哈希环为空则返回空字符串
+func (m *NodeMap) PickNode(key string) string {
+	if m.IsEmpty() {
+		return ""
+	}
+	hash := int(m.hashFunc([]byte(key)))
+
+	// 使用二分查找找到第一个大于等于hash值的位置
+	idx := sort.Search(len(m.nodeHash), func(i int) bool {
+		return m.nodeHash[i] >= hash
+	})
+	// 如果找不到合适的节点,则选择第一个节点(环状结构)
+	if idx == len(m.nodeHash) {
+		idx = 0
+	}
+	return m.nodeMap[m.nodeHash[idx]]
+}

BIN
main


+ 4 - 0
main.go

@@ -2,6 +2,8 @@
 // Date: 2025/5/26 11:33
 // Desc:
 
+//go:build !debug
+
 package main
 
 import (
@@ -14,6 +16,8 @@ import (
 	"github.com/runningwater/go-redis/tcp"
 )
 
+const debug = true
+
 const configFile string = "redis.conf"
 
 var defaultProperties = &config.ServerProperties{

+ 5 - 1
redis.conf

@@ -6,4 +6,8 @@ databases 16
 
 
 appendonly yes
-appendfilename aof.data
+appendfilename aof.data
+
+# cluster
+self  127.0.0.1:6379
+peers 127.0.0.1:6380

+ 410 - 0
resp/client/client.go

@@ -0,0 +1,410 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/17 14:53
+// Desc: 客户端
+
+package client_
+
+import (
+	"errors"
+	"net"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/lib/sync/wait"
+	"github.com/runningwater/go-redis/resp/parser"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// Client 表示一个 Redis 客户端,用于与 Redis 服务器通信
+type Client struct {
+	conn        net.Conn      // 与 Redis 服务器的网络连接
+	pendingReqs chan *request // 存放待发送的请求队列
+	waitingReqs chan *request // 存放已发送但等待响应的请求队列
+	ticker      *time.Ticker  // 心跳定时器
+	addr        string        // Redis 服务器地址
+
+	working *sync.WaitGroup // 同步机制,跟踪正在进行的请求
+	mu      sync.RWMutex    // 保护连接的读写锁
+	closed  bool            // 客户端是否已关闭
+}
+
+// NewClient 创建一个新的 Redis 客户端实例
+// addr: Redis 服务器地址(例如 "localhost:6379")
+// 返回创建的客户端实例和可能的错误
+func NewClient(addr string) (*Client, error) {
+	// 建立与 Redis 服务器的 TCP 连接
+	dial, err := net.Dial("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	// 初始化客户端结构体
+	return &Client{
+		conn:        dial,
+		pendingReqs: make(chan *request, chanSize), // 设置缓冲区大小为 chanSize
+		waitingReqs: make(chan *request, chanSize),
+		addr:        addr,
+		working:     &sync.WaitGroup{},
+		closed:      false,
+	}, nil
+}
+
+// Start 启动客户端,开始处理请求和心跳
+func (c *Client) Start() {
+
+	c.mu.RLock()
+	if c.closed {
+		c.mu.RUnlock()
+		return
+	}
+	c.mu.RUnlock()
+
+	// 创建每10秒触发一次的心跳定时器
+	c.ticker = time.NewTicker(10 * time.Second)
+
+	// 启动写入协程,处理发送请求
+	go c.handleWrite()
+
+	// 启动读取协程,处理服务器响应
+	go func() {
+		err := c.handleRead()
+		if err != nil {
+			logger.Error("handle read error: ", err)
+		}
+	}()
+
+	// 启动心跳协程
+	go c.heartbeat()
+}
+
+// Stop 停止客户端,释放资源
+func (c *Client) Stop() {
+
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return
+	}
+	c.closed = true
+	c.mu.Unlock()
+
+	// 停止心跳定时器
+	if c.ticker != nil {
+		c.ticker.Stop()
+	}
+
+	// 关闭待处理请求通道
+	close(c.pendingReqs)
+
+	// 等待所有正在进行的请求完成
+	c.working.Wait()
+
+	// 关闭网络连接和等待响应的通道
+	c.mu.Lock()
+	if c.conn != nil {
+		_ = c.conn.Close()
+	}
+	c.mu.Unlock()
+
+	// 关闭等待响应的通道
+	close(c.waitingReqs)
+	logger.Info("client stopped")
+}
+
+// Send 向 Redis 服务器发送命令并等待响应
+//
+//	args: 要发送的命令及其参数
+//
+// 返回服务器响应或错误信息
+func (c *Client) Send(args [][]byte) resp.Reply {
+
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return reply.NewErrReply("client is closed")
+	}
+	c.mu.Unlock()
+
+	// 创建新的请求对象
+	req := &request{
+		id:        uint64(time.Now().Unix()), // 使用时间戳作为请求ID
+		args:      args,
+		heartbeat: false,
+		waiting:   &wait.Wait{}, // 用于等待响应的同步机制
+	}
+
+	// 增加等待计数器
+	req.waiting.Add(1)
+	c.working.Add(1)
+	defer c.working.Done()
+
+	// 将请求放入待处理队列
+	select {
+	case c.pendingReqs <- req:
+	case <-time.After(maxWait):
+		req.waiting.Done()
+		return reply.NewErrReply("待处理队列已满")
+	}
+
+	// 等待服务器响应,设置最大等待时间为 maxWait
+	timeout := req.waiting.WaitWithTimeout(maxWait)
+	if timeout {
+		return reply.NewErrReply("send server timeout")
+	}
+
+	// 检查请求是否有错误
+	if req.err != nil {
+		return reply.NewErrReply("request failed")
+	}
+
+	// 返回服务器响应
+	return req.reply
+}
+
+// handleWrite 处理待发送的请求队列
+func (c *Client) handleWrite() {
+	// 不断从待处理队列中取出请求并发送
+	for req := range c.pendingReqs {
+		c.doRequest(req)
+	}
+}
+
+// doRequest 执行实际的请求发送操作
+// req: 要发送的请求对象
+func (c *Client) doRequest(req *request) {
+	// 检查请求有效性
+	if req == nil || len(req.args) == 0 {
+		return
+	}
+
+	c.mu.RLock()
+	if c.closed {
+		c.mu.RUnlock()
+		if req.waiting != nil {
+			req.err = errors.New("client is closed for doRequest")
+			req.waiting.Done()
+		}
+		return
+	}
+	conn := c.conn
+	c.mu.RUnlock()
+
+	// 将命令参数封装成 RESP 协议格式
+	re := reply.NewMultiBulkReply(req.args)
+	bytes := re.ToBytes()
+
+	// 发送数据到服务器
+	_, err := conn.Write(bytes)
+	i := 0
+
+	// 如果发送失败,尝试最多3次重连
+	for err != nil && i < 3 {
+		logger.Warn("write to server error:", err)
+		logger.Info("reconnecting 第", i+1, " 次...")
+		err = c.handleConnectionError(err)
+		if err == nil {
+			c.mu.RLock()
+			if !c.closed {
+				_, err = c.conn.Write(bytes)
+			} else {
+				c.mu.RUnlock()
+				req.err = errors.New("client is closed for doRequest")
+				req.waiting.Done()
+				return
+			}
+			c.mu.RUnlock()
+		}
+		i++
+	}
+
+	// 如果发送成功,将请求移到等待响应队列
+	if err == nil {
+		select {
+		case c.waitingReqs <- req:
+		default:
+			req.err = errors.New("waiting queue is full")
+			req.waiting.Done()
+		}
+	} else {
+		// 如果发送失败,标记错误并通知等待方
+		req.err = err
+		req.waiting.Done()
+	}
+}
+
+// handleConnectionError 处理连接错误并尝试重新连接
+// err: 原始错误信息
+// 返回重连结果
+func (c *Client) handleConnectionError(err error) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.closed {
+		return errors.New("client is closed for handleConnectionError")
+	}
+
+	// 关闭当前连接
+	if c.conn != nil {
+		err1 := c.conn.Close()
+		if err1 != nil {
+			var opErr *net.OpError
+			if errors.As(err1, &opErr) {
+				// 如果是连接已关闭的正常错误,则忽略
+				if opErr.Err.Error() != "use of closed network connection" {
+					return err1
+				}
+			} else {
+				return err1
+			}
+		}
+	}
+
+	// 建立新连接
+	dial, err1 := net.Dial("tcp", c.addr)
+	if err1 != nil {
+		logger.Error(err1)
+		return err1
+	}
+
+	// 更新连接
+	c.conn = dial
+	logger.Info("reconnected")
+	return nil
+}
+
+// handleRead 处理从服务器读取的响应消息
+// 返回处理过程中可能发生的错误
+func (c *Client) handleRead() error {
+	c.mu.RLock()
+	if c.closed || c.conn == nil {
+		c.mu.RUnlock()
+		return nil
+	}
+	conn := c.conn
+	c.mu.RUnlock()
+
+	// 使用解析器解析从连接中读取的数据流
+	ch := parser.ParseStream(conn)
+
+	// 不断处理解析出来的响应数据
+	for payload := range ch {
+		if payload.Err != nil {
+			// 如果解析出错,记录错误并结束对应请求
+			logger.Error("parse message error:", payload.Err)
+			c.finishRequest(reply.NewErrReply(payload.Err.Error()))
+			continue
+		}
+		// 正常处理响应数据
+		c.finishRequest(payload.Data)
+	}
+	return nil
+}
+
+// finishRequest 完成请求处理,将响应返回给调用方
+// data: 从服务器收到的响应数据
+func (c *Client) finishRequest(data resp.Reply) {
+	// 使用 defer 捕获可能出现的 panic
+	defer func() {
+		if err := recover(); err != nil {
+			debug.PrintStack()
+			logger.Error("panic in finish request:", err)
+		}
+	}()
+
+	// 检查客户端是否已关闭
+	c.mu.RLock()
+	if c.closed {
+		c.mu.RUnlock()
+		return
+	}
+	c.mu.RUnlock()
+
+	// 从等待响应队列中取出对应的请求
+	select {
+	case req := <-c.waitingReqs:
+		if req == nil {
+			return
+		}
+
+		// 设置响应数据
+		req.reply = data
+
+		// 通知等待方请求已完成
+		if req.waiting != nil {
+			req.waiting.Done()
+		}
+	case <-time.After(100 * time.Millisecond):
+		// 防止在通道为空时阻塞过久
+		logger.Warn("no request waiting for response")
+		return
+	}
+}
+
+// heartbeat 发送心跳包维持连接
+func (c *Client) heartbeat() {
+	c.mu.RLock()
+	if c.closed {
+		c.mu.RUnlock()
+		return
+	}
+	c.mu.RUnlock()
+
+	// 每当定时器触发时发送心跳
+	for range c.ticker.C {
+		c.mu.RLock()
+		if c.closed {
+			c.mu.RUnlock()
+			return
+		}
+		c.mu.RUnlock()
+		// 发送心跳包
+		c.doHeartbeat()
+	}
+}
+
+// doHeartbeat 执行心跳操作
+func (c *Client) doHeartbeat() {
+	// 创建心跳请求(PING 命令)
+	req := &request{
+		id:        uint64(time.Now().UnixNano()),
+		heartbeat: true,
+		waiting:   &wait.Wait{},
+		args:      [][]byte{[]byte("ping")},
+	}
+
+	// 设置等待机制
+	req.waiting.Add(1)
+	c.working.Add(1)
+	defer c.working.Done()
+
+	// 发送心跳请求
+	select {
+	case c.pendingReqs <- req:
+	case <-time.After(maxWait):
+		req.waiting.Done()
+		logger.Warn("send heartbeat timeout")
+		return
+	}
+
+	// 等待响应(不关心结果)
+	req.waiting.WaitWithTimeout(maxWait)
+}
+
+// request 表示一个客户端请求
+type request struct {
+	id        uint64     // 请求唯一标识符
+	args      [][]byte   // 请求参数
+	reply     resp.Reply // 服务器响应
+	heartbeat bool       // 是否为心跳请求
+	waiting   *wait.Wait // 等待同步机制
+	err       error      // 请求过程中的错误
+}
+
+// 常量定义
+const (
+	chanSize = 256             // 通道缓冲区大小
+	maxWait  = 3 * time.Second // 最大等待时间
+)

+ 278 - 0
resp/client/client_test.go

@@ -0,0 +1,278 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/10/17 14:53
+// Desc: 客户端单元测试
+
+package client_
+
+import (
+	"bufio"
+	"bytes"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// mockServer 模拟Redis服务器用于测试
+type mockServer struct {
+	listener net.Listener
+	conns    []net.Conn
+	mu       sync.Mutex
+	closed   bool
+}
+
+// newMockServer 创建一个新的模拟服务器
+func newMockServer(addr string) (*mockServer, error) {
+	listener, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	server := &mockServer{
+		listener: listener,
+		conns:    make([]net.Conn, 0),
+	}
+
+	go server.acceptConnections()
+	return server, nil
+}
+
+// acceptConnections 接受客户端连接
+func (s *mockServer) acceptConnections() {
+	for {
+		conn, err := s.listener.Accept()
+		if err != nil {
+			s.mu.Lock()
+			closed := s.closed
+			s.mu.Unlock()
+			if closed {
+				return
+			}
+			continue
+		}
+
+		s.mu.Lock()
+		s.conns = append(s.conns, conn)
+		s.mu.Unlock()
+
+		go s.handleConnection(conn)
+	}
+}
+
+// handleConnection 处理客户端连接
+func (s *mockServer) handleConnection(conn net.Conn) {
+	reader := bufio.NewReader(conn)
+	for {
+		// 读取客户端发送的数据
+		line, err := reader.ReadBytes('\n')
+		if err != nil {
+			return
+		}
+
+		// 解析命令
+		if bytes.Contains(line, []byte("PING")) {
+			// 回复 PONG
+			conn.Write([]byte("+PONG\r\n"))
+		} else if bytes.Contains(line, []byte("SET")) {
+			// 回复 OK
+			conn.Write([]byte("+OK\r\n"))
+		} else if bytes.Contains(line, []byte("GET")) {
+			// 回复值
+			conn.Write([]byte("$5\r\nvalue\r\n"))
+		} else {
+			// 默认回复 OK
+			conn.Write([]byte("+OK\r\n"))
+		}
+	}
+}
+
+// close 关闭模拟服务器
+func (s *mockServer) close() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.closed = true
+	s.listener.Close()
+
+	for _, conn := range s.conns {
+		conn.Close()
+	}
+}
+
+// TestNewClient 测试创建新客户端
+func TestNewClient(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+	assert.NotNil(t, client)
+	assert.Equal(t, server.listener.Addr().String(), client.addr)
+}
+
+// TestClientStartAndStop 测试客户端启动和停止
+func TestClientStartAndStop(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+
+	// 启动客户端
+	client.Start()
+
+	// 等待一段时间确保协程启动
+	time.Sleep(100 * time.Millisecond)
+
+	// 停止客户端
+	client.Stop()
+}
+
+// TestClientSend 测试发送命令
+func TestClientSend(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+	defer client.Stop()
+
+	// 启动客户端
+	client.Start()
+
+	// 发送 SET 命令
+	args := [][]byte{[]byte("SET"), []byte("key"), []byte("value")}
+	result := client.Send(args)
+
+	// 检查结果
+	assert.IsType(t, reply.NewOkReply(), result)
+	assert.Equal(t, "OK", result.String())
+}
+
+// TestClientPing 测试PING命令
+func TestClientPing(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+	defer client.Stop()
+
+	// 启动客户端
+	client.Start()
+
+	// 发送 PING 命令
+	args := [][]byte{[]byte("PING")}
+	result := client.Send(args)
+
+	// 检查结果
+	rep := reply.NewStatusReply("OK")
+	assert.IsType(t, rep, result)
+	assert.Equal(t, rep.String(), result.String())
+}
+
+// TestClientGet 测试GET命令
+func TestClientGet(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+	defer client.Stop()
+
+	// 启动客户端
+	client.Start()
+
+	// 发送 GET 命令
+	args := [][]byte{[]byte("GET"), []byte("key")}
+	result := client.Send(args)
+
+	// 检查结果
+	repl := reply.NewStatusReply("OK")
+	assert.IsType(t, repl, result)
+	assert.Equal(t, repl.String(), result.String())
+}
+
+// TestClientStop 测试停止已停止的客户端
+func TestClientStopClosed(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+
+	// 停止客户端两次
+	client.Stop()
+	client.Stop() // 应该安全地处理重复停止
+}
+
+// TestClientSendOnClosed 测试向已关闭的客户端发送命令
+func TestClientSendOnClosed(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+
+	// 启动然后立即停止客户端
+	client.Start()
+	client.Stop()
+
+	// 向已关闭的客户端发送命令
+	args := [][]byte{[]byte("PING")}
+	result := client.Send(args)
+
+	// 应该返回错误
+	assert.IsType(t, reply.NewErrReply(""), result)
+	assert.Contains(t, result.String(), "client is closed")
+}
+
+// TestClientHeartbeat 测试心跳功能
+func TestClientHeartbeat(t *testing.T) {
+	// 启动模拟服务器
+	server, err := newMockServer("localhost:0")
+	assert.NoError(t, err)
+	defer server.close()
+
+	// 创建客户端
+	client, err := NewClient(server.listener.Addr().String())
+	assert.NoError(t, err)
+	defer client.Stop()
+
+	// 启动客户端
+	client.Start()
+
+	// 等待心跳发生
+	time.Sleep(12 * time.Second) // 心跳间隔是10秒
+
+	// 客户端应该仍然工作正常
+	args := [][]byte{[]byte("PING")}
+	result := client.Send(args)
+	assert.IsType(t, reply.NewStatusReply("OK"), result)
+	assert.Equal(t, "+OK\r\n", result.String())
+}

+ 14 - 3
resp/handler/handler.go

@@ -12,6 +12,8 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/runningwater/go-redis/cluster"
+	"github.com/runningwater/go-redis/config"
 	"github.com/runningwater/go-redis/database"
 	dbface "github.com/runningwater/go-redis/interface/database"
 	"github.com/runningwater/go-redis/lib/logger"
@@ -30,9 +32,17 @@ type RespHandler struct {
 
 // NewHandler creates a new RespHandler
 func NewHandler() *RespHandler {
+	var db dbface.Database
+	// db = database.NewEchoDatabase()
+	if config.Properties.Self != "" && len(config.Properties.Peers) > 0 {
+		logger.Info("cluster mode")
+		db = cluster.NewClusterDatabase()
+	} else {
+		logger.Info("standalone mode")
+		db = database.NewStandaloneDatabase()
+	}
 	return &RespHandler{
-		// db: database.NewEchoDatabase(),
-		db: database.NewDatabase(),
+		db: db,
 	}
 }
 
@@ -77,7 +87,8 @@ func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
 			logger.Error("empty payload")
 			continue
 		}
-		bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
+		var bulkReply *reply.MultiBulkReply
+		ok := errors.As(payload.Data, &bulkReply)
 		if !ok {
 			logger.Error("require multi bulk reply")
 			continue

+ 10 - 10
resp/parser/parser.go

@@ -170,36 +170,36 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
 // The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
 // The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
 func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
-	var msg []byte
+	var line []byte
 	var err error
 	if state.bulkLen == 0 { // \r\n split
 		// Read a line of data
-		msg, err = reader.ReadBytes('\n')
-		logger.Debug("\r\n***readLine: ", string(msg))
+		line, err = reader.ReadBytes('\n')
+		logger.Debug("\r\n***readLine: ", string(line))
 		if err != nil {
 			return nil, true, err
 		}
 		// Data that does not end with \r\n
-		if len(msg) == 0 || msg[len(msg)-2] != '\r' {
-			return nil, false, errors.New("readLine-protocol error: " + string(msg))
+		if len(line) == 0 || line[len(line)-2] != '\r' {
+			return nil, false, errors.New("readLine-protocol error: " + string(line))
 		}
 
 	} else {
 		// ELSE previously read $number, strictly read number of bytes
-		msg = make([]byte, state.bulkLen+2)
-		_, err := io.ReadFull(reader, msg)
+		line = make([]byte, state.bulkLen+2)
+		_, err := io.ReadFull(reader, line)
 		if err != nil {
 			return nil, true, err
 		}
 		// Data that does not end with \r\n
-		if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
-			return nil, false, errors.New("readLine-protocol error: " + string(msg))
+		if len(line) == 0 || line[len(line)-2] != '\r' || line[len(line)-1] != '\n' {
+			return nil, false, errors.New("readLine-protocol error: " + string(line))
 		}
 
 		// Reset bulkLen
 		state.bulkLen = 0
 	}
-	return msg, false, nil
+	return line, false, nil
 }
 
 // Parse the header information of the string

+ 20 - 0
resp/reply/consts.go

@@ -7,6 +7,10 @@ package reply
 // PongReply 表示PONG响应,用于Redis协议的PONG响应
 type PongReply struct{}
 
+func (p PongReply) Error() string {
+	return ""
+}
+
 func (p PongReply) String() string {
 	return "+PONG\r\n"
 }
@@ -26,6 +30,10 @@ func NewPongReply() *PongReply {
 // 返回格式:"+OK\r\n"
 type OkReply struct{}
 
+func (o OkReply) Error() string {
+	return ""
+}
+
 func (o OkReply) String() string {
 	return "+OK\r\n"
 }
@@ -44,6 +52,10 @@ func NewOkReply() *OkReply {
 // NoReply 表示NO响应,用于Redis协议的NO响应
 type NoReply struct{}
 
+func (n NoReply) Error() string {
+	return ""
+}
+
 func (n NoReply) String() string {
 	return ""
 }
@@ -62,6 +74,10 @@ func NewNoReply() *NoReply {
 // 返回格式:"$-1\r\n"
 type NullBulkReply struct{}
 
+func (n NullBulkReply) Error() string {
+	return ""
+}
+
 func (n NullBulkReply) String() string {
 	return "$-1\r\n"
 }
@@ -80,6 +96,10 @@ func NewNullBulkReply() *NullBulkReply {
 // 返回格式:"*0\r\n"
 type EmptyMultiBulkReply struct{}
 
+func (e EmptyMultiBulkReply) Error() string {
+	return ""
+}
+
 func (e EmptyMultiBulkReply) String() string {
 	return "*0\r\n"
 }

+ 8 - 0
resp/reply/error.go

@@ -56,6 +56,10 @@ func NewArgNumErrReply(cmd string) *ArgNumErrReply {
 // 对应Redis的"syntax error"响应
 type SyntaxErrReply struct{}
 
+func (s *SyntaxErrReply) String() string {
+	return s.Error()
+}
+
 func (s *SyntaxErrReply) Error() string {
 	return "Err syntax error"
 }
@@ -94,6 +98,10 @@ type ProtocolErrReply struct {
 	Msg string
 }
 
+func (p *ProtocolErrReply) String() string {
+	return p.Error()
+}
+
 func (p *ProtocolErrReply) Error() string {
 	return "ERR Protocol error: '" + p.Msg + "'"
 }

+ 16 - 6
resp/reply/reply.go

@@ -39,6 +39,10 @@ type BulkReply struct {
 	Arg []byte // "test" => "$4\r\ntest\r\n"
 }
 
+func (b *BulkReply) Error() string {
+	return ""
+}
+
 func (b *BulkReply) String() string {
 	return string(b.ToBytes())
 }
@@ -60,6 +64,10 @@ type MultiBulkReply struct {
 	Args [][]byte
 }
 
+func (m *MultiBulkReply) Error() string {
+	return ""
+}
+
 func (m *MultiBulkReply) String() string {
 	return string(m.ToBytes())
 }
@@ -92,6 +100,10 @@ type StatusReply struct {
 	Status string
 }
 
+func (s *StatusReply) Error() string {
+	return ""
+}
+
 func (s *StatusReply) String() string {
 	return string(s.ToBytes())
 }
@@ -112,6 +124,10 @@ type IntReply struct {
 	Code int64
 }
 
+func (i *IntReply) Error() string {
+	return ""
+}
+
 func (i *IntReply) String() string {
 	return string(i.ToBytes())
 }
@@ -125,12 +141,6 @@ func NewIntReply(code int64) *IntReply {
 	return &IntReply{Code: code}
 }
 
-// ErrorReply represents an error reply interface.
-type ErrorReply interface {
-	Error() string
-	ToBytes() []byte
-}
-
 // StandardErrReply represents a standard error reply structure.
 // Corresponds to Redis replies starting with "-".
 // For example: "-ERR unknown command 'foobar'\r\n"