echo.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/5/26 10:52
  3. // Desc:
  4. package tcp
  5. import (
  6. "bufio"
  7. "context"
  8. "io"
  9. "net"
  10. "sync"
  11. "time"
  12. "github.com/runningwater/go-redis/lib/logger"
  13. "github.com/runningwater/go-redis/lib/sync/atomic"
  14. "github.com/runningwater/go-redis/lib/sync/wait"
  15. )
  16. // EchoClient 客户端信息
  17. type EchoClient struct {
  18. Conn net.Conn
  19. Waiting wait.Wait
  20. }
  21. func (e *EchoClient) Close() error {
  22. e.Waiting.WaitWithTimeout(10 * time.Second)
  23. _ = e.Conn.Close()
  24. return nil
  25. }
  26. type EchoHandler struct {
  27. activeConn sync.Map
  28. closing atomic.Boolean
  29. }
  30. func NewEchoHandler() *EchoHandler {
  31. return &EchoHandler{
  32. activeConn: sync.Map{},
  33. closing: 0,
  34. }
  35. }
  36. func (e *EchoHandler) Handle(_ctx context.Context, conn net.Conn) {
  37. if e.closing.Get() {
  38. _ = conn.Close()
  39. }
  40. // 记录客户端信息
  41. client := &EchoClient{
  42. Conn: conn,
  43. }
  44. e.activeConn.Store(client, struct{}{})
  45. reader := bufio.NewReader(conn)
  46. for {
  47. msg, err := reader.ReadString('\n')
  48. if err != nil {
  49. if err == io.EOF {
  50. logger.Info("Connection close")
  51. e.activeConn.Delete(client)
  52. } else {
  53. logger.Warn(err)
  54. }
  55. return
  56. }
  57. msg = "Echo: " + msg
  58. client.Waiting.Add(1)
  59. b := []byte(msg)
  60. _, _ = conn.Write(b)
  61. client.Waiting.Done()
  62. }
  63. }
  64. func (e *EchoHandler) Close() error {
  65. logger.Info("handler shutting down")
  66. e.closing.Set(true)
  67. // 剔除所有连接
  68. e.activeConn.Range(func(key, value any) bool {
  69. client := key.(*EchoClient)
  70. _ = client.Conn.Close()
  71. return true
  72. })
  73. return nil
  74. }