parser.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. // Author: simon (ynwdlxm@163.com)
  2. // Date: 2025/7/11 15:58
  3. // Desc: 解析器
  4. //
  5. // 解析器用于将Redis协议的字节流解析为抽象的响应对象
  6. package parser
  7. import (
  8. "bufio"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "runtime/debug"
  13. "strconv"
  14. "strings"
  15. "github.com/runningwater/go-redis/interface/resp"
  16. "github.com/runningwater/go-redis/lib/logger"
  17. "github.com/runningwater/go-redis/resp/reply"
  18. )
  19. // Payload 解析器的载荷
  20. // 用于存储解析器的状态信息
  21. type Payload struct {
  22. Data resp.Reply // 解析后的响应对象, 与返回数据结构一致
  23. Err error // 解析过程中发生的错误
  24. }
  25. func (p *Payload) String() string {
  26. return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
  27. }
  28. type currentState struct {
  29. readingMultiLine bool // 是否正在读取多行数据
  30. expectedArgsCount int // 期望的参数数量
  31. msgType byte // 消息类型
  32. args [][]byte // 参数列表
  33. bulkLen int64 // 批量数据的长度
  34. }
  35. func (s *currentState) finished() bool {
  36. return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
  37. }
  38. // ParseStream 解析输入流中的数据
  39. // 解析器会根据Redis协议的规则解析输入流中的数据
  40. // 解析后的结果会通过通道发送给调用方
  41. // 参数:
  42. //
  43. // reader io.Reader - 输入流读取器
  44. //
  45. // 返回值:
  46. //
  47. // <-chan *Payload - 解析结果通道,每个元素都是一个解析后的Payload对象
  48. func ParseStream(reader io.Reader) <-chan *Payload {
  49. logger.Debug("parse stream...")
  50. ch := make(chan *Payload)
  51. go parse0(reader, ch)
  52. return ch
  53. }
  54. func parse0(reader io.Reader, ch chan<- *Payload) {
  55. defer func() {
  56. if err := recover(); err != nil {
  57. logger.Error(string(debug.Stack()))
  58. }
  59. }()
  60. logger.Debug("parse0 ...")
  61. bufReader := bufio.NewReader(reader)
  62. var state currentState
  63. var err error
  64. var line []byte // 一行数据
  65. for { // 死循环
  66. var ioErr bool
  67. line, ioErr, err = readLine(bufReader, &state)
  68. if err != nil {
  69. if ioErr {
  70. ch <- &Payload{
  71. Err: err,
  72. }
  73. close(ch)
  74. return
  75. }
  76. ch <- &Payload{
  77. Err: err,
  78. }
  79. state = currentState{} // 重置状态
  80. continue
  81. }
  82. // 判断是不是多行模式
  83. if !state.readingMultiLine {
  84. if line[0] == '*' { // *3/r/n
  85. err = parseMultiBulkHeader(line, &state)
  86. if err != nil {
  87. ch <- &Payload{
  88. Err: errors.New("protocol error: " + string(line)),
  89. }
  90. state = currentState{} // 重置状态
  91. continue
  92. }
  93. if state.expectedArgsCount == 0 {
  94. ch <- &Payload{
  95. Data: reply.NewEmptyMultiBulkReply(),
  96. }
  97. state = currentState{} // 重置状态
  98. continue
  99. }
  100. } else if line[0] == '$' { // $3/r/n
  101. err = parseBulkHeader(line, &state)
  102. if err != nil {
  103. ch <- &Payload{
  104. Err: errors.New("protocol error: " + string(line)),
  105. }
  106. state = currentState{} // 重置状态
  107. continue
  108. }
  109. // $-1\r\n
  110. if state.bulkLen == -1 {
  111. ch <- &Payload{
  112. Data: reply.NewNullBulkReply(),
  113. }
  114. state = currentState{} // 重置状态
  115. continue
  116. }
  117. } else {
  118. var result resp.Reply
  119. result, err = parseSingleLineReply(line)
  120. ch <- &Payload{
  121. Data: result,
  122. Err: err,
  123. }
  124. state = currentState{}
  125. continue
  126. }
  127. } else {
  128. // 解析多行模式
  129. err = readBody(line, &state)
  130. if err != nil {
  131. ch <- &Payload{
  132. Err: errors.New("protocol error: " + string(line)),
  133. }
  134. state = currentState{}
  135. continue
  136. }
  137. if state.finished() {
  138. var result resp.Reply
  139. switch state.msgType {
  140. case '*':
  141. result = reply.NewMultiBulkReply(state.args)
  142. case '$':
  143. result = reply.NewBulkReply(state.args[0])
  144. }
  145. ch <- &Payload{
  146. Data: result,
  147. Err: err,
  148. }
  149. state = currentState{} // 重置状态
  150. }
  151. }
  152. } // End for true
  153. }
  154. // 读取一行数据
  155. // 读取到的数据可能是完整的一行,也可能是不完整的一行
  156. //
  157. // 返回值:
  158. // 1. 读取到的数据
  159. // 2. 是否读取到了不完整的一行
  160. // 3. 错误信息
  161. // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  162. // 读取到的数据可能是: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
  163. // 读取到的数据可能是: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  164. func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
  165. var msg []byte
  166. var err error
  167. if state.bulkLen == 0 { // \r\n 切分
  168. // 读取一行数据
  169. reader.Reset(reader)
  170. msg, err = reader.ReadBytes('\n')
  171. logger.Info("\r\n***readLine: ", string(msg))
  172. if err != nil {
  173. return nil, true, err
  174. }
  175. // 不是 \r\n 结尾的数据
  176. if len(msg) == 0 || msg[len(msg)-2] != '\r' {
  177. return nil, false, errors.New("readLine-protocol error: " + string(msg))
  178. }
  179. } else {
  180. // ELSE 之前读取到了 $数字,严格读取数字个字节
  181. msg = make([]byte, state.bulkLen+2)
  182. _, err := io.ReadFull(reader, msg)
  183. if err != nil {
  184. return nil, true, err
  185. }
  186. // 不是 \r\n 结尾的数据
  187. if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
  188. return nil, false, errors.New("readLine-protocol error: " + string(msg))
  189. }
  190. // 重置 bulkLen
  191. state.bulkLen = 0
  192. }
  193. return msg, false, nil
  194. }
  195. // 解析串的头部信息
  196. //
  197. // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  198. // 解析为: *3\r\n
  199. func parseMultiBulkHeader(msg []byte, state *currentState) error {
  200. var err error
  201. var expectedLine uint64
  202. expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  203. if err != nil {
  204. return errors.New("protocol error: " + string(msg))
  205. }
  206. if expectedLine == 0 {
  207. state.expectedArgsCount = 0
  208. return nil
  209. } else if expectedLine > 0 {
  210. state.msgType = msg[0]
  211. state.readingMultiLine = true
  212. state.expectedArgsCount = int(expectedLine)
  213. state.args = make([][]byte, 0, expectedLine)
  214. return nil
  215. } else {
  216. return errors.New("protocol error: " + string(msg))
  217. }
  218. }
  219. // 单行字符串
  220. //
  221. // $4\r\nPING\r\n
  222. func parseBulkHeader(msg []byte, state *currentState) error {
  223. var err error
  224. // $ 开头的行,读取 bulkLen
  225. state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  226. if err != nil {
  227. return errors.New("protocol error: " + string(msg))
  228. }
  229. if state.bulkLen == -1 {
  230. return nil
  231. } else if state.bulkLen > 0 {
  232. state.msgType = msg[0] // $
  233. state.readingMultiLine = true
  234. state.expectedArgsCount = 1
  235. state.args = make([][]byte, 0, 1)
  236. return nil
  237. } else {
  238. return errors.New("protocol error: " + string(msg))
  239. }
  240. }
  241. // 解析单行回复
  242. //
  243. // 例如:
  244. // +OK\r\n
  245. // -err\r\n
  246. // :1\r\n
  247. func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  248. str := strings.TrimSuffix(string(msg), "\r\n") // 去除 \r\n
  249. var result resp.Reply
  250. switch msg[0] {
  251. case '+':
  252. result = reply.NewStatusReply(str[1:])
  253. case '-':
  254. result = reply.NewErrReply(str[1:])
  255. case ':':
  256. val, err := strconv.ParseInt(str[1:], 10, 64)
  257. if err != nil {
  258. return nil, errors.New("protocol error: " + string(msg))
  259. }
  260. result = reply.NewIntReply(val)
  261. default:
  262. return nil, errors.New("protocol error: " + string(msg))
  263. }
  264. return result, nil
  265. }
  266. // readBody 解析Redis协议中的消息体部分,处理以$开头的bulk字符串或普通字符串
  267. //
  268. // 例如:
  269. // PING\r\n
  270. // SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
  271. //
  272. // 参数:
  273. //
  274. // msg: 完整的消息字节切片,包含\r\n结尾
  275. // state: 当前解析状态的指针,用于存储解析结果
  276. //
  277. // 返回值:
  278. //
  279. // error: 解析过程中出现的错误,如协议格式错误等
  280. func readBody(msg []byte, state *currentState) error {
  281. line := msg[:len(msg)-2] // 去除 \r\n
  282. var err error
  283. // 处理以$开头的bulk字符串格式
  284. if line[0] == '$' {
  285. state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
  286. if err != nil {
  287. return errors.New("protocol error: " + string(msg))
  288. }
  289. // 处理空字符串情况,$0\r\n表示空字符串
  290. if state.bulkLen <= 0 {
  291. state.args = append(state.args, []byte{})
  292. state.bulkLen = 0
  293. }
  294. } else {
  295. // 处理普通字符串,直接添加到参数列表中
  296. state.args = append(state.args, line)
  297. }
  298. return nil
  299. }