parser.go 8.1 KB


  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/7/11 15:58
  3. // Desc: Parser for Redis protocol
  4. //
  5. // The parser is used to parse the byte stream of the Redis protocol into abstract response objects
  6. package parser
  7. import (
  8. "bufio"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "runtime/debug"
  13. "strconv"
  14. "strings"
  15. "github.com/runningwater/go-redis/interface/resp"
  16. "github.com/runningwater/go-redis/lib/logger"
  17. "github.com/runningwater/go-redis/resp/reply"
  18. )
  19. // Payload is the payload of the parser
  20. //
  21. // Used to store the state information of the parser
  22. type Payload struct {
  23. Data resp.Reply // Parsed response object, consistent with the return data structure
  24. Err error // Errors that occurred during parsing
  25. }
  26. func (p *Payload) String() string {
  27. return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
  28. }
  29. type currentState struct {
  30. readingMultiLine bool // Whether multi-line data is being read
  31. expectedArgsCount int // Expected number of arguments
  32. msgType byte // Message type
  33. args [][]byte // Argument list
  34. bulkLen int64 // Length of bulk data
  35. }
  36. func (s *currentState) finished() bool {
  37. return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
  38. }
  39. // ParseStream parses data in the input stream
  40. // The parser parses the data in the input stream according to the rules of the Redis protocol
  41. // The parsed results will be sent to the caller through the channel
  42. // Parameters:
  43. //
  44. // reader io.Reader - Input stream reader
  45. //
  46. // Return value:
  47. //
  48. // <-chan *Payload - Parse result channel, each element is a parsed Payload object
  49. func ParseStream(reader io.Reader) <-chan *Payload {
  50. logger.Debug("parse stream...")
  51. ch := make(chan *Payload)
  52. go parse0(reader, ch)
  53. return ch
  54. }
  55. func parse0(reader io.Reader, ch chan<- *Payload) {
  56. defer func() {
  57. if err := recover(); err != nil {
  58. logger.Error(string(debug.Stack()))
  59. }
  60. }()
  61. logger.Debug("parse0 ...")
  62. bufReader := bufio.NewReader(reader)
  63. var state currentState
  64. var err error
  65. var line []byte // A line of data
  66. for { // Infinite loop
  67. var ioErr bool
  68. line, ioErr, err = readLine(bufReader, &state)
  69. if err != nil {
  70. ch <- &Payload{
  71. Err: err,
  72. }
  73. if ioErr {
  74. close(ch)
  75. return
  76. }
  77. state = currentState{} // Reset state
  78. continue
  79. }
  80. // Check if it is multi-line mode
  81. if !state.readingMultiLine {
  82. if line[0] == '*' { // *3/r/n
  83. err = parseMultiBulkHeader(line, &state)
  84. if err != nil {
  85. ch <- &Payload{
  86. Err: errors.New("protocol error: " + string(line)),
  87. }
  88. state = currentState{} // Reset state
  89. continue
  90. }
  91. if state.expectedArgsCount == 0 {
  92. ch <- &Payload{
  93. Data: reply.NewEmptyMultiBulkReply(),
  94. }
  95. state = currentState{} // Reset state
  96. continue
  97. }
  98. } else if line[0] == '$' { // $3/r/n
  99. err = parseBulkHeader(line, &state)
  100. if err != nil {
  101. ch <- &Payload{
  102. Err: errors.New("protocol error: " + string(line)),
  103. }
  104. state = currentState{} // Reset state
  105. continue
  106. }
  107. // $-1\r\n
  108. if state.bulkLen == -1 {
  109. ch <- &Payload{
  110. Data: reply.NewNullBulkReply(),
  111. }
  112. state = currentState{} // Reset state
  113. continue
  114. }
  115. } else {
  116. var result resp.Reply
  117. result, err = parseSingleLineReply(line)
  118. ch <- &Payload{
  119. Data: result,
  120. Err: err,
  121. }
  122. state = currentState{}
  123. continue
  124. }
  125. } else {
  126. // Parse multi-line mode
  127. err = readBody(line, &state)
  128. if err != nil {
  129. ch <- &Payload{
  130. Err: errors.New("protocol error: " + string(line)),
  131. }
  132. state = currentState{}
  133. continue
  134. }
  135. if state.finished() {
  136. var result resp.Reply
  137. switch state.msgType {
  138. case '*':
  139. result = reply.NewMultiBulkReply(state.args)
  140. case '$':
  141. result = reply.NewBulkReply(state.args[0])
  142. }
  143. ch <- &Payload{
  144. Data: result,
  145. }
  146. state = currentState{} // Reset state
  147. }
  148. }
  149. } // End for true
  150. }
  151. // Read a line of data
  152. // The read data may be a complete line or an incomplete line
  153. //
  154. // Return values:
  155. // 1. Data read
  156. // 2. Whether an incomplete line was read
  157. // 3. Error message
  158. // For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  159. // The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
  160. // The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  161. func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
  162. var line []byte
  163. var err error
  164. if state.bulkLen == 0 { // \r\n split
  165. // Read a line of data
  166. line, err = reader.ReadBytes('\n')
  167. logger.Debug("\r\n***readLine: ", string(line))
  168. if err != nil {
  169. return nil, true, err
  170. }
  171. // Data that does not end with \r\n
  172. if len(line) == 0 || line[len(line)-2] != '\r' {
  173. return nil, false, errors.New("readLine-protocol error: " + string(line))
  174. }
  175. } else {
  176. // ELSE previously read $number, strictly read number of bytes
  177. line = make([]byte, state.bulkLen+2)
  178. _, err := io.ReadFull(reader, line)
  179. if err != nil {
  180. return nil, true, err
  181. }
  182. // Data that does not end with \r\n
  183. if len(line) == 0 || line[len(line)-2] != '\r' || line[len(line)-1] != '\n' {
  184. return nil, false, errors.New("readLine-protocol error: " + string(line))
  185. }
  186. // Reset bulkLen
  187. state.bulkLen = 0
  188. }
  189. return line, false, nil
  190. }
  191. // Parse the header information of the string
  192. //
  193. // For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  194. // Parsed as: *3\r\n
  195. func parseMultiBulkHeader(msg []byte, state *currentState) error {
  196. var err error
  197. var expectedLine uint64
  198. expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  199. if err != nil {
  200. return errors.New("protocol error: " + string(msg))
  201. }
  202. if expectedLine == 0 {
  203. state.expectedArgsCount = 0
  204. return nil
  205. } else if expectedLine > 0 {
  206. state.msgType = msg[0]
  207. state.readingMultiLine = true
  208. state.expectedArgsCount = int(expectedLine)
  209. state.args = make([][]byte, 0, expectedLine)
  210. return nil
  211. } else {
  212. return errors.New("protocol error: " + string(msg))
  213. }
  214. }
  215. // Single line string
  216. //
  217. // $4\r\nPING\r\n
  218. func parseBulkHeader(msg []byte, state *currentState) error {
  219. var err error
  220. // $ Start line, read bulkLen
  221. state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  222. if err != nil {
  223. return errors.New("protocol error: " + string(msg))
  224. }
  225. if state.bulkLen == -1 {
  226. return nil
  227. } else if state.bulkLen > 0 {
  228. state.msgType = msg[0] // $
  229. state.readingMultiLine = true
  230. state.expectedArgsCount = 1
  231. state.args = make([][]byte, 0, 1)
  232. return nil
  233. } else {
  234. return errors.New("protocol error: " + string(msg))
  235. }
  236. }
  237. // Parse single line reply
  238. //
  239. // For example:
  240. // +OK\r\n
  241. // -err\r\n
  242. // :1\r\n
  243. func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  244. str := strings.TrimSuffix(string(msg), "\r\n") // Remove \r\n
  245. var result resp.Reply
  246. switch msg[0] {
  247. case '+':
  248. result = reply.NewStatusReply(str[1:])
  249. case '-':
  250. result = reply.NewErrReply(str[1:])
  251. case ':':
  252. val, err := strconv.ParseInt(str[1:], 10, 64)
  253. if err != nil {
  254. return nil, errors.New("protocol error: " + string(msg))
  255. }
  256. result = reply.NewIntReply(val)
  257. default:
  258. return nil, errors.New("protocol error: " + string(msg))
  259. }
  260. return result, nil
  261. }
  262. // readBody parses the message body part in the Redis protocol, handling bulk strings starting with $ or ordinary strings
  263. //
  264. // For example:
  265. // PING\r\n
  266. // SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  267. //
  268. // Parameters:
  269. //
  270. // msg: Complete message byte slice, including \r\n ending
  271. // state: Pointer to current parsing state, used to store parsing results
  272. //
  273. // Return value:
  274. //
  275. // error: Errors that occurred during parsing, such as protocol format errors, etc.
  276. func readBody(msg []byte, state *currentState) error {
  277. line := msg[0 : len(msg)-2] // Remove \r\n
  278. var err error
  279. // Handle bulk string format starting with $
  280. if line[0] == '$' {
  281. state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
  282. if err != nil {
  283. return errors.New("protocol error: " + string(msg))
  284. }
  285. // Handle empty string case, $0\r\n represents empty string
  286. if state.bulkLen <= 0 {
  287. state.args = append(state.args, []byte{})
  288. state.bulkLen = 0
  289. }
  290. } else {
  291. // Handle ordinary strings, directly add to argument list
  292. state.args = append(state.args, line)
  293. }
  294. return nil
  295. }