|
|
@@ -1,8 +1,8 @@
|
|
|
// Author: simon (ynwdlxm@163.com)
|
|
|
// Date: 2025/7/11 15:58
|
|
|
-// Desc: 解析器
|
|
|
+// Desc: Parser for Redis protocol
|
|
|
//
|
|
|
-// 解析器用于将Redis协议的字节流解析为抽象的响应对象
|
|
|
+// The parser is used to parse the byte stream of the Redis protocol into abstract response objects
|
|
|
|
|
|
package parser
|
|
|
|
|
|
@@ -20,11 +20,11 @@ import (
|
|
|
"github.com/runningwater/go-redis/resp/reply"
|
|
|
)
|
|
|
|
|
|
-// Payload 解析器的载荷
|
|
|
-// 用于存储解析器的状态信息
|
|
|
+// Payload is the payload of the parser
|
|
|
+// Used to store the state information of the parser
|
|
|
type Payload struct {
|
|
|
- Data resp.Reply // 解析后的响应对象, 与返回数据结构一致
|
|
|
- Err error // 解析过程中发生的错误
|
|
|
+ Data resp.Reply // Parsed response object, consistent with the return data structure
|
|
|
+ Err error // Errors that occurred during parsing
|
|
|
}
|
|
|
|
|
|
func (p *Payload) String() string {
|
|
|
@@ -32,27 +32,27 @@ func (p *Payload) String() string {
|
|
|
}
|
|
|
|
|
|
type currentState struct {
|
|
|
- readingMultiLine bool // 是否正在读取多行数据
|
|
|
- expectedArgsCount int // 期望的参数数量
|
|
|
- msgType byte // 消息类型
|
|
|
- args [][]byte // 参数列表
|
|
|
- bulkLen int64 // 批量数据的长度
|
|
|
+ readingMultiLine bool // Whether multi-line data is being read
|
|
|
+ expectedArgsCount int // Expected number of arguments
|
|
|
+ msgType byte // Message type
|
|
|
+ args [][]byte // Argument list
|
|
|
+ bulkLen int64 // Length of bulk data
|
|
|
}
|
|
|
|
|
|
func (s *currentState) finished() bool {
|
|
|
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
|
|
|
}
|
|
|
|
|
|
-// ParseStream 解析输入流中的数据
|
|
|
-// 解析器会根据Redis协议的规则解析输入流中的数据
|
|
|
-// 解析后的结果会通过通道发送给调用方
|
|
|
-// 参数:
|
|
|
+// ParseStream parses data in the input stream
|
|
|
+// The parser parses the data in the input stream according to the rules of the Redis protocol
|
|
|
+// The parsed results will be sent to the caller through the channel
|
|
|
+// Parameters:
|
|
|
//
|
|
|
-// reader io.Reader - 输入流读取器
|
|
|
+// reader io.Reader - Input stream reader
|
|
|
//
|
|
|
-// 返回值:
|
|
|
+// Return value:
|
|
|
//
|
|
|
-// <-chan *Payload - 解析结果通道,每个元素都是一个解析后的Payload对象
|
|
|
+// <-chan *Payload - Parse result channel, each element is a parsed Payload object
|
|
|
func ParseStream(reader io.Reader) <-chan *Payload {
|
|
|
logger.Debug("parse stream...")
|
|
|
ch := make(chan *Payload)
|
|
|
@@ -70,26 +70,23 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
bufReader := bufio.NewReader(reader)
|
|
|
var state currentState
|
|
|
var err error
|
|
|
- var line []byte // 一行数据
|
|
|
- for { // 死循环
|
|
|
+ var line []byte // A line of data
|
|
|
+ for { // Infinite loop
|
|
|
var ioErr bool
|
|
|
line, ioErr, err = readLine(bufReader, &state)
|
|
|
if err != nil {
|
|
|
+ ch <- &Payload{
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
if ioErr {
|
|
|
- ch <- &Payload{
|
|
|
- Err: err,
|
|
|
- }
|
|
|
close(ch)
|
|
|
return
|
|
|
}
|
|
|
- ch <- &Payload{
|
|
|
- Err: err,
|
|
|
- }
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // 判断是不是多行模式
|
|
|
+ // Check if it is multi-line mode
|
|
|
if !state.readingMultiLine {
|
|
|
if line[0] == '*' { // *3/r/n
|
|
|
err = parseMultiBulkHeader(line, &state)
|
|
|
@@ -97,14 +94,14 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
ch <- &Payload{
|
|
|
Err: errors.New("protocol error: " + string(line)),
|
|
|
}
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
continue
|
|
|
}
|
|
|
if state.expectedArgsCount == 0 {
|
|
|
ch <- &Payload{
|
|
|
Data: reply.NewEmptyMultiBulkReply(),
|
|
|
}
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
continue
|
|
|
}
|
|
|
} else if line[0] == '$' { // $3/r/n
|
|
|
@@ -113,7 +110,7 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
ch <- &Payload{
|
|
|
Err: errors.New("protocol error: " + string(line)),
|
|
|
}
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
continue
|
|
|
}
|
|
|
// $-1\r\n
|
|
|
@@ -121,7 +118,7 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
ch <- &Payload{
|
|
|
Data: reply.NewNullBulkReply(),
|
|
|
}
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
continue
|
|
|
}
|
|
|
} else {
|
|
|
@@ -135,7 +132,7 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
continue
|
|
|
}
|
|
|
} else {
|
|
|
- // 解析多行模式
|
|
|
+ // Parse multi-line mode
|
|
|
err = readBody(line, &state)
|
|
|
if err != nil {
|
|
|
ch <- &Payload{
|
|
|
@@ -154,62 +151,60 @@ func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
|
}
|
|
|
ch <- &Payload{
|
|
|
Data: result,
|
|
|
- Err: err,
|
|
|
}
|
|
|
- state = currentState{} // 重置状态
|
|
|
+ state = currentState{} // Reset state
|
|
|
}
|
|
|
}
|
|
|
} // End for true
|
|
|
}
|
|
|
|
|
|
-// 读取一行数据
|
|
|
-// 读取到的数据可能是完整的一行,也可能是不完整的一行
|
|
|
+// Read a line of data
|
|
|
+// The read data may be a complete line or an incomplete line
|
|
|
//
|
|
|
-// 返回值:
|
|
|
-// 1. 读取到的数据
|
|
|
-// 2. 是否读取到了不完整的一行
|
|
|
-// 3. 错误信息
|
|
|
-// 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
-// 读取到的数据可能是: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
|
|
|
-// 读取到的数据可能是: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// Return values:
|
|
|
+// 1. Data read
|
|
|
+// 2. Whether an incomplete line was read
|
|
|
+// 3. Error message
|
|
|
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// The data read may be: *3\r\n, $3\r\nSET\r\n, $3\r\nkey\r\n, $5\r\nvalue\r\n
|
|
|
+// The data read may be: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
func readLine(reader *bufio.Reader, state *currentState) ([]byte, bool, error) {
|
|
|
var msg []byte
|
|
|
var err error
|
|
|
- if state.bulkLen == 0 { // \r\n 切分
|
|
|
- // 读取一行数据
|
|
|
- reader.Reset(reader)
|
|
|
+ if state.bulkLen == 0 { // \r\n split
|
|
|
+ // Read a line of data
|
|
|
msg, err = reader.ReadBytes('\n')
|
|
|
logger.Info("\r\n***readLine: ", string(msg))
|
|
|
if err != nil {
|
|
|
return nil, true, err
|
|
|
}
|
|
|
- // 不是 \r\n 结尾的数据
|
|
|
+ // Data that does not end with \r\n
|
|
|
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
|
|
|
return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
- // ELSE 之前读取到了 $数字,严格读取数字个字节
|
|
|
+ // ELSE previously read $number, strictly read number of bytes
|
|
|
msg = make([]byte, state.bulkLen+2)
|
|
|
_, err := io.ReadFull(reader, msg)
|
|
|
if err != nil {
|
|
|
return nil, true, err
|
|
|
}
|
|
|
- // 不是 \r\n 结尾的数据
|
|
|
+ // Data that does not end with \r\n
|
|
|
if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
|
|
|
return nil, false, errors.New("readLine-protocol error: " + string(msg))
|
|
|
}
|
|
|
|
|
|
- // 重置 bulkLen
|
|
|
+ // Reset bulkLen
|
|
|
state.bulkLen = 0
|
|
|
}
|
|
|
return msg, false, nil
|
|
|
}
|
|
|
|
|
|
-// 解析串的头部信息
|
|
|
+// Parse the header information of the string
|
|
|
//
|
|
|
-// 例如: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
-// 解析为: *3\r\n
|
|
|
+// For example: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// Parsed as: *3\r\n
|
|
|
func parseMultiBulkHeader(msg []byte, state *currentState) error {
|
|
|
var err error
|
|
|
var expectedLine uint64
|
|
|
@@ -231,12 +226,12 @@ func parseMultiBulkHeader(msg []byte, state *currentState) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 单行字符串
|
|
|
+// Single line string
|
|
|
//
|
|
|
-// $4\r\nPING\r\n
|
|
|
+// $4\r\nPING\r\n
|
|
|
func parseBulkHeader(msg []byte, state *currentState) error {
|
|
|
var err error
|
|
|
- // $ 开头的行,读取 bulkLen
|
|
|
+ // $ Start line, read bulkLen
|
|
|
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
|
|
|
if err != nil {
|
|
|
return errors.New("protocol error: " + string(msg))
|
|
|
@@ -255,14 +250,14 @@ func parseBulkHeader(msg []byte, state *currentState) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 解析单行回复
|
|
|
+// Parse single line reply
|
|
|
//
|
|
|
-// 例如:
|
|
|
-// +OK\r\n
|
|
|
-// -err\r\n
|
|
|
-// :1\r\n
|
|
|
+// For example:
|
|
|
+// +OK\r\n
|
|
|
+// -err\r\n
|
|
|
+// :1\r\n
|
|
|
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
|
|
|
- str := strings.TrimSuffix(string(msg), "\r\n") // 去除 \r\n
|
|
|
+ str := strings.TrimSuffix(string(msg), "\r\n") // Remove \r\n
|
|
|
|
|
|
var result resp.Reply
|
|
|
switch msg[0] {
|
|
|
@@ -282,38 +277,38 @@ func parseSingleLineReply(msg []byte) (resp.Reply, error) {
|
|
|
return result, nil
|
|
|
}
|
|
|
|
|
|
-// readBody 解析Redis协议中的消息体部分,处理以$开头的bulk字符串或普通字符串
|
|
|
+// readBody parses the message body part in the Redis protocol, handling bulk strings starting with $ or ordinary strings
|
|
|
//
|
|
|
-// 例如:
|
|
|
-// PING\r\n
|
|
|
-// SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
+// For example:
|
|
|
+// PING\r\n
|
|
|
+// SET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
|
|
//
|
|
|
-// 参数:
|
|
|
+// Parameters:
|
|
|
//
|
|
|
-// msg: 完整的消息字节切片,包含\r\n结尾
|
|
|
-// state: 当前解析状态的指针,用于存储解析结果
|
|
|
+// msg: Complete message byte slice, including \r\n ending
|
|
|
+// state: Pointer to current parsing state, used to store parsing results
|
|
|
//
|
|
|
-// 返回值:
|
|
|
+// Return value:
|
|
|
//
|
|
|
-// error: 解析过程中出现的错误,如协议格式错误等
|
|
|
+// error: Errors that occurred during parsing, such as protocol format errors, etc.
|
|
|
func readBody(msg []byte, state *currentState) error {
|
|
|
- line := msg[:len(msg)-2] // 去除 \r\n
|
|
|
+ line := msg[0 : len(msg)-2] // Remove \r\n
|
|
|
|
|
|
var err error
|
|
|
|
|
|
- // 处理以$开头的bulk字符串格式
|
|
|
+ // Handle bulk string format starting with $
|
|
|
if line[0] == '$' {
|
|
|
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
|
if err != nil {
|
|
|
return errors.New("protocol error: " + string(msg))
|
|
|
}
|
|
|
- // 处理空字符串情况,$0\r\n表示空字符串
|
|
|
+ // Handle empty string case, $0\r\n represents empty string
|
|
|
if state.bulkLen <= 0 {
|
|
|
state.args = append(state.args, []byte{})
|
|
|
state.bulkLen = 0
|
|
|
}
|
|
|
} else {
|
|
|
- // 处理普通字符串,直接添加到参数列表中
|
|
|
+ // Handle ordinary strings, directly add to argument list
|
|
|
state.args = append(state.args, line)
|
|
|
}
|
|
|
return nil
|