3 Commity b2ee81bac5 ... ceed43a94f

Autor SHA1 Správa Dátum
  runningwater ceed43a94f fix: reformate code 3 mesiacov pred
  runningwater 31ed57c373 fix: some bug 3 mesiacov pred
  runningwater 23a8a0df6e feat: 通用协议解析 3 mesiacov pred

+ 62 - 0
.golangci.yaml

@@ -0,0 +1,62 @@
+version: "2"
+linters:
+  settings:
+    whitespace:
+      multi-if: true
+      multi-func: true
+    assailant:
+      # To specify a set of function names to exclude.
+      # The values are merged with the builtin exclusions.
+      # The builtin exclusions can be disabled by setting `use-builtin-exclusions` to `false`.
+      # Default: ["^(fmt|log|logger|t|)\.(Print|Sprint|Sprint|Fatal|Panic|Error|Warn|Warning|Info|Debug|Log)(|f|ln)$"]
+      exclude:
+        - Append
+        - \.Wrapf
+      use-builtin-exclusions: false
+    laparoscopy:
+      # Check all assigning the loop variable to another variable.
+      # Default: false
+      check-alias: true
+    decoder:
+      dec-order:
+        - type
+        - const
+        - var
+        - func
+      # If true, underscore vars (vars with "_" as the name) will be ignored at all checks.
+      # Default: false (underscore vars are not ignored)
+      ignore-underscore-vars: false
+      # If true, order of declarations is not checked at all.
+      # Default: true (disabled)
+      disable-dec-order-check: false
+      # If true, `init` func can be anywhere in file (does not have to be declared before all other functions).
+      # Default: true (disabled)
+      disable-init-func-first-check: false
+      # If true, multiple global `type`, `const` and `var` declarations are allowed.
+      # Default: true (disabled)
+      disable-dec-num-check: false
+      # If true, type declarations will be ignored for dec num check.
+      # Default: false (type statements are not ignored)
+      disable-type-dec-num-check: false
+      # If true, const declarations will be ignored for dec num check.
+      # Default: false (const statements are not ignored)
+      disable-const-dec-num-check: false
+      # If true, var declarations will be ignored for dec num check.
+      # Default: false (var statements are not ignored)
+      disable-var-dec-num-check: false
+
+formatters:
+  settings:
+    gofmt:
+      simplify: false
+      rewrite-rules:
+        - pattern: 'interface{}'
+          replacement: 'any'
+        - pattern: 'a[b:len(a)]'
+          replacement: 'a[b:]'
+    golines:
+      max-len: 200
+      tab-len: 8
+      shorten-comments: true
+      reformat-tags: false
+      chain-split-dots: false

+ 15 - 0
.vscode/launch.json

@@ -0,0 +1,15 @@
+{
+  // Use IntelliSense to learn about possible attributes.
+  // Hover to view descriptions of existing attributes.
+  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+  "version": "0.2.0",
+  "configurations": [
+    {
+      "name": "Launch Package",
+      "type": "go",
+      "request": "launch",
+      "mode": "auto",
+      "program": "${fileDirname}/main.go"
+    }
+  ]
+}

+ 7 - 2
config/config.go

@@ -82,7 +82,7 @@ func parse(src io.Reader) *ServerProperties {
 					fieldVal.SetInt(intValue)
 					fieldVal.SetInt(intValue)
 				}
 				}
 			case reflect.Bool:
 			case reflect.Bool:
