// Author: simon (ynwdlxm@163.com) // Date: 2025/7/11 15:58 // Desc: 解析器 // // 解析器用于将Redis协议的字节流解析为抽象的响应对象 package parser import ( "bufio" "errors" "fmt" "io" "runtime/debug" "strconv" "strings" "github.com/runningwater/go-redis/interface/resp" "github.com/runningwater/go-redis/lib/logger" "github.com/runningwater/go-redis/resp/reply" ) // Payload 解析器的载荷 // 用于存储解析器的状态信息 type Payload struct { Data resp.Reply // 解析后的响应对象, 与返回数据结构一致 Err error // 解析过程中发生的错误 } func (p *Payload) String() string { return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err) } type currentState struct { readingMultiLine bool // 是否正在读取多行数据 expectedArgsCount int // 期望的参数数量 msgType byte // 消息类型 args [][]byte // 参数列表 bulkLen int64 // 批量数据的长度 } func (s *currentState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } // ParseStream 解析输入流中的数据 // 解析器会根据Redis协议的规则解析输入流中的数据 // 解析后的结果会通过通道发送给调用方 // 参数: // // reader io.Reader - 输入流读取器 // // 返回值: // // <-chan *Payload - 解析结果通道,每个元素都是一个解析后的Payload对象 func ParseStream(reader io.Reader) <-chan *Payload { logger.Debug("parse stream...") ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() logger.Debug("parse0 ...") bufReader := bufio.NewReader(reader) var state currentState var err error var line []byte // 一行数据 for { // 死循环 var ioErr bool line, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = currentState{} // 重置状态 continue } // 判断是不是多行模式 if !state.readingMultiLine { if line[0] == '*' { // *3/r/n err = parseMultiBulkHeader(line, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(line)), } state = currentState{} // 重置状态 continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: reply.NewEmptyMultiBulkReply(), } state = currentState{} // 重置状态 continue } } else if line[0] == '$' { // $3/r/n err = parseBulkHeader(line, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(line)), } state = currentState{} // 重置状态 continue } // $-1\r\n if state.bulkLen == -1 { ch <- &Payload{ Data: reply.NewNullBulkReply(), } state = currentState{} // 重置状态 continue } } else { var result resp.Reply result, err = parseSingleLineReply(line) ch <- &Payload{ Data: result, Err: err, } state = currentState{} continue } } else { // 解析多行模式 err = readBody(line, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(line)), } state = currentState{} continue } if state.finished() { var result resp.Reply switch state.msgType { case '*': result = reply.NewMultiBulkReply(state.args) case '$': result = reply.NewBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = currentState{} // 重置状态 } } } // End for true } // 读取一行数据 // 读取到的数据可能是完整的一行,也可能是不完整的一行 // // 返回值: // 1. 读取到的数据 // 2. 是否读取到了不完整的一行 // 3. 错误信息 // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n // 读取到的数据可能是: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n // 读取到的数据可能是: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { // \r\n 切分 // 读取一行数据 reader.Reset(reader) msg, err = reader.ReadBytes('\n') logger.Info("\r\n***readLine: ", string(msg)) if err != nil { return nil, true, err } // 不是 \r\n 结尾的数据 if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("readLine-protocol error: " + string(msg)) } } else { // ELSE 之前读取到了 $数字,严格读取数字个字节 msg = make([]byte, state.bulkLen+2) _, err := io.ReadFull(reader, msg) if err != nil { return nil, true, err } // 不是 \r\n 结尾的数据 if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("readLine-protocol error: " + string(msg)) } // 重置 bulkLen state.bulkLen = 0 } return msg, false, nil } // 解析串的头部信息 // // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n // 解析为: *3\r\n func parseMultiBulkHeader(msg []byte, state *currentState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } // 单行字符串 // // $4\r\nPING\r\n func parseBulkHeader(msg []byte, state *currentState) error { var err error // $ 开头的行,读取 bulkLen state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { return nil } else if state.bulkLen > 0 { state.msgType = msg[0] // $ state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } } // 解析单行回复 // // 例如: // +OK\r\n // -err\r\n // :1\r\n func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") // 去除 \r\n var result resp.Reply switch msg[0] { case '+': result = reply.NewStatusReply(str[1:]) case '-': result = reply.NewErrReply(str[1:]) case ':': val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.NewIntReply(val) default: return nil, errors.New("protocol error: " + string(msg)) } return result, nil } // readBody 解析Redis协议中的消息体部分,处理以$开头的bulk字符串或普通字符串 // // 例如: // PING\r\n // SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n // // 参数: // // msg: 完整的消息字节切片,包含\r\n结尾 // state: 当前解析状态的指针,用于存储解析结果 // // 返回值: // // error: 解析过程中出现的错误,如协议格式错误等 func readBody(msg []byte, state *currentState) error { line := msg[:len(msg)-2] // 去除 \r\n var err error // 处理以$开头的bulk字符串格式 if line[0] == '$' { state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } // 处理空字符串情况,$0\r\n表示空字符串 if state.bulkLen <= 0 { state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { // 处理普通字符串,直接添加到参数列表中 state.args = append(state.args, line) } return nil }