| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- // Author: simon (ynwdlxm@163.com)
- // Date: 2025/7/11 15:58
- // Desc: 解析器
- //
- // 解析器用于将Redis协议的字节流解析为抽象的响应对象
- package parser
- import (
- "bufio"
- "errors"
- "fmt"
- "io"
- "runtime/debug"
- "strconv"
- "strings"
- "github.com/runningwater/go-redis/interface/resp"
- "github.com/runningwater/go-redis/lib/logger"
- "github.com/runningwater/go-redis/resp/reply"
- )
- // Payload 解析器的载荷
- // 用于存储解析器的状态信息
- type Payload struct {
- Data resp.Reply // 解析后的响应对象, 与返回数据结构一致
- Err error // 解析过程中发生的错误
- }
- func (p *Payload) String() string {
- return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
- }
// currentState tracks the progress of the message currently being parsed.
type currentState struct {
	readingMultiLine  bool     // true while the lines of a multi-line message are being read
	expectedArgsCount int      // number of arguments the current message should contain
	msgType           byte     // type marker of the current message
	args              [][]byte // arguments collected so far
	bulkLen           int64    // length of the bulk payload to read next
}

// finished reports whether a positive number of arguments is expected and
// exactly that many have been collected.
func (s *currentState) finished() bool {
	if s.expectedArgsCount <= 0 {
		return false
	}
	return len(s.args) == s.expectedArgsCount
}
- // ParseStream 解析输入流中的数据
- // 解析器会根据Redis协议的规则解析输入流中的数据
- // 解析后的结果会通过通道发送给调用方
- // 参数:
- //
- // reader io.Reader - 输入流读取器
- //
- // 返回值:
- //
- // <-chan *Payload - 解析结果通道,每个元素都是一个解析后的Payload对象
- func ParseStream(reader io.Reader) <-chan *Payload {
- logger.Debug("parse stream...")
- ch := make(chan *Payload)
- go parse0(reader, ch)
- return ch
- }
- func parse0(reader io.Reader, ch chan<- *Payload) {
- defer func() {
- if err := recover(); err != nil {
- logger.Error(string(debug.Stack()))
- }
- }()
- logger.Debug("parse0 ...")
- bufReader := bufio.NewReader(reader)
- var state currentState
- var err error
- var line []byte // 一行数据
- for { // 死循环
- var ioErr bool
- line, ioErr, err = readLine(bufReader, &state)
- if err != nil {
- if ioErr {
- ch <- &Payload{
- Err: err,
- }
- close(ch)
- return
- }
- ch <- &Payload{
- Err: err,
- }
- state = currentState{} // 重置状态
- continue
- }
- // 判断是不是多行模式
- if !state.readingMultiLine {
- if line[0] == '*' { // *3/r/n
- err = parseMultiBulkHeader(line, &state)
- if err != nil {
- ch <- &Payload{
- Err: errors.New("protocol error: " + string(line)),
- }
- state = currentState{} // 重置状态
- continue
- }
- if state.expectedArgsCount == 0 {
- ch <- &Payload{
- Data: reply.NewEmptyMultiBulkReply(),
- }
- state = currentState{} // 重置状态
- continue
- }
- } else if line[0] == '$' { // $3/r/n
- err = parseBulkHeader(line, &state)
- if err != nil {
- ch <- &Payload{
- Err: errors.New("protocol error: " + string(line)),
- }
- state = currentState{} // 重置状态
- continue
- }
- // $-1\r\n
- if state.bulkLen == -1 {
- ch <- &Payload{
- Data: reply.NewNullBulkReply(),
- }
- state = currentState{} // 重置状态
- continue
- }
- } else {
- var result resp.Reply
- result, err = parseSingleLineReply(line)
- ch <- &Payload{
- Data: result,
- Err: err,
- }
- state = currentState{}
- continue
- }
- } else {
- // 解析多行模式
- err = readBody(line, &state)
- if err != nil {
- ch <- &Payload{
- Err: errors.New("protocol error: " + string(line)),
- }
- state = currentState{}
- continue
- }
- if state.finished() {
- var result resp.Reply
- switch state.msgType {
- case '*':
- result = reply.NewMultiBulkReply(state.args)
- case '$':
- result = reply.NewBulkReply(state.args[0])
- }
- ch <- &Payload{
- Data: result,
- Err: err,
- }
- state = currentState{} // 重置状态
- }
- }
- } // End for true
- }
- // 读取一行数据
- // 读取到的数据可能是完整的一行,也可能是不完整的一行
- //
- // 返回值:
- // 1. 读取到的数据
- // 2. 是否读取到了不完整的一行
- // 3. 错误信息
- // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
- // 读取到的数据可能是: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
- // 读取到的数据可能是: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
- func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
- var msg []byte
- var err error
- if state.bulkLen == 0 { // \r\n 切分
- // 读取一行数据
- reader.Reset(reader)
- msg, err = reader.ReadBytes('\n')
- logger.Info("\r\n***readLine: ", string(msg))
- if err != nil {
- return nil, true, err
- }
- // 不是 \r\n 结尾的数据
- if len(msg) == 0 || msg[len(msg)-2] != '\r' {
- return nil, false, errors.New("readLine-protocol error: " + string(msg))
- }
- } else {
- // ELSE 之前读取到了 $数字,严格读取数字个字节
- msg = make([]byte, state.bulkLen+2)
- _, err := io.ReadFull(reader, msg)
- if err != nil {
- return nil, true, err
- }
- // 不是 \r\n 结尾的数据
- if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
- return nil, false, errors.New("readLine-protocol error: " + string(msg))
- }
- // 重置 bulkLen
- state.bulkLen = 0
- }
- return msg, false, nil
- }
- // 解析串的头部信息
- //
- // 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
- // 解析为: *3\r\n
- func parseMultiBulkHeader(msg []byte, state *currentState) error {
- var err error
- var expectedLine uint64
- expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
- if err != nil {
- return errors.New("protocol error: " + string(msg))
- }
- if expectedLine == 0 {
- state.expectedArgsCount = 0
- return nil
- } else if expectedLine > 0 {
- state.msgType = msg[0]
- state.readingMultiLine = true
- state.expectedArgsCount = int(expectedLine)
- state.args = make([][]byte, 0, expectedLine)
- return nil
- } else {
- return errors.New("protocol error: " + string(msg))
- }
- }
- // 单行字符串
- //
- // $4\r\nPING\r\n
- func parseBulkHeader(msg []byte, state *currentState) error {
- var err error
- // $ 开头的行,读取 bulkLen
- state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
- if err != nil {
- return errors.New("protocol error: " + string(msg))
- }
- if state.bulkLen == -1 {
- return nil
- } else if state.bulkLen > 0 {
- state.msgType = msg[0] // $
- state.readingMultiLine = true
- state.expectedArgsCount = 1
- state.args = make([][]byte, 0, 1)
- return nil
- } else {
- return errors.New("protocol error: " + string(msg))
- }
- }
- // 解析单行回复
- //
- // 例如:
- // +OK\r\n
- // -err\r\n
- // :1\r\n
- func parseSingleLineReply(msg []byte) (resp.Reply, error) {
- str := strings.TrimSuffix(string(msg), "\r\n") // 去除 \r\n
- var result resp.Reply
- switch msg[0] {
- case '+':
- result = reply.NewStatusReply(str[1:])
- case '-':
- result = reply.NewErrReply(str[1:])
- case ':':
- val, err := strconv.ParseInt(str[1:], 10, 64)
- if err != nil {
- return nil, errors.New("protocol error: " + string(msg))
- }
- result = reply.NewIntReply(val)
- default:
- return nil, errors.New("protocol error: " + string(msg))
- }
- return result, nil
- }
- // readBody 解析Redis协议中的消息体部分,处理以$开头的bulk字符串或普通字符串
- //
- // 例如:
- // PING\r\n
- // SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
- //
- // 参数:
- //
- // msg: 完整的消息字节切片,包含\r\n结尾
- // state: 当前解析状态的指针,用于存储解析结果
- //
- // 返回值:
- //
- // error: 解析过程中出现的错误,如协议格式错误等
- func readBody(msg []byte, state *currentState) error {
- line := msg[:len(msg)-2] // 去除 \r\n
- var err error
- // 处理以$开头的bulk字符串格式
- if line[0] == '$' {
- state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
- if err != nil {
- return errors.New("protocol error: " + string(msg))
- }
- // 处理空字符串情况,$0\r\n表示空字符串
- if state.bulkLen <= 0 {
- state.args = append(state.args, []byte{})
- state.bulkLen = 0
- }
- } else {
- // 处理普通字符串,直接添加到参数列表中
- state.args = append(state.args, line)
- }
- return nil
- }
|