parser.go 8.1 KB


  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/7/11 15:58
  3. // Desc: Parser for Redis protocol
  4. //
  5. // The parser is used to parse the byte stream of the Redis protocol into abstract response objects
  6. package parser
  7. import (
  8. "bufio"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "runtime/debug"
  13. "strconv"
  14. "strings"
  15. "github.com/runningwater/go-redis/interface/resp"
  16. "github.com/runningwater/go-redis/lib/logger"
  17. "github.com/runningwater/go-redis/resp/reply"
  18. )
  19. // Payload is the payload of the parser
  20. // Used to store the state information of the parser
  21. type Payload struct {
  22. Data resp.Reply // Parsed response object, consistent with the return data structure
  23. Err error // Errors that occurred during parsing
  24. }
  25. func (p *Payload) String() string {
  26. return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
  27. }
  28. type currentState struct {
  29. readingMultiLine bool // Whether multi-line data is being read
  30. expectedArgsCount int // Expected number of arguments
  31. msgType byte // Message type
  32. args [][]byte // Argument list
  33. bulkLen int64 // Length of bulk data
  34. }
  35. func (s *currentState) finished() bool {
  36. return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
  37. }
  38. // ParseStream parses data in the input stream
  39. // The parser parses the data in the input stream according to the rules of the Redis protocol
  40. // The parsed results will be sent to the caller through the channel
  41. // Parameters:
  42. //
  43. // reader io.Reader - Input stream reader
  44. //
  45. // Return value:
  46. //
  47. // <-chan *Payload - Parse result channel, each element is a parsed Payload object
  48. func ParseStream(reader io.Reader) <-chan *Payload {
  49. logger.Debug("parse stream...")
  50. ch := make(chan *Payload)
  51. go parse0(reader, ch)
  52. return ch
  53. }
  54. func parse0(reader io.Reader, ch chan<- *Payload) {
  55. defer func() {
  56. if err := recover(); err != nil {
  57. logger.Error(string(debug.Stack()))
  58. }
  59. }()
  60. logger.Debug("parse0 ...")
  61. bufReader := bufio.NewReader(reader)
  62. var state currentState
  63. var err error
  64. var line []byte // A line of data
  65. for { // Infinite loop
  66. var ioErr bool
  67. line, ioErr, err = readLine(bufReader, &state)
  68. if err != nil {
  69. ch <- &Payload{
  70. Err: err,
  71. }
  72. if ioErr {
  73. close(ch)
  74. return
  75. }
  76. state = currentState{} // Reset state
  77. continue
  78. }
  79. // Check if it is multi-line mode
  80. if !state.readingMultiLine {
  81. if line[0] == '*' { // *3/r/n
  82. err = parseMultiBulkHeader(line, &state)
  83. if err != nil {
  84. ch <- &Payload{
  85. Err: errors.New("protocol error: " + string(line)),
  86. }
  87. state = currentState{} // Reset state
  88. continue
  89. }
  90. if state.expectedArgsCount == 0 {
  91. ch <- &Payload{
  92. Data: reply.NewEmptyMultiBulkReply(),
  93. }
  94. state = currentState{} // Reset state
  95. continue
  96. }
  97. } else if line[0] == '$' { // $3/r/n
  98. err = parseBulkHeader(line, &state)
  99. if err != nil {
  100. ch <- &Payload{
  101. Err: errors.New("protocol error: " + string(line)),
  102. }
  103. state = currentState{} // Reset state
  104. continue
  105. }
  106. // $-1\r\n
  107. if state.bulkLen == -1 {
  108. ch <- &Payload{
  109. Data: reply.NewNullBulkReply(),
  110. }
  111. state = currentState{} // Reset state
  112. continue
  113. }
  114. } else {
  115. var result resp.Reply
  116. result, err = parseSingleLineReply(line)
  117. ch <- &Payload{
  118. Data: result,
  119. Err: err,
  120. }
  121. state = currentState{}
  122. continue
  123. }
  124. } else {
  125. // Parse multi-line mode
  126. err = readBody(line, &state)
  127. if err != nil {
  128. ch <- &Payload{
  129. Err: errors.New("protocol error: " + string(line)),
  130. }
  131. state = currentState{}
  132. continue
  133. }
  134. if state.finished() {
  135. var result resp.Reply
  136. switch state.msgType {
  137. case '*':
  138. result = reply.NewMultiBulkReply(state.args)
  139. case '$':
  140. result = reply.NewBulkReply(state.args[0])
  141. }
  142. ch <- &Payload{
  143. Data: result,
  144. }
  145. state = currentState{} // Reset state
  146. }
  147. }
  148. } // End for true
  149. }
  150. // Read a line of data
  151. // The read data may be a complete line or an incomplete line
  152. //
  153. // Return values:
  154. // 1. Data read
  155. // 2. Whether an incomplete line was read
  156. // 3. Error message
  157. // For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  158. // The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
  159. // The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  160. func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
  161. var msg []byte
  162. var err error
  163. if state.bulkLen == 0 { // \r\n split
  164. // Read a line of data
  165. msg, err = reader.ReadBytes('\n')
  166. logger.Info("\r\n***readLine: ", string(msg))
  167. if err != nil {
  168. return nil, true, err
  169. }
  170. // Data that does not end with \r\n
  171. if len(msg) == 0 || msg[len(msg)-2] != '\r' {
  172. return nil, false, errors.New("readLine-protocol error: " + string(msg))
  173. }
  174. } else {
  175. // ELSE previously read $number, strictly read number of bytes
  176. msg = make([]byte, state.bulkLen+2)
  177. _, err := io.ReadFull(reader, msg)
  178. if err != nil {
  179. return nil, true, err
  180. }
  181. // Data that does not end with \r\n
  182. if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
  183. return nil, false, errors.New("readLine-protocol error: " + string(msg))
  184. }
  185. // Reset bulkLen
  186. state.bulkLen = 0
  187. }
  188. return msg, false, nil
  189. }
  190. // Parse the header information of the string
  191. //
  192. // For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  193. // Parsed as: *3\r\n
  194. func parseMultiBulkHeader(msg []byte, state *currentState) error {
  195. var err error
  196. var expectedLine uint64
  197. expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  198. if err != nil {
  199. return errors.New("protocol error: " + string(msg))
  200. }
  201. if expectedLine == 0 {
  202. state.expectedArgsCount = 0
  203. return nil
  204. } else if expectedLine > 0 {
  205. state.msgType = msg[0]
  206. state.readingMultiLine = true
  207. state.expectedArgsCount = int(expectedLine)
  208. state.args = make([][]byte, 0, expectedLine)
  209. return nil
  210. } else {
  211. return errors.New("protocol error: " + string(msg))
  212. }
  213. }
  214. // Single line string
  215. //
  216. // $4\r\nPING\r\n
  217. func parseBulkHeader(msg []byte, state *currentState) error {
  218. var err error
  219. // $ Start line, read bulkLen
  220. state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  221. if err != nil {
  222. return errors.New("protocol error: " + string(msg))
  223. }
  224. if state.bulkLen == -1 {
  225. return nil
  226. } else if state.bulkLen > 0 {
  227. state.msgType = msg[0] // $
  228. state.readingMultiLine = true
  229. state.expectedArgsCount = 1
  230. state.args = make([][]byte, 0, 1)
  231. return nil
  232. } else {
  233. return errors.New("protocol error: " + string(msg))
  234. }
  235. }
  236. // Parse single line reply
  237. //
  238. // For example:
  239. // +OK\r\n
  240. // -err\r\n
  241. // :1\r\n
  242. func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  243. str := strings.TrimSuffix(string(msg), "\r\n") // Remove \r\n
  244. var result resp.Reply
  245. switch msg[0] {
  246. case '+':
  247. result = reply.NewStatusReply(str[1:])
  248. case '-':
  249. result = reply.NewErrReply(str[1:])
  250. case ':':
  251. val, err := strconv.ParseInt(str[1:], 10, 64)
  252. if err != nil {
  253. return nil, errors.New("protocol error: " + string(msg))
  254. }
  255. result = reply.NewIntReply(val)
  256. default:
  257. return nil, errors.New("protocol error: " + string(msg))
  258. }
  259. return result, nil
  260. }
  261. // readBody parses the message body part in the Redis protocol, handling bulk strings starting with $ or ordinary strings
  262. //
  263. // For example:
  264. // PING\r\n
  265. // SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  266. //
  267. // Parameters:
  268. //
  269. // msg: Complete message byte slice, including \r\n ending
  270. // state: Pointer to current parsing state, used to store parsing results
  271. //
  272. // Return value:
  273. //
  274. // error: Errors that occurred during parsing, such as protocol format errors, etc.
  275. func readBody(msg []byte, state *currentState) error {
  276. line := msg[0 : len(msg)-2] // Remove \r\n
  277. var err error
  278. // Handle bulk string format starting with $
  279. if line[0] == '$' {
  280. state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
  281. if err != nil {
  282. return errors.New("protocol error: " + string(msg))
  283. }
  284. // Handle empty string case, $0\r\n represents empty string
  285. if state.bulkLen <= 0 {
  286. state.args = append(state.args, []byte{})
  287. state.bulkLen = 0
  288. }
  289. } else {
  290. // Handle ordinary strings, directly add to argument list
  291. state.args = append(state.args, line)
  292. }
  293. return nil
  294. }