-				boolValue := "yes" == value
+				boolValue := value == "yes"
 				fieldVal.SetBool(boolValue)
 				fieldVal.SetBool(boolValue)
 			case reflect.Slice:
 			case reflect.Slice:
 				if field.Type.Elem().Kind() == reflect.String {
 				if field.Type.Elem().Kind() == reflect.String {
@@ -103,6 +103,11 @@ func SetupConfig(configFilename string) {
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
-	defer file.Close()
+	defer func() {
+		closeErr := file.Close()
+		if closeErr != nil {
+			panic(closeErr)
+		}
+	}()
 	Properties = parse(file)
 	Properties = parse(file)
 }
 }

+ 25 - 0
database/command.go

@@ -0,0 +1,25 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/15 18:08
+// Desc:
+
+package database
+
+import (
+	"strings"
+)
+
+// 命令表
+var cmdTable = make(map[string]*command)
+
+type command struct {
+	executor ExecFunc // 执行函数
+	arity    int      // 参数个数
+}
+
+func RegisterCommand(name string, executor ExecFunc, arity int) {
+	name = strings.ToLower(name)
+	cmdTable[name] = &command{
+		executor: executor,
+		arity:    arity,
+	}
+}

+ 100 - 0
database/db.go

@@ -0,0 +1,100 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/15 18:04
+// Desc:
+
+package database
+
+import (
+	"strings"
+
+	"github.com/runningwater/go-redis/datastruct/dict"
+	"github.com/runningwater/go-redis/interface/database"
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// DB 数据库
+type DB struct {
+	index int
+	data  dict.Dict
+}
+
+type ExecFunc func(db *DB, args [][]byte) resp.Reply
+type CmdLine = [][]byte
+
+func NewDB() *DB {
+	return &DB{
+		index: 0,
+		data:  dict.NewSyncDict(),
+	}
+}
+
+// Exec 执行一个redis命令
+// 参数:
+//   - c: 客户端连接对象,表示发送命令的客户端连接
+//   - cmdLine: 命令行参数,包含命令名称和所有参数的字节切片
+//
+// 返回值:
+//   - resp.Reply: 命令执行结果的回复
+func (d *DB) Exec(c resp.Connection, cmdLine CmdLine) resp.Reply {
+	// 检查命令行参数是否为空
+	if len(cmdLine) == 0 {
+		return reply.NewErrReply("empty command")
+	}
+	// ping set setnx get
+	cmdName := strings.ToLower(string(cmdLine[0]))
+	// 查找命令
+	cmd, ok := cmdTable[cmdName]
+	if !ok {
+		return reply.NewUnknownErrReply(cmdName)
+	}
+	// 参数校验
+	if !validateArity(cmd.arity, cmdLine[1:]) {
+		return reply.NewArgNumErrReply(cmdName)
+	}
+
+	if cmd.executor == nil {
+		return reply.NewErrReply("command not implement")
+	}
+	return cmd.executor(d, cmdLine[1:])
+}
+
+func validateArity(arity int, args [][]byte) bool {
+	if arity >= 0 {
+		return arity == len(args)
+	}
+	// 变长的 arity 设置为 负的最小个数
+	return len(args) >= -arity
+}
+
+func (d *DB) GetEntity(key string) (*database.DataEntity, bool) {
+	raw, ok := d.data.Get(key)
+	if !ok {
+		return nil, false
+	}
+	entity, _ := raw.(*database.DataEntity)
+	return entity, true
+}
+
+func (d *DB) PutEntity(key string, entity *database.DataEntity) int {
+	return d.data.Put(key, entity)
+}
+
+func (d *DB) Remove(key string) {
+	d.data.Remove(key)
+}
+
+func (d *DB) Removes(keys ...string) (deleted int) {
+	deleted = 0
+	for _, key := range keys {
+		if _, exists := d.data.Get(key); exists {
+			d.Remove(key)
+			deleted++
+		}
+	}
+	return
+}
+
+func (d *DB) Flush() {
+	d.data.Clear()
+}

+ 29 - 0
database/echo_database.go

@@ -0,0 +1,29 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/12 14:57
+// Desc:
+
+package database
+
+import (
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+type EchoDatabase struct {
+}
+
+func NewEchoDatabase() *EchoDatabase {
+	return &EchoDatabase{}
+}
+func (e *EchoDatabase) Exec(_ resp.Connection, args [][]byte) resp.Reply {
+	return reply.NewMultiBulkReply(args)
+}
+
+func (e *EchoDatabase) Close() {
+	logger.Info("database closed")
+}
+
+func (e *EchoDatabase) AfterClientClose(_ resp.Connection) {
+	logger.Info("client closed, AfterClientClose")
+}

+ 21 - 0
datastruct/dict/dict.go

@@ -0,0 +1,21 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/15 17:00
+// Desc:
+
+package dict
+
+type Consumer func(key string, value any) bool
+
+type Dict interface {
+	Get(key string) (any, bool)
+	Len() int
+	Put(key string, value any) (result int)
+	PutIfAbsent(key string, value any) (result int)
+	PutIfExists(key string, value any) (result int)
+	Remove(key string) (result int)
+	ForEach(consumer Consumer)
+	Keys() []string
+	RandomKeys(limit int) []string
+	RandomDistinctKeys(limit int) []string
+	Clear()
+}

+ 113 - 0
datastruct/dict/sync_dict.go

@@ -0,0 +1,113 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/15 17:07
+// Desc:
+
+package dict
+
+import (
+	"sync"
+)
+
+type SyncDict struct {
+	m sync.Map
+}
+
+func (s *SyncDict) Get(key string) (any, bool) {
+	value, ok := s.m.Load(key)
+	return value, ok
+}
+
+func (s *SyncDict) Len() int {
+	length := 0
+	s.m.Range(func(key, value any) bool {
+		length++
+		return true
+	})
+	return length
+}
+
+func (s *SyncDict) Put(key string, value any) (result int) {
+	_, ok := s.m.Load(key)
+	s.m.Store(key, value)
+
+	if ok {
+		result = 0
+	} else {
+		result = 1
+	}
+	return
+}
+
+func (s *SyncDict) PutIfAbsent(key string, value any) (result int) {
+	_, ok := s.m.Load(key)
+	if ok {
+		return 0
+	}
+	s.m.Store(key, value)
+	return 1
+}
+
+func (s *SyncDict) PutIfExists(key string, value any) (result int) {
+	_, ok := s.m.Load(key)
+	if ok {
+		s.m.Store(key, value)
+		return 1
+	}
+
+	return 0
+}
+
+func (s *SyncDict) Remove(key string) (result int) {
+	_, ok := s.m.Load(key)
+	if ok {
+		s.m.Delete(key)
+		return 1
+	}
+	return 0
+}
+
+func (s *SyncDict) ForEach(consumer Consumer) {
+	s.m.Range(func(key, value any) bool {
+		consumer(key.(string), value)
+		return true
+	})
+}
+
+func (s *SyncDict) Keys() []string {
+	result := make([]string, s.Len())
+	s.m.Range(func(key, value any) bool {
+		result = append(result, key.(string))
+		return true
+	})
+	return result
+}
+
+func (s *SyncDict) RandomKeys(limit int) []string {
+	result := make([]string, limit)
+	for i := 0; i < limit; i++ {
+		s.m.Range(func(key, value any) bool {
+			result[i] = key.(string)
+			return false
+		})
+	}
+	return result
+}
+
+func (s *SyncDict) RandomDistinctKeys(limit int) []string {
+	result := make([]string, limit)
+	i := 0
+	s.m.Range(func(key, value any) bool {
+		result[i] = key.(string)
+		i++
+		return i < limit // 如果返回false,则结束遍历
+	})
+	return result
+}
+
+func (s *SyncDict) Clear() {
+	*s = *NewSyncDict()
+}
+
+func NewSyncDict() *SyncDict {
+	return &SyncDict{}
+}

+ 0 - 11
go.sum

@@ -1,11 +0,0 @@
-github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
-github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
-github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
-github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
-github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
-github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
-github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
-golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

+ 21 - 0
interface/database/database.go

@@ -0,0 +1,21 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/12 13:27
+// Desc: Redis 数据库业务操作
+
+package database
+
+import (
+	"github.com/runningwater/go-redis/interface/resp"
+)
+
+type CmdLine = [][]byte
+
+type DataEntity struct {
+	Data any
+}
+
+type Database interface {
+	Exec(client resp.Connection, args [][]byte) resp.Reply
+	Close()
+	AfterClientClose(client resp.Connection)
+}

+ 2 - 0
interface/resp/reply.go

@@ -8,4 +8,6 @@ package resp
 type Reply interface {
 type Reply interface {
 	// ToBytes 将响应对象转换为符合Redis协议的字节数组
 	// ToBytes 将响应对象转换为符合Redis协议的字节数组
 	ToBytes() []byte
 	ToBytes() []byte
+	// String 返回响应对象的字符串表示
+	String() string
 }
 }

+ 2 - 2
lib/logger/files.go

@@ -16,7 +16,7 @@ func checkPermission(src string) bool {
 }
 }
 
 
 func isNotExistMkDir(src string) error {
 func isNotExistMkDir(src string) error {
-	if notExist := checkNotExist(src); notExist == true {
+	if notExist := checkNotExist(src); notExist {
 		if err := mkDir(src); err != nil {
 		if err := mkDir(src); err != nil {
 			return err
 			return err
 		}
 		}
@@ -35,7 +35,7 @@ func mkDir(src string) error {
 
 
 func mustOpen(fileName, dir string) (*os.File, error) {
 func mustOpen(fileName, dir string) (*os.File, error) {
 	perm := checkPermission(dir)
 	perm := checkPermission(dir)
-	if perm == true {
+	if perm {
 		return nil, fmt.Errorf("permission denied dir: %s", dir)
 		return nil, fmt.Errorf("permission denied dir: %s", dir)
 	}
 	}
 
 

+ 1 - 1
lib/logger/logger.go

@@ -55,7 +55,7 @@ func Setup(settings *Settings) {
 		time.Now().Format(settings.TimeFormat),
 		time.Now().Format(settings.TimeFormat),
 		settings.Ext)
 		settings.Ext)
 
 
-	logFile, err := mustOpen(fileName, dir)
+	logFile, err = mustOpen(fileName, dir)
 	if err != nil {
 	if err != nil {
 		log.Fatalf("logging.Setup err: %s", err)
 		log.Fatalf("logging.Setup err: %s", err)
 	}
 	}

+ 6 - 3
main.go

@@ -10,6 +10,7 @@ import (
 
 
 	"github.com/runningwater/go-redis/config"
 	"github.com/runningwater/go-redis/config"
 	"github.com/runningwater/go-redis/lib/logger"
 	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/resp/handler"
 	"github.com/runningwater/go-redis/tcp"
 	"github.com/runningwater/go-redis/tcp"
 )
 )
 
 
@@ -38,13 +39,15 @@ func main() {
 		config.Properties = defaultProperties
 		config.Properties = defaultProperties
 	}
 	}
 
 
-	logger.Info("start server")
+	addr := fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port)
+	logger.Info("server starting ...")
 
 
 	err := tcp.ListenAndServeWithSignal(
 	err := tcp.ListenAndServeWithSignal(
 		&tcp.Config{
 		&tcp.Config{
-			Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
+			Address: addr,
 		},
 		},
-		tcp.NewEcho(),
+		// tcp.NewEchoHandler(),
+		handler.NewHandler(),
 	)
 	)
 	if err != nil {
 	if err != nil {
 		logger.Error(err)
 		logger.Error(err)

+ 72 - 0
resp/connection/conn.go

@@ -0,0 +1,72 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/12 11:02
+// Desc: Encapsulation of each connection (description of each client connection)
+
+package connection
+
+import (
+	"net"
+	"sync"
+	"time"
+
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/lib/sync/wait"
+)
+
+// Connection represents a client connection to the Redis server
+type Connection struct {
+	conn       net.Conn
+	waiting    wait.Wait
+	mu         sync.Mutex
+	selectedDB int
+}
+
+// NewConnection creates a new connection wrapper
+func NewConnection(conn net.Conn) *Connection {
+	return &Connection{
+		conn: conn,
+	}
+}
+
+// RemoteAddr returns the remote network address
+func (c *Connection) RemoteAddr() net.Addr {
+	return c.conn.RemoteAddr()
+}
+
+// Close closes the connection
+func (c *Connection) Close() error {
+	timeout := c.waiting.WaitWithTimeout(10 * time.Second)
+	if timeout {
+		logger.Info("timeout to close connection")
+	} else {
+		logger.Info("connection closed")
+	}
+	err := c.conn.Close()
+	return err
+}
+
+// Write writes data to the connection
+func (c *Connection) Write(bytes []byte) error {
+	if len(bytes) == 0 {
+		return nil
+	}
+	logger.InfoC("write to client: ", string(bytes))
+	c.mu.Lock()
+	c.waiting.Add(1)
+	defer func() {
+		c.waiting.Done()
+		c.mu.Unlock()
+	}()
+	_, err := c.conn.Write(bytes)
+	return err
+}
+
+// GetDBIndex returns the currently selected database index
+func (c *Connection) GetDBIndex() int {
+	return c.selectedDB
+}
+
+// SelectDB selects a database
+func (c *Connection) SelectDB(dbNum int) {
+	c.selectedDB = dbNum
+}

+ 126 - 0
resp/handler/handler.go

@@ -0,0 +1,126 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/9/12 13:24
+// Desc: Handling RESP protocol
+
+package handler
+
+import (
+	"context"
+	"errors"
+	"io"
+	"net"
+	"strings"
+	"sync"
+
+	"github.com/runningwater/go-redis/database"
+	dbface "github.com/runningwater/go-redis/interface/database"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/lib/sync/atomic"
+	"github.com/runningwater/go-redis/resp/connection"
+	"github.com/runningwater/go-redis/resp/parser"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// RespHandler handles client connections and RESP protocol
+type RespHandler struct {
+	activeConn sync.Map
+	db         dbface.Database
+	closing    atomic.Boolean
+}
+
+// NewHandler creates a new RespHandler
+func NewHandler() *RespHandler {
+	return &RespHandler{
+		db: database.NewEchoDatabase(),
+	}
+}
+
+// Handle processes client connections
+func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
+	if r.closing.Get() {
+		_ = conn.Close()
+		return
+	}
+	logger.Info("new connection from ", conn.RemoteAddr().String())
+	client := connection.NewConnection(conn)
+	r.activeConn.Store(client, struct{}{})
+
+	// Parse data
+	ch := parser.ParseStream(conn)
+	for payload := range ch {
+		logger.Info("received channel data: ", payload)
+		// error
+		if payload.Err != nil {
+			if errors.Is(payload.Err, io.EOF) ||
+				errors.Is(payload.Err, io.ErrUnexpectedEOF) ||
+				strings.Contains(payload.Err.Error(), "use of closed network connection") {
+				r.closeClient(client)
+				logger.Info("connection closed: ", client.RemoteAddr().String())
+				return
+			}
+			// Protocol error
+			logger.Error("protocol error: ", payload.Err.Error())
+			errReply := reply.NewErrReply(payload.Err.Error())
+
+			err := client.Write(errReply.ToBytes())
+			if err != nil {
+				r.closeClient(client)
+				logger.Error("client.Write:", err.Error())
+				return
+			}
+			continue
+		}
+
+		// exec
+		if payload.Data == nil {
+			logger.Error("empty payload")
+			continue
+		}
+		bulkReply, ok := payload.Data.(*reply.MultiBulkReply)
+		if !ok {
+			logger.Error("require multi bulk reply")
+			continue
+		}
+		// Command execution
+		execResult := r.db.Exec(client, bulkReply.Args)
+		if execResult == nil {
+			execResult = reply.NewUnknownErrReply("aa")
+		}
+		// Return result
+		logger.Info("exec result: ", execResult)
+
+		_ = client.Write(execResult.ToBytes())
+
+	} // for end
+}
+
+// Close shuts down the RespHandler and releases resources
+// This function closes all active connections and the database connection
+// Return value: error - Error information for the close operation, currently always returns nil
+func (r *RespHandler) Close() error {
+	logger.Info("handler shutting down...")
+	r.closing.Set(true)
+
+	// Iterate through all active connections and close them
+	r.activeConn.Range(
+		func(key, value any) bool {
+			client := key.(*connection.Connection)
+			_ = client.Close()
+			return true
+		})
+
+	// Close database connection
+	r.db.Close()
+
+	return nil
+}
+
+// closeClient closes a client connection
+// Parameters:
+//
+//	client *connection.Connection - The client connection object to close
+func (r *RespHandler) closeClient(client *connection.Connection) {
+	_ = client.Close()
+	r.db.AfterClientClose(client)
+	r.activeConn.Delete(client)
+}

+ 315 - 0
resp/parser/parser.go

@@ -0,0 +1,315 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/7/11 15:58
+// Desc: Parser for Redis protocol
+//
+// The parser is used to parse the byte stream of the Redis protocol into abstract response objects
+
+package parser
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"io"
+	"runtime/debug"
+	"strconv"
+	"strings"
+
+	"github.com/runningwater/go-redis/interface/resp"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/resp/reply"
+)
+
+// Payload is the payload of the parser
+// Used to store the state information of the parser
+type Payload struct {
+	Data resp.Reply // Parsed response object, consistent with the return data structure
+	Err  error      // Errors that occurred during parsing
+}
+
+func (p *Payload) String() string {
+	return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
+}
+
+type currentState struct {
+	readingMultiLine  bool     // Whether multi-line data is being read
+	expectedArgsCount int      // Expected number of arguments
+	msgType           byte     // Message type
+	args              [][]byte // Argument list
+	bulkLen           int64    // Length of bulk data
+}
+
+func (s *currentState) finished() bool {
+	return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
+}
+
+// ParseStream parses data in the input stream
+// The parser parses the data in the input stream according to the rules of the Redis protocol
+// The parsed results will be sent to the caller through the channel
+// Parameters:
+//
+//	reader io.Reader - Input stream reader
+//
+// Return value:
+//
+//	<-chan *Payload - Parse result channel, each element is a parsed Payload object
+func ParseStream(reader io.Reader) <-chan *Payload {
+	logger.Debug("parse stream...")
+	ch := make(chan *Payload)
+	go parse0(reader, ch)
+	return ch
+}
+
+func parse0(reader io.Reader, ch chan<- *Payload) {
+	defer func() {
+		if err := recover(); err != nil {
+			logger.Error(string(debug.Stack()))
+		}
+	}()
+	logger.Debug("parse0 ...")
+	bufReader := bufio.NewReader(reader)
+	var state currentState
+	var err error
+	var line []byte // A line of data
+	for {           // Infinite loop
+		var ioErr bool
+		line, ioErr, err = readLine(bufReader, &state)
+		if err != nil {
+			ch <- &Payload{
+				Err: err,
+			}
+			if ioErr {
+				close(ch)
+				return
+			}
+			state = currentState{} // Reset state
+			continue
+		}
+
+		// Check if it is multi-line mode
+		if !state.readingMultiLine {
+			if line[0] == '*' { // *3/r/n
+				err = parseMultiBulkHeader(line, &state)
+				if err != nil {
+					ch <- &Payload{
+						Err: errors.New("protocol error: " + string(line)),
+					}
+					state = currentState{} // Reset state
+					continue
+				}
+				if state.expectedArgsCount == 0 {
+					ch <- &Payload{
+						Data: reply.NewEmptyMultiBulkReply(),
+					}
+					state = currentState{} // Reset state
+					continue
+				}
+			} else if line[0] == '$' { // $3/r/n
+				err = parseBulkHeader(line, &state)
+				if err != nil {
+					ch <- &Payload{
+						Err: errors.New("protocol error: " + string(line)),
+					}
+					state = currentState{} // Reset state
+					continue
+				}
+				// $-1\r\n
+				if state.bulkLen == -1 {
+					ch <- &Payload{
+						Data: reply.NewNullBulkReply(),
+					}
+					state = currentState{} // Reset state
+					continue
+				}
+			} else {
+				var result resp.Reply
+				result, err = parseSingleLineReply(line)
+				ch <- &Payload{
+					Data: result,
+					Err:  err,
+				}
+				state = currentState{}
+				continue
+			}
+		} else {
+			// Parse multi-line mode
+			err = readBody(line, &state)
+			if err != nil {
+				ch <- &Payload{
+					Err: errors.New("protocol error: " + string(line)),
+				}
+				state = currentState{}
+				continue
+			}
+			if state.finished() {
+				var result resp.Reply
+				switch state.msgType {
+				case '*':
+					result = reply.NewMultiBulkReply(state.args)
+				case '$':
+					result = reply.NewBulkReply(state.args[0])
+				}
+				ch <- &Payload{
+					Data: result,
+				}
+				state = currentState{} // Reset state
+			}
+		}
+	} // End for true
+}
+
+// Read a line of data
+// The read data may be a complete line or an incomplete line
+//
+// Return values:
+// 1. Data read
+// 2. Whether an incomplete line was read
+// 3. Error message
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
+// The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
+// The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
+func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
+	var msg []byte
+	var err error
+	if state.bulkLen == 0 { // \r\n split
+		// Read a line of data
+		msg, err = reader.ReadBytes('\n')
+		logger.Info("\r\n***readLine: ", string(msg))
+		if err != nil {
+			return nil, true, err
+		}
+		// Data that does not end with \r\n
+		if len(msg) == 0 || msg[len(msg)-2] != '\r' {
+			return nil, false, errors.New("readLine-protocol error: " + string(msg))
+		}
+
+	} else {
+		// ELSE previously read $number, strictly read number of bytes
+		msg = make([]byte, state.bulkLen+2)
+		_, err := io.ReadFull(reader, msg)
+		if err != nil {
+			return nil, true, err
+		}
+		// Data that does not end with \r\n
+		if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
+			return nil, false, errors.New("readLine-protocol error: " + string(msg))
+		}
+
+		// Reset bulkLen
+		state.bulkLen = 0
+	}
+	return msg, false, nil
+}
+
+// Parse the header information of the string
+//
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
+// Parsed as: *3\r\n
+func parseMultiBulkHeader(msg []byte, state *currentState) error {
+	var err error
+	var expectedLine uint64
+	expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
+	if err != nil {
+		return errors.New("protocol error: " + string(msg))
+	}
+	if expectedLine == 0 {
+		state.expectedArgsCount = 0
+		return nil
+	} else if expectedLine > 0 {
+		state.msgType = msg[0]
+		state.readingMultiLine = true
+		state.expectedArgsCount = int(expectedLine)
+		state.args = make([][]byte, 0, expectedLine)
+		return nil
+	} else {
+		return errors.New("protocol error: " + string(msg))
+	}
+}
+
+// Single line string
+//
+// $4\r\nPING\r\n
+func parseBulkHeader(msg []byte, state *currentState) error {
+	var err error
+	// $ Start line, read bulkLen
+	state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
+	if err != nil {
+		return errors.New("protocol error: " + string(msg))
+	}
+
+	if state.bulkLen == -1 {
+		return nil
+	} else if state.bulkLen > 0 {
+		state.msgType = msg[0] // $
+		state.readingMultiLine = true
+		state.expectedArgsCount = 1
+		state.args = make([][]byte, 0, 1)
+		return nil
+	} else {
+		return errors.New("protocol error: " + string(msg))
+	}
+}
+
+// Parse single line reply
+//
+// For example:
+// +OK\r\n
+// -err\r\n
+// :1\r\n
+func parseSingleLineReply(msg []byte) (resp.Reply, error) {
+	str := strings.TrimSuffix(string(msg), "\r\n") // Remove \r\n
+
+	var result resp.Reply
+	switch msg[0] {
+	case '+':
+		result = reply.NewStatusReply(str[1:])
+	case '-':
+		result = reply.NewErrReply(str[1:])
+	case ':':
+		val, err := strconv.ParseInt(str[1:], 10, 64)
+		if err != nil {
+			return nil, errors.New("protocol error: " + string(msg))
+		}
+		result = reply.NewIntReply(val)
+	default:
+		return nil, errors.New("protocol error: " + string(msg))
+	}
+	return result, nil
+}
+
+// readBody parses the message body part in the Redis protocol, handling bulk strings starting with $ or ordinary strings
+//
+// For example:
+// PING\r\n
+// SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
+//
+// Parameters:
+//
+//	msg: Complete message byte slice, including \r\n ending
+//	state: Pointer to current parsing state, used to store parsing results
+//
+// Return value:
+//
+//	error: Errors that occurred during parsing, such as protocol format errors, etc.
+func readBody(msg []byte, state *currentState) error {
+	line := msg[0 : len(msg)-2] // Remove \r\n
+
+	var err error
+
+	// Handle bulk string format starting with $
+	if line[0] == '$' {
+		state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
+		if err != nil {
+			return errors.New("protocol error: " + string(msg))
+		}
+		// Handle empty string case, $0\r\n represents empty string
+		if state.bulkLen <= 0 {
+			state.args = append(state.args, []byte{})
+			state.bulkLen = 0
+		}
+	} else {
+		// Handle ordinary strings, directly add to argument list
+		state.args = append(state.args, line)
+	}
+	return nil
+}

+ 30 - 2
resp/reply/consts.go

@@ -7,6 +7,10 @@ package reply
 // PongReply 表示PONG响应,用于Redis协议的PONG响应
 // PongReply 表示PONG响应,用于Redis协议的PONG响应
 type PongReply struct{}
 type PongReply struct{}
 
 
+func (p PongReply) String() string {
+	return "+PONG\r\n"
+}
+
 // ToBytes 将PONG响应转换为字节数组
 // ToBytes 将PONG响应转换为字节数组
 // 返回格式:"+PONG\r\n"
 // 返回格式:"+PONG\r\n"
 func (p PongReply) ToBytes() []byte {
 func (p PongReply) ToBytes() []byte {
@@ -22,6 +26,10 @@ func NewPongReply() *PongReply {
 // 返回格式:"+OK\r\n"
 // 返回格式:"+OK\r\n"
 type OkReply struct{}
 type OkReply struct{}
 
 
+func (o OkReply) String() string {
+	return "+OK\r\n"
+}
+
 // ToBytes 将OK响应转换为字节数组
 // ToBytes 将OK响应转换为字节数组
 // 返回格式:"+OK\r\n"
 // 返回格式:"+OK\r\n"
 func (o OkReply) ToBytes() []byte {
 func (o OkReply) ToBytes() []byte {
@@ -36,6 +44,10 @@ func NewOkReply() *OkReply {
 // NoReply 表示NO响应,用于Redis协议的NO响应
 // NoReply 表示NO响应,用于Redis协议的NO响应
 type NoReply struct{}
 type NoReply struct{}
 
 
+func (n NoReply) String() string {
+	return ""
+}
+
 // ToBytes 将NO响应转换为字节数组
 // ToBytes 将NO响应转换为字节数组
 func (n NoReply) ToBytes() []byte {
 func (n NoReply) ToBytes() []byte {
 	return []byte("")
 	return []byte("")
@@ -46,15 +58,20 @@ func NewNoReply() *NoReply {
 	return &NoReply{}
 	return &NoReply{}
 }
 }
 
 
-// NullBulkReply 表示NO响应,用于Redis协议的NO响应
+// NullBulkReply 表示NULL响应,用于Redis协议的NULL响应
 // 返回格式:"$-1\r\n"
 // 返回格式:"$-1\r\n"
 type NullBulkReply struct{}
 type NullBulkReply struct{}
 
 
-// ToBytes 将NO响应转换为字节数组
+func (n NullBulkReply) String() string {
+	return "$-1\r\n"
+}
+
+// ToBytes 将NULL响应转换为字节数组
 // 返回格式:"$-1\r\n"
 // 返回格式:"$-1\r\n"
 func (n NullBulkReply) ToBytes() []byte {
 func (n NullBulkReply) ToBytes() []byte {
 	return []byte("$-1\r\n")
 	return []byte("$-1\r\n")
 }
 }
+
 func NewNullBulkReply() *NullBulkReply {
 func NewNullBulkReply() *NullBulkReply {
 	return &NullBulkReply{}
 	return &NullBulkReply{}
 }
 }
@@ -63,10 +80,21 @@ func NewNullBulkReply() *NullBulkReply {
 // 返回格式:"*0\r\n"
 // 返回格式:"*0\r\n"
 type EmptyMultiBulkReply struct{}
 type EmptyMultiBulkReply struct{}
 
 
+func (e EmptyMultiBulkReply) String() string {
+	return "*0\r\n"
+}
+
 // ToBytes 将空的多批量回复转换为字节数组
 // ToBytes 将空的多批量回复转换为字节数组
 func (e EmptyMultiBulkReply) ToBytes() []byte {
 func (e EmptyMultiBulkReply) ToBytes() []byte {
 	return []byte("*0\r\n")
 	return []byte("*0\r\n")
 }
 }
+
 func NewEmptyMultiBulkReply() *EmptyMultiBulkReply {
 func NewEmptyMultiBulkReply() *EmptyMultiBulkReply {
 	return &EmptyMultiBulkReply{}
 	return &EmptyMultiBulkReply{}
 }
 }
+
+// IsOkReply 判断回复是否为OK响应
+func IsOkReply(reply interface{}) bool {
+	_, ok := reply.(*OkReply)
+	return ok
+}

+ 30 - 6
resp/reply/error.go

@@ -9,17 +9,26 @@ package reply
 
 
 // UnknownErrReply 表示Redis协议中的未知错误回复
 // UnknownErrReply 表示Redis协议中的未知错误回复
 // 当没有更具体的错误类型可用时使用
 // 当没有更具体的错误类型可用时使用
-type UnknownErrReply struct{}
+type UnknownErrReply struct {
+	Cmd string
+}
 
 
+func NewUnknownErrReply(cmd string) *UnknownErrReply {
+	return &UnknownErrReply{Cmd: cmd}
+}
+func (u *UnknownErrReply) String() string {
+	return u.Error()
+}
 func (u *UnknownErrReply) Error() string {
 func (u *UnknownErrReply) Error() string {
-	return "Err unknown"
+	return "Err unknown command '" + u.Cmd + "'"
 }
 }
 
 
 func (u *UnknownErrReply) ToBytes() []byte {
 func (u *UnknownErrReply) ToBytes() []byte {
-	return []byte("-Err unknown\r\n")
+	return []byte("-Err unknown command '" + u.Cmd + "'\r\n")
 }
 }
 
 
-// ArgNumErrReply represents an error reply for wrong number of arguments in Redis protocol.
+// ArgNumErrReply 表示命令参数数量错误的响应
+// 当命令的参数数量不符合预期时使用
 type ArgNumErrReply struct {
 type ArgNumErrReply struct {
 	Cmd string
 	Cmd string
 }
 }
@@ -32,12 +41,19 @@ func (a *ArgNumErrReply) ToBytes() []byte {
 	return []byte("-ERR wrong number of arguments for '" + a.Cmd + "' command\r\n")
 	return []byte("-ERR wrong number of arguments for '" + a.Cmd + "' command\r\n")
 }
 }
 
 
+func (a *ArgNumErrReply) String() string {
+	return a.Error()
+}
+
+// NewArgNumErrReply 创建指定命令的参数数量错误响应
 func NewArgNumErrReply(cmd string) *ArgNumErrReply {
 func NewArgNumErrReply(cmd string) *ArgNumErrReply {
 	return &ArgNumErrReply{
 	return &ArgNumErrReply{
 		Cmd: cmd,
 		Cmd: cmd,
 	}
 	}
 }
 }
 
 
+// SyntaxErrReply 表示命令语法错误的通用响应
+// 对应Redis的"syntax error"响应
 type SyntaxErrReply struct{}
 type SyntaxErrReply struct{}
 
 
 func (s *SyntaxErrReply) Error() string {
 func (s *SyntaxErrReply) Error() string {
@@ -52,20 +68,28 @@ func NewSyntaxErrReply() *SyntaxErrReply {
 	return &SyntaxErrReply{}
 	return &SyntaxErrReply{}
 }
 }
 
 
+// WrongTypeErrReply 表示对键值进行类型不匹配操作的错误
+// 对应Redis的 "WRONGTYPE" 错误响应
 type WrongTypeErrReply struct{}
 type WrongTypeErrReply struct{}
 
 
 func (w *WrongTypeErrReply) Error() string {
 func (w *WrongTypeErrReply) Error() string {
-	return "WRONGTYPE Operation against a key holding the wrong kind of value"
+	return "WRONG TYPE Operation against a key holding the wrong kind of value"
+}
+
+func (w *WrongTypeErrReply) String() string {
+	return w.Error()
 }
 }
 
 
 func (w *WrongTypeErrReply) ToBytes() []byte {
 func (w *WrongTypeErrReply) ToBytes() []byte {
-	return []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")
+	return []byte("-WRONG TYPE Operation against a key holding the wrong kind of value\r\n")
 }
 }
 
 
 func NewWrongTypeErrReply() *WrongTypeErrReply {
 func NewWrongTypeErrReply() *WrongTypeErrReply {
 	return &WrongTypeErrReply{}
 	return &WrongTypeErrReply{}
 }
 }
 
 
+// ProtocolErrReply 表示违反RESP协议规范的错误
+// Msg 包含具体的协议违规详情
 type ProtocolErrReply struct {
 type ProtocolErrReply struct {
 	Msg string
 	Msg string
 }
 }

+ 61 - 18
resp/reply/reply.go

@@ -1,25 +1,25 @@
-// Package reply 实现了Redis序列化协议(RESP)的回复处理功能。
+// Package reply implements the Redis Serialization Protocol (RESP) reply handling functionality.
 //
 //
-// 本包包含构建符合Redis RESP协议的各种响应格式的实现,支持以下类型:
-//   - 简单字符串(Simple Strings:  StatusReply
-//   - 错误类型(Errors):            StandardErrReply 及其子类型
-//   - 整型(Integers:             IntReply
-//   - 批量字符串(Bulk Strings:    BulkReply
-//   - 数组(Arrays:               MultiBulkReply
+// This package contains implementations for building various response formats that comply with the Redis RESP protocol, supporting the following types:
+//   - Simple Strings:  StatusReply
+//   - Errors:          StandardErrReply and its subtypes
+//   - Integers:        IntReply
+//   - Bulk Strings:    BulkReply
+//   - Arrays:          MultiBulkReply
 //
 //
-// 所有回复类型都实现了 `resp.Reply` 接口,通过 `ToBytes()` 方法生成符合RESP协议格式的字节数据。
+// All reply types implement the [resp.Reply](file:///Users/lixiaoming/go/src/github.com/runningwater/go-redis/interface/resp/reply.go#L7-L12) interface, generating byte data in the RESP protocol format through the [ToBytes()](file:///Users/lixiaoming/go/src/github.com/runningwater/go-redis/interface/resp/reply.go#L9-L9) method.
 //
 //
-// 示例用法:
+// Example usage:
 //
 //
-// 创建整数回复
+// Create an integer reply
 //
 //
 //	reply := NewIntReply(42)
 //	reply := NewIntReply(42)
-//	output := reply.ToBytes() // 输出 ":42\r\n"
+//	output := reply.ToBytes() // Output ":42\r\n"
 //
 //
-// 创建批量字符串回复
+// Create a bulk string reply
 //
 //
 //	bulk := NewBulkReply([]byte("hello"))
 //	bulk := NewBulkReply([]byte("hello"))
-//	output := bulk.ToBytes() // 输出 "$5\r\nhello\r\n"
+//	output := bulk.ToBytes() // Output "$5\r\nhello\r\n"
 package reply
 package reply
 
 
 import (
 import (
@@ -34,11 +34,15 @@ var (
 	CRLF               = "\r\n"
 	CRLF               = "\r\n"
 )
 )
 
 
-// BulkReply 表示批量字符串回复的结构体。
+// BulkReply represents a bulk string reply structure.
 type BulkReply struct {
 type BulkReply struct {
 	Arg []byte // "test" => "$4\r\ntest\r\n"
 	Arg []byte // "test" => "$4\r\ntest\r\n"
 }
 }
 
 
+func (b *BulkReply) String() string {
+	return string(b.ToBytes())
+}
+
 func (b *BulkReply) ToBytes() []byte {
 func (b *BulkReply) ToBytes() []byte {
 	if len(b.Arg) == 0 {
 	if len(b.Arg) == 0 {
 		return nullBulkReplyBytes
 		return nullBulkReplyBytes
@@ -46,15 +50,20 @@ func (b *BulkReply) ToBytes() []byte {
 	return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
 	return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
 }
 }
 
 
+// NewBulkReply creates a new bulk reply
 func NewBulkReply(arg []byte) *BulkReply {
 func NewBulkReply(arg []byte) *BulkReply {
 	return &BulkReply{Arg: arg}
 	return &BulkReply{Arg: arg}
 }
 }
 
 
-// MultiBulkReply 表示多条批量字符串回复的结构体。
+// MultiBulkReply represents a multi-bulk string reply structure.
 type MultiBulkReply struct {
 type MultiBulkReply struct {
 	Args [][]byte
 	Args [][]byte
 }
 }
 
 
+func (m *MultiBulkReply) String() string {
+	return string(m.ToBytes())
+}
+
 func (m *MultiBulkReply) ToBytes() []byte {
 func (m *MultiBulkReply) ToBytes() []byte {
 	argLen := len(m.Args)
 	argLen := len(m.Args)
 	var buf bytes.Buffer
 	var buf bytes.Buffer
@@ -70,40 +79,69 @@ func (m *MultiBulkReply) ToBytes() []byte {
 	return buf.Bytes()
 	return buf.Bytes()
 }
 }
 
 
+// NewMultiBulkReply creates a new multi-bulk reply
 func NewMultiBulkReply(args [][]byte) *MultiBulkReply {
 func NewMultiBulkReply(args [][]byte) *MultiBulkReply {
 	return &MultiBulkReply{Args: args}
 	return &MultiBulkReply{Args: args}
 }
 }
 
 
+// StatusReply represents a simple string reply structure.
+// For example: OK, PING, SET, GET command replies.
+// Corresponds to Redis replies starting with "+".
+// For example: "+OK\r\n"
 type StatusReply struct {
 type StatusReply struct {
 	Status string
 	Status string
 }
 }
 
 
+func (s *StatusReply) String() string {
+	return string(s.ToBytes())
+}
+
 func (s *StatusReply) ToBytes() []byte {
 func (s *StatusReply) ToBytes() []byte {
 	return []byte("+" + s.Status + CRLF)
 	return []byte("+" + s.Status + CRLF)
 }
 }
 
 
+// NewStatusReply creates a new status reply
 func NewStatusReply(status string) *StatusReply {
 func NewStatusReply(status string) *StatusReply {
 	return &StatusReply{Status: status}
 	return &StatusReply{Status: status}
 }
 }
 
 
+// IntReply represents an integer reply structure.
+// Corresponds to Redis replies starting with ":".
+// For example: ":1\r\n"
 type IntReply struct {
 type IntReply struct {
 	Code int64
 	Code int64
 }
 }
 
 
+func (i *IntReply) String() string {
+	return string(i.ToBytes())
+}
+
 func (i *IntReply) ToBytes() []byte {
 func (i *IntReply) ToBytes() []byte {
 	return []byte(":" + strconv.FormatInt(i.Code, 10) + CRLF)
 	return []byte(":" + strconv.FormatInt(i.Code, 10) + CRLF)
 }
 }
 
 
-// ErrorReply 表示错误回复的接口。
+// NewIntReply creates a new integer reply
+func NewIntReply(code int64) *IntReply {
+	return &IntReply{Code: code}
+}
+
+// ErrorReply represents an error reply interface.
 type ErrorReply interface {
 type ErrorReply interface {
 	Error() string
 	Error() string
 	ToBytes() []byte
 	ToBytes() []byte
 }
 }
 
 
+// StandardErrReply represents a standard error reply structure.
+// Corresponds to Redis replies starting with "-".
+// For example: "-ERR unknown command 'foobar'\r\n"
 type StandardErrReply struct {
 type StandardErrReply struct {
 	Status string
 	Status string
 }
 }
 
 
+func (s *StandardErrReply) String() string {
+	return string(s.ToBytes())
+}
+
 func (s *StandardErrReply) Error() string {
 func (s *StandardErrReply) Error() string {
 	return s.Status
 	return s.Status
 }
 }
@@ -112,10 +150,15 @@ func (s *StandardErrReply) ToBytes() []byte {
 	return []byte("-" + s.Status + CRLF)
 	return []byte("-" + s.Status + CRLF)
 }
 }
 
 
-func NewStandardErrReply(status string) *StandardErrReply {
+// NewErrReply creates a new error reply
+func NewErrReply(status string) *StandardErrReply {
 	return &StandardErrReply{Status: status}
 	return &StandardErrReply{Status: status}
 }
 }
 
 
+// IsErrReply checks if the reply is an error reply.
+// Error replies start with "-".
+// For example: "-ERR unknown command 'foobar'\r\n"
 func IsErrReply(reply resp.Reply) bool {
 func IsErrReply(reply resp.Reply) bool {
-	return reply.ToBytes()[0] == '-'
+	toBytes := reply.ToBytes()
+	return len(toBytes) > 0 && toBytes[0] == '-'
 }
 }

+ 3 - 2
tcp/echo.go

@@ -34,7 +34,7 @@ type EchoHandler struct {
 	closing    atomic.Boolean
 	closing    atomic.Boolean
 }
 }
 
 
-func NewEcho() *EchoHandler {
+func NewEchoHandler() *EchoHandler {
 	return &EchoHandler{
 	return &EchoHandler{
 		activeConn: sync.Map{},
 		activeConn: sync.Map{},
 		closing:    0,
 		closing:    0,
@@ -63,9 +63,10 @@ func (e *EchoHandler) Handle(_ctx context.Context, conn net.Conn) {
 			}
 			}
 			return
 			return
 		}
 		}
-
+		msg = "Echo: " + msg
 		client.Waiting.Add(1)
 		client.Waiting.Add(1)
 		b := []byte(msg)
 		b := []byte(msg)
+
 		_, _ = conn.Write(b)
 		_, _ = conn.Write(b)
 		client.Waiting.Done()
 		client.Waiting.Done()
 	}
 	}

+ 3 - 3
tcp/server.go

@@ -19,8 +19,8 @@ type Config struct {
 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
 
 
 	closeChan := make(chan struct{})
 	closeChan := make(chan struct{})
-	sigChan := make(chan os.Signal)
-	signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) //nolint:govt
 	go func() {
 	go func() {
 		sig := <-sigChan
 		sig := <-sigChan
 		switch sig {
 		switch sig {
@@ -33,7 +33,7 @@ func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	logger.Info("start listening")
+	logger.Info("start listening at", cfg.Address)
 
 
 	return ListenAndServe(listener, handler, closeChan)
 	return ListenAndServe(listener, handler, closeChan)