|
|
@@ -0,0 +1,320 @@
|
|
|
+// Author: simon (ynwdlxm@163.com)
|
|
|
+// Date: 2025/7/11 15:58
|
|
|
+// Desc: 解析器
|
|
|
+//
|
|
|
+// 解析器用于将Redis协议的字节流解析为抽象的响应对象
|
|
|
+
|
|
|
+package parser
|
|
|
+
|
|
|
+import (
|
|
|
+ "bufio"
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "runtime/debug"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+
|
|
|
+ "github.com/runningwater/go-redis/interface/resp"
|
|
|
+ "github.com/runningwater/go-redis/lib/logger"
|
|
|
+ "github.com/runningwater/go-redis/resp/reply"
|
|
|
+)
|
|
|
+
|
|
|
+// Payload 解析器的载荷
|
|
|
+// 用于存储解析器的状态信息
|
|
|
+type Payload struct {
|
|
|
+ Data resp.Reply // 解析后的响应对象, 与返回数据结构一致
|
|
|
+ Err error // 解析过程中发生的错误
|
|
|
+}
|
|
|
+
|
|
|
+func (p *Payload) String() string {
|
|
|
+ return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
|
|
|
+}
|
|
|
+
|
|
|
+type currentState struct {
|
|
|
+ readingMultiLine bool // 是否正在读取多行数据
|
|
|
+ expectedArgsCount int // 期望的参数数量
|
|
|
+ msgType byte // 消息类型
|
|
|
+ args [][]byte // 参数列表
|
|
|
+ bulkLen int64 // 批量数据的长度
|
|
|
+}
|
|
|
+
|
|
|
+func (s *currentState) finished() bool {
|
|
|
+ return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
|
|
|
+}
|
|
|
+
|
|
|
+// ParseStream 解析输入流中的数据
|
|
|
+// 解析器会根据Redis协议的规则解析输入流中的数据
|
|
|
+// 解析后的结果会通过通道发送给调用方
|
|
|
+// 参数:
|
|
|
+//
|
|
|
+// reader io.Reader - 输入流读取器
|
|
|
+//
|
|
|
+// 返回值:
|
|
|
+//
|
|
|
+// <-chan *Payload - 解析结果通道,每个元素都是一个解析后的Payload对象
|
|
|
+func ParseStream(reader io.Reader) <-chan *Payload {
|
|
|
+ logger.Debug("parse stream...")
|
|
|
+ ch := make(chan *Payload)
|
|
|
+ go parse0(reader, ch)
|
|
|
+ return ch
|
|
|
+}
|
|
|
+
|
|
|
+func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
+ defer func() {
|
|
|
+ if err := recover(); err != nil {
|
|
|
+ logger.Error(string(debug.Stack()))
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ logger.Debug("parse0 ...")
|
|
|
+ bufReader := bufio.NewReader(reader)
|
|
|
+ var state currentState
|
|
|
+ var err error
|
|
|
+ var line []byte // 一行数据
|
|
|
+ for { // 死循环
|
|
|
+ var ioErr bool
|
|
|
+ line, ioErr, err = readLine(bufReader, &state)
|
|
|
+ if err != nil {
|
|
|
+ if ioErr {
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
+ close(ch)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断是不是多行模式
|
|
|
+ if !state.readingMultiLine {
|
|
|
+ if line[0] == '*' { // *3/r/n
|
|
|
+ err = parseMultiBulkHeader(line, &state)
|
|
|
+ if err != nil {
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if state.expectedArgsCount == 0 {
|
|
|
+ ch <- &Payload{
|
|
|
+ Data: reply.NewEmptyMultiBulkReply(),
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else if line[0] == '$' { // $3/r/n
|
|
|
+ err = parseBulkHeader(line, &state)
|
|
|
+ if err != nil {
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // $-1\r\n
|
|
|
+ if state.bulkLen == -1 {
|
|
|
+ ch <- &Payload{
|
|
|
+ Data: reply.NewNullBulkReply(),
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ var result resp.Reply
|
|
|
+ result, err = parseSingleLineReply(line)
|
|
|
+ ch <- &Payload{
|
|
|
+ Data: result,
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
+ state = currentState{}
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 解析多行模式
|
|
|
+ err = readBody(line, &state)
|
|
|
+ if err != nil {
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
+ }
|
|
|
+ state = currentState{}
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if state.finished() {
|
|
|
+ var result resp.Reply
|
|
|
+ switch state.msgType {
|
|
|
+ case '*':
|
|
|
+ result = reply.NewMultiBulkReply(state.args)
|
|
|
+ case '$':
|
|
|
+ result = reply.NewBulkReply(state.args[0])
|
|
|
+ }
|
|
|
+ ch <- &Payload{
|
|
|
+ Data: result,
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
+ state = currentState{} // 重置状态
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } // End for true
|
|
|
+}
|
|
|
+
|
|
|
+// 读取一行数据
|
|
|
+// 读取到的数据可能是完整的一行,也可能是不完整的一行
|
|
|
+//
|
|
|
+// 返回值:
|
|
|
+// 1. 读取到的数据
|
|
|
+// 2. 是否读取到了不完整的一行
|
|
|
+// 3. 错误信息
|
|
|
+// 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// 读取到的数据可能是: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
|
|
|
+// 读取到的数据可能是: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
|
|
|
+ var msg []byte
|
|
|
+ var err error
|
|
|
+ if state.bulkLen == 0 { // \r\n 切分
|
|
|
+ // 读取一行数据
|
|
|
+ reader.Reset(reader)
|
|
|
+ msg, err = reader.ReadBytes('\n')
|
|
|
+ logger.Info("\r\n***readLine: ", string(msg))
|
|
|
+ if err != nil {
|
|
|
+ return nil, true, err
|
|
|
+ }
|
|
|
+ // 不是 \r\n 结尾的数据
|
|
|
+ if len(msg) == 0 || msg[len(msg)-2] != '\r' {
|
|
|
+ return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // ELSE 之前读取到了 $数字,严格读取数字个字节
|
|
|
+ msg = make([]byte, state.bulkLen+2)
|
|
|
+ _, err := io.ReadFull(reader, msg)
|
|
|
+ if err != nil {
|
|
|
+ return nil, true, err
|
|
|
+ }
|
|
|
+ // 不是 \r\n 结尾的数据
|
|
|
+ if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
|
|
|
+ return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+
|
|
|
+ // 重置 bulkLen
|
|
|
+ state.bulkLen = 0
|
|
|
+ }
|
|
|
+ return msg, false, nil
|
|
|
+}
|
|
|
+
|
|
|
+// 解析串的头部信息
|
|
|
+//
|
|
|
+// 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// 解析为: *3\r\n
|
|
|
+func parseMultiBulkHeader(msg []byte, state *currentState) error {
|
|
|
+ var err error
|
|
|
+ var expectedLine uint64
|
|
|
+ expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
|
|
|
+ if err != nil {
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+ if expectedLine == 0 {
|
|
|
+ state.expectedArgsCount = 0
|
|
|
+ return nil
|
|
|
+ } else if expectedLine > 0 {
|
|
|
+ state.msgType = msg[0]
|
|
|
+ state.readingMultiLine = true
|
|
|
+ state.expectedArgsCount = int(expectedLine)
|
|
|
+ state.args = make([][]byte, 0, expectedLine)
|
|
|
+ return nil
|
|
|
+ } else {
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 单行字符串
|
|
|
+//
|
|
|
+// $4\r\nPING\r\n
|
|
|
+func parseBulkHeader(msg []byte, state *currentState) error {
|
|
|
+ var err error
|
|
|
+ // $ 开头的行,读取 bulkLen
|
|
|
+ state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
|
|
|
+ if err != nil {
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+
|
|
|
+ if state.bulkLen == -1 {
|
|
|
+ return nil
|
|
|
+ } else if state.bulkLen > 0 {
|
|
|
+ state.msgType = msg[0] // $
|
|
|
+ state.readingMultiLine = true
|
|
|
+ state.expectedArgsCount = 1
|
|
|
+ state.args = make([][]byte, 0, 1)
|
|
|
+ return nil
|
|
|
+ } else {
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 解析单行回复
|
|
|
+//
|
|
|
+// 例如:
|
|
|
+// +OK\r\n
|
|
|
+// -err\r\n
|
|
|
+// :1\r\n
|
|
|
+func parseSingleLineReply(msg []byte) (resp.Reply, error) {
|
|
|
+ str := strings.TrimSuffix(string(msg), "\r\n") // 去除 \r\n
|
|
|
+
|
|
|
+ var result resp.Reply
|
|
|
+ switch msg[0] {
|
|
|
+ case '+':
|
|
|
+ result = reply.NewStatusReply(str[1:])
|
|
|
+ case '-':
|
|
|
+ result = reply.NewErrReply(str[1:])
|
|
|
+ case ':':
|
|
|
+ val, err := strconv.ParseInt(str[1:], 10, 64)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+ result = reply.NewIntReply(val)
|
|
|
+ default:
|
|
|
+ return nil, errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+ return result, nil
|
|
|
+}
|
|
|
+
|
|
|
+// readBody 解析Redis协议中的消息体部分,处理以$开头的bulk字符串或普通字符串
|
|
|
+//
|
|
|
+// 例如:
|
|
|
+// PING\r\n
|
|
|
+// SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+//
|
|
|
+// 参数:
|
|
|
+//
|
|
|
+// msg: 完整的消息字节切片,包含\r\n结尾
|
|
|
+// state: 当前解析状态的指针,用于存储解析结果
|
|
|
+//
|
|
|
+// 返回值:
|
|
|
+//
|
|
|
+// error: 解析过程中出现的错误,如协议格式错误等
|
|
|
+func readBody(msg []byte, state *currentState) error {
|
|
|
+ line := msg[:len(msg)-2] // 去除 \r\n
|
|
|
+
|
|
|
+ var err error
|
|
|
+
|
|
|
+ // 处理以$开头的bulk字符串格式
|
|
|
+ if line[0] == '$' {
|
|
|
+ state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
|
+ if err != nil {
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
+ }
|
|
|
+ // 处理空字符串情况,$0\r\n表示空字符串
|
|
|
+ if state.bulkLen <= 0 {
|
|
|
+ state.args = append(state.args, []byte{})
|
|
|
+ state.bulkLen = 0
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 处理普通字符串,直接添加到参数列表中
|
|
|
+ state.args = append(state.args, line)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|