echo.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/5/26 10:52
  3. // Desc:
  4. package tcp
  5. import (
  6. "bufio"
  7. "context"
  8. "io"
  9. "net"
  10. "sync"
  11. "time"
  12. "github.com/runningwater/go-redis/lib/logger"
  13. "github.com/runningwater/go-redis/lib/sync/atomic"
  14. "github.com/runningwater/go-redis/lib/sync/wait"
  15. )
  16. // EchoClient 客户端信息
  17. type EchoClient struct {
  18. Conn net.Conn
  19. Waiting wait.Wait
  20. }
  21. func (e *EchoClient) Close() error {
  22. e.Waiting.WaitWithTimeout(10 * time.Second)
  23. _ = e.Conn.Close()
  24. return nil
  25. }
  26. type Echo struct {
  27. activeConn sync.Map
  28. closing atomic.Boolean
  29. }
  30. func NewEcho() *Echo {
  31. return &Echo{
  32. activeConn: sync.Map{},
  33. closing: 0,
  34. }
  35. }
  36. func (e *Echo) Handle(ctx context.Context, conn net.Conn) {
  37. if e.closing.Get() {
  38. _ = conn.Close()
  39. }
  40. // 记录客户端信息
  41. client := &EchoClient{
  42. Conn: conn,
  43. }
  44. e.activeConn.Store(client, struct{}{})
  45. reader := bufio.NewReader(conn)
  46. for {
  47. msg, err := reader.ReadString('\n')
  48. if err != nil {
  49. if err == io.EOF {
  50. logger.Info("Connection close")
  51. e.activeConn.Delete(client)
  52. } else {
  53. logger.Warn(err)
  54. }
  55. return
  56. }
  57. client.Waiting.Add(1)
  58. b := []byte(msg)
  59. _, _ = conn.Write(b)
  60. client.Waiting.Done()
  61. }
  62. }
  63. func (e *Echo) Close() error {
  64. logger.Info("handler shutting down")
  65. e.closing.Set(true)
  66. // 剔除所有连接
  67. e.activeConn.Range(func(key, value any) bool {
  68. client := key.(*EchoClient)
  69. _ = client.Conn.Close()
  70. return true
  71. })
  72. return nil
  73. }