|
@@ -0,0 +1,315 @@
|
|
|
|
|
+// Author: simon (ynwdlxm@163.com)
|
|
|
|
|
+// Date: 2025/7/11 15:58
|
|
|
|
|
+// Desc: Parser for Redis protocol
|
|
|
|
|
+//
|
|
|
|
|
+// The parser is used to parse the byte stream of the Redis protocol into abstract response objects
|
|
|
|
|
+
|
|
|
|
|
+package parser
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "bufio"
|
|
|
|
|
+ "errors"
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+ "io"
|
|
|
|
|
+ "runtime/debug"
|
|
|
|
|
+ "strconv"
|
|
|
|
|
+ "strings"
|
|
|
|
|
+
|
|
|
|
|
+ "github.com/runningwater/go-redis/interface/resp"
|
|
|
|
|
+ "github.com/runningwater/go-redis/lib/logger"
|
|
|
|
|
+ "github.com/runningwater/go-redis/resp/reply"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+// Payload is the payload of the parser
|
|
|
|
|
+// Used to store the state information of the parser
|
|
|
|
|
+type Payload struct {
|
|
|
|
|
+ Data resp.Reply // Parsed response object, consistent with the return data structure
|
|
|
|
|
+ Err error // Errors that occurred during parsing
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (p *Payload) String() string {
|
|
|
|
|
+ return fmt.Sprintf("payload: %s, err: %v", p.Data, p.Err)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+type currentState struct {
|
|
|
|
|
+ readingMultiLine bool // Whether multi-line data is being read
|
|
|
|
|
+ expectedArgsCount int // Expected number of arguments
|
|
|
|
|
+ msgType byte // Message type
|
|
|
|
|
+ args [][]byte // Argument list
|
|
|
|
|
+ bulkLen int64 // Length of bulk data
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *currentState) finished() bool {
|
|
|
|
|
+ return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// ParseStream parses data in the input stream
|
|
|
|
|
+// The parser parses the data in the input stream according to the rules of the Redis protocol
|
|
|
|
|
+// The parsed results will be sent to the caller through the channel
|
|
|
|
|
+// Parameters:
|
|
|
|
|
+//
|
|
|
|
|
+// reader io.Reader - Input stream reader
|
|
|
|
|
+//
|
|
|
|
|
+// Return value:
|
|
|
|
|
+//
|
|
|
|
|
+// <-chan *Payload - Parse result channel, each element is a parsed Payload object
|
|
|
|
|
+func ParseStream(reader io.Reader) <-chan *Payload {
|
|
|
|
|
+ logger.Debug("parse stream...")
|
|
|
|
|
+ ch := make(chan *Payload)
|
|
|
|
|
+ go parse0(reader, ch)
|
|
|
|
|
+ return ch
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ if err := recover(); err != nil {
|
|
|
|
|
+ logger.Error(string(debug.Stack()))
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+ logger.Debug("parse0 ...")
|
|
|
|
|
+ bufReader := bufio.NewReader(reader)
|
|
|
|
|
+ var state currentState
|
|
|
|
|
+ var err error
|
|
|
|
|
+ var line []byte // A line of data
|
|
|
|
|
+ for { // Infinite loop
|
|
|
|
|
+ var ioErr bool
|
|
|
|
|
+ line, ioErr, err = readLine(bufReader, &state)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Err: err,
|
|
|
|
|
+ }
|
|
|
|
|
+ if ioErr {
|
|
|
|
|
+ close(ch)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Check if it is multi-line mode
|
|
|
|
|
+ if !state.readingMultiLine {
|
|
|
|
|
+ if line[0] == '*' { // *3/r/n
|
|
|
|
|
+ err = parseMultiBulkHeader(line, &state)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if state.expectedArgsCount == 0 {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Data: reply.NewEmptyMultiBulkReply(),
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ } else if line[0] == '$' { // $3/r/n
|
|
|
|
|
+ err = parseBulkHeader(line, &state)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ // $-1\r\n
|
|
|
|
|
+ if state.bulkLen == -1 {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Data: reply.NewNullBulkReply(),
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ var result resp.Reply
|
|
|
|
|
+ result, err = parseSingleLineReply(line)
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Data: result,
|
|
|
|
|
+ Err: err,
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{}
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // Parse multi-line mode
|
|
|
|
|
+ err = readBody(line, &state)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Err: errors.New("protocol error: " + string(line)),
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{}
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if state.finished() {
|
|
|
|
|
+ var result resp.Reply
|
|
|
|
|
+ switch state.msgType {
|
|
|
|
|
+ case '*':
|
|
|
|
|
+ result = reply.NewMultiBulkReply(state.args)
|
|
|
|
|
+ case '$':
|
|
|
|
|
+ result = reply.NewBulkReply(state.args[0])
|
|
|
|
|
+ }
|
|
|
|
|
+ ch <- &Payload{
|
|
|
|
|
+ Data: result,
|
|
|
|
|
+ }
|
|
|
|
|
+ state = currentState{} // Reset state
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } // End for true
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Read a line of data
|
|
|
|
|
+// The read data may be a complete line or an incomplete line
|
|
|
|
|
+//
|
|
|
|
|
+// Return values:
|
|
|
|
|
+// 1. Data read
|
|
|
|
|
+// 2. Whether an incomplete line was read
|
|
|
|
|
+// 3. Error message
|
|
|
|
|
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
|
|
+// The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
|
|
|
|
|
+// The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
|
|
+func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
|
|
|
|
|
+ var msg []byte
|
|
|
|
|
+ var err error
|
|
|
|
|
+ if state.bulkLen == 0 { // \r\n split
|
|
|
|
|
+ // Read a line of data
|
|
|
|
|
+ msg, err = reader.ReadBytes('\n')
|
|
|
|
|
+ logger.Info("\r\n***readLine: ", string(msg))
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, true, err
|
|
|
|
|
+ }
|
|
|
|
|
+ // Data that does not end with \r\n
|
|
|
|
|
+ if len(msg) == 0 || msg[len(msg)-2] != '\r' {
|
|
|
|
|
+ return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // ELSE previously read $number, strictly read number of bytes
|
|
|
|
|
+ msg = make([]byte, state.bulkLen+2)
|
|
|
|
|
+ _, err := io.ReadFull(reader, msg)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, true, err
|
|
|
|
|
+ }
|
|
|
|
|
+ // Data that does not end with \r\n
|
|
|
|
|
+ if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
|
|
|
|
|
+ return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Reset bulkLen
|
|
|
|
|
+ state.bulkLen = 0
|
|
|
|
|
+ }
|
|
|
|
|
+ return msg, false, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Parse the header information of the string
|
|
|
|
|
+//
|
|
|
|
|
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
|
|
+// Parsed as: *3\r\n
|
|
|
|
|
+func parseMultiBulkHeader(msg []byte, state *currentState) error {
|
|
|
|
|
+ var err error
|
|
|
|
|
+ var expectedLine uint64
|
|
|
|
|
+ expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+ if expectedLine == 0 {
|
|
|
|
|
+ state.expectedArgsCount = 0
|
|
|
|
|
+ return nil
|
|
|
|
|
+ } else if expectedLine > 0 {
|
|
|
|
|
+ state.msgType = msg[0]
|
|
|
|
|
+ state.readingMultiLine = true
|
|
|
|
|
+ state.expectedArgsCount = int(expectedLine)
|
|
|
|
|
+ state.args = make([][]byte, 0, expectedLine)
|
|
|
|
|
+ return nil
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Single line string
|
|
|
|
|
+//
|
|
|
|
|
+// $4\r\nPING\r\n
|
|
|
|
|
+func parseBulkHeader(msg []byte, state *currentState) error {
|
|
|
|
|
+ var err error
|
|
|
|
|
+ // $ Start line, read bulkLen
|
|
|
|
|
+ state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if state.bulkLen == -1 {
|
|
|
|
|
+ return nil
|
|
|
|
|
+ } else if state.bulkLen > 0 {
|
|
|
|
|
+ state.msgType = msg[0] // $
|
|
|
|
|
+ state.readingMultiLine = true
|
|
|
|
|
+ state.expectedArgsCount = 1
|
|
|
|
|
+ state.args = make([][]byte, 0, 1)
|
|
|
|
|
+ return nil
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Parse single line reply
|
|
|
|
|
+//
|
|
|
|
|
+// For example:
|
|
|
|
|
+// +OK\r\n
|
|
|
|
|
+// -err\r\n
|
|
|
|
|
+// :1\r\n
|
|
|
|
|
+func parseSingleLineReply(msg []byte) (resp.Reply, error) {
|
|
|
|
|
+ str := strings.TrimSuffix(string(msg), "\r\n") // Remove \r\n
|
|
|
|
|
+
|
|
|
|
|
+ var result resp.Reply
|
|
|
|
|
+ switch msg[0] {
|
|
|
|
|
+ case '+':
|
|
|
|
|
+ result = reply.NewStatusReply(str[1:])
|
|
|
|
|
+ case '-':
|
|
|
|
|
+ result = reply.NewErrReply(str[1:])
|
|
|
|
|
+ case ':':
|
|
|
|
|
+ val, err := strconv.ParseInt(str[1:], 10, 64)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+ result = reply.NewIntReply(val)
|
|
|
|
|
+ default:
|
|
|
|
|
+ return nil, errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+ return result, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// readBody parses the message body part in the Redis protocol, handling bulk strings starting with $ or ordinary strings
|
|
|
|
|
+//
|
|
|
|
|
+// For example:
|
|
|
|
|
+// PING\r\n
|
|
|
|
|
+// SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
|
|
+//
|
|
|
|
|
+// Parameters:
|
|
|
|
|
+//
|
|
|
|
|
+// msg: Complete message byte slice, including \r\n ending
|
|
|
|
|
+// state: Pointer to current parsing state, used to store parsing results
|
|
|
|
|
+//
|
|
|
|
|
+// Return value:
|
|
|
|
|
+//
|
|
|
|
|
+// error: Errors that occurred during parsing, such as protocol format errors, etc.
|
|
|
|
|
+func readBody(msg []byte, state *currentState) error {
|
|
|
|
|
+ line := msg[0 : len(msg)-2] // Remove \r\n
|
|
|
|
|
+
|
|
|
|
|
+ var err error
|
|
|
|
|
+
|
|
|
|
|
+ // Handle bulk string format starting with $
|
|
|
|
|
+ if line[0] == '$' {
|
|
|
|
|
+ state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return errors.New("protocol error: " + string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+ // Handle empty string case, $0\r\n represents empty string
|
|
|
|
|
+ if state.bulkLen <= 0 {
|
|
|
|
|
+ state.args = append(state.args, []byte{})
|
|
|
|
|
+ state.bulkLen = 0
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // Handle ordinary strings, directly add to argument list
|
|
|
|
|
+ state.args = append(state.args, line)
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|