runningwater il y a 7 mois
commit
19c1e8d18b
12 fichiers modifiés avec 742 ajouts et 0 suppressions
  1. 162 0
      .gitignore
  2. 107 0
      config/config.go
  3. 3 0
      go.mod
  4. 14 0
      interface/tcp/handler.go
  5. 53 0
      lib/logger/files.go
  6. 116 0
      lib/logger/logger.go
  7. 20 0
      lib/sync/atomic/bool.go
  8. 43 0
      lib/sync/wait/wait.go
  9. 52 0
      main.go
  10. 9 0
      redis.conf
  11. 86 0
      tcp/echo.go
  12. 77 0
      tcp/server.go

+ 162 - 0
.gitignore

@@ -0,0 +1,162 @@
+### GoLand+all template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+.dist
+
+# User-specific stuff
+.idea/
+
+# IntelliJ
+out/*
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+
+### IGORPro template
+# Avoid including Experiment files: they can be created and edited locally to test the ipf files
+*.pxp
+*.pxt
+*.uxp
+*.uxt
+
+### VisualStudioCode template
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+!.vscode/*.code-snippets
+
+# Local History for Visual Studio Code
+.history/
+
+# Built Visual Studio Code Extensions
+*.vsix
+
+### Go template
+# If you prefer the allow list template instead of the deny list, see community template:
+# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
+#
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Dependency directories (remove the comment below to include it)
+# vendor/
+
+# Go workspace file
+go.work
+
+### IDAPro template
+# gitginore template for IDA Pro
+# website: https://www.hex-rays.com/index.shtml
+
+# IDA Pro Runtime temporary file
+*.id0
+*.id1
+*.id2
+*.nam
+*.til
+
+# IDA Pro 64 packaged data
+# *.i64
+# IDA Pro 32 packaged data
+# *.idb
+
+### GoLand template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+
+### macOS template
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+### SublimeText template
+# Cache files for Sublime Text
+*.tmlanguage.cache
+*.tmPreferences.cache
+*.stTheme.cache
+
+# Workspace files are user-specific
+*.sublime-workspace
+
+# Project files should be checked into the repository, unless a significant
+# proportion of contributors will probably not be using Sublime Text
+# *.sublime-project
+
+# SFTP configuration file
+sftp-config.json
+sftp-config-alt*.json
+
+# Package control specific files
+Package Control.last-run
+Package Control.ca-list
+Package Control.ca-bundle
+Package Control.system-ca-bundle
+Package Control.cache/
+Package Control.ca-certs/
+Package Control.merged-ca-bundle
+Package Control.user-ca-bundle
+oscrypto-ca-bundle.crt
+bh_unicode_properties.cache
+
+# Sublime-github package stores a github token in this file
+# https://packagecontrol.io/packages/sublime-github
+GitHub.sublime-settings
+
+
+
+logs/
+tmp/

+ 107 - 0
config/config.go

@@ -0,0 +1,107 @@
+package config
+
+import (
+	"bufio"
+
+	"io"
+	"os"
+	"reflect"
+	"strconv"
+	"strings"
+
+	"github.com/runningwater/go-redis/lib/logger"
+)
+
+// ServerProperties defines global config properties
+type ServerProperties struct {
+	Bind           string `cfg:"bind"`
+	Port           int    `cfg:"port"`
+	AppendOnly     bool   `cfg:"appendOnly"`
+	AppendFilename string `cfg:"appendFilename"`
+	MaxClients     int    `cfg:"maxclients"`
+	RequirePass    string `cfg:"requirepass"`
+	Databases      int    `cfg:"databases"`
+
+	Peers []string `cfg:"peers"`
+	Self  string   `cfg:"self"`
+}
+
+// Properties holds global config properties
+var Properties *ServerProperties
+
+func init() {
+	// default config
+	Properties = &ServerProperties{
+		Bind:       "127.0.0.1",
+		Port:       6379,
+		AppendOnly: false,
+	}
+}
+
+func parse(src io.Reader) *ServerProperties {
+	config := &ServerProperties{}
+
+	// read config file
+	rawMap := make(map[string]string)
+	scanner := bufio.NewScanner(src)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if len(line) > 0 && line[0] == '#' {
+			continue
+		}
+		pivot := strings.IndexAny(line, " ")
+		if pivot > 0 && pivot < len(line)-1 { // separator found
+			key := line[0:pivot]
+			value := strings.Trim(line[pivot+1:], " ")
+			rawMap[strings.ToLower(key)] = value
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		logger.Fatal(err)
+	}
+
+	// parse format
+	t := reflect.TypeOf(config)
+	v := reflect.ValueOf(config)
+	n := t.Elem().NumField()
+	for i := 0; i < n; i++ {
+		field := t.Elem().Field(i)
+		fieldVal := v.Elem().Field(i)
+		key, ok := field.Tag.Lookup("cfg")
+		if !ok {
+			key = field.Name
+		}
+		value, ok := rawMap[strings.ToLower(key)]
+		if ok {
+			// fill config
+			switch field.Type.Kind() {
+			case reflect.String:
+				fieldVal.SetString(value)
+			case reflect.Int:
+				intValue, err := strconv.ParseInt(value, 10, 64)
+				if err == nil {
+					fieldVal.SetInt(intValue)
+				}
+			case reflect.Bool:
+				boolValue := "yes" == value
+				fieldVal.SetBool(boolValue)
+			case reflect.Slice:
+				if field.Type.Elem().Kind() == reflect.String {
+					slice := strings.Split(value, ",")
+					fieldVal.Set(reflect.ValueOf(slice))
+				}
+			}
+		}
+	}
+	return config
+}
+
+// SetupConfig read config file and store properties into Properties
+func SetupConfig(configFilename string) {
+	file, err := os.Open(configFilename)
+	if err != nil {
+		panic(err)
+	}
+	defer file.Close()
+	Properties = parse(file)
+}

+ 3 - 0
go.mod

@@ -0,0 +1,3 @@
+module github.com/runningwater/go-redis
+
+go 1.24.2

+ 14 - 0
interface/tcp/handler.go

@@ -0,0 +1,14 @@
+package tcp
+
+import (
+	"context"
+	"net"
+)
+
+// Handler 业务逻辑处理接口
+// 当客户端连接建立时,会调用该接口的Handle方法
+// 当客户端连接断开时,会调用该接口的Close方法
+type Handler interface {
+	Handle(ctx context.Context, conn net.Conn)
+	Close() error
+}

+ 53 - 0
lib/logger/files.go

@@ -0,0 +1,53 @@
+package logger
+
+import (
+	"fmt"
+	"os"
+)
+
+func checkNotExist(src string) bool {
+	_, err := os.Stat(src)
+	return os.IsNotExist(err)
+}
+
+func checkPermission(src string) bool {
+	_, err := os.Stat(src)
+	return os.IsPermission(err)
+}
+
+func isNotExistMkDir(src string) error {
+	if notExist := checkNotExist(src); notExist == true {
+		if err := mkDir(src); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func mkDir(src string) error {
+	err := os.MkdirAll(src, os.ModePerm)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func mustOpen(fileName, dir string) (*os.File, error) {
+	perm := checkPermission(dir)
+	if perm == true {
+		return nil, fmt.Errorf("permission denied dir: %s", dir)
+	}
+
+	err := isNotExistMkDir(dir)
+	if err != nil {
+		return nil, fmt.Errorf("error during make dir %s, err: %s", dir, err)
+	}
+
+	f, err := os.OpenFile(dir+string(os.PathSeparator)+fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
+	if err != nil {
+		return nil, fmt.Errorf("fail to open file, err: %s", err)
+	}
+
+	return f, nil
+}

+ 116 - 0
lib/logger/logger.go

@@ -0,0 +1,116 @@
+package logger
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"runtime"
+	"sync"
+	"time"
+)
+
+// Settings stores config for logger
+type Settings struct {
+	Path       string `yaml:"path"`
+	Name       string `yaml:"name"`
+	Ext        string `yaml:"ext"`
+	TimeFormat string `yaml:"time-format"`
+}
+
+var (
+	logFile            *os.File
+	defaultPrefix      = ""
+	defaultCallerDepth = 2
+	logger             *log.Logger
+	mu                 sync.Mutex
+	logPrefix          = ""
+	levelFlags         = []string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}
+)
+
+type logLevel int
+
+// log levels
+const (
+	DEBUG logLevel = iota
+	INFO
+	WARNING
+	ERROR
+	FATAL
+)
+
+const flags = log.LstdFlags
+
+func init() {
+	logger = log.New(os.Stdout, defaultPrefix, flags)
+}
+
+// Setup initializes logger
+func Setup(settings *Settings) {
+	var err error
+	dir := settings.Path
+	fileName := fmt.Sprintf("%s-%s.%s",
+		settings.Name,
+		time.Now().Format(settings.TimeFormat),
+		settings.Ext)
+
+	logFile, err := mustOpen(fileName, dir)
+	if err != nil {
+		log.Fatalf("logging.Setup err: %s", err)
+	}
+
+	mw := io.MultiWriter(os.Stdout, logFile)
+	logger = log.New(mw, defaultPrefix, flags)
+}
+
+func setPrefix(level logLevel) {
+	_, file, line, ok := runtime.Caller(defaultCallerDepth)
+	if ok {
+		logPrefix = fmt.Sprintf("[%s][%s:%d] ", levelFlags[level], filepath.Base(file), line)
+	} else {
+		logPrefix = fmt.Sprintf("[%s] ", levelFlags[level])
+	}
+
+	logger.SetPrefix(logPrefix)
+}
+
+// Debug prints debug log
+func Debug(v ...any) {
+	mu.Lock()
+	defer mu.Unlock()
+	setPrefix(DEBUG)
+	logger.Println(v...)
+}
+
+// Info prints normal log
+func Info(v ...any) {
+	mu.Lock()
+	defer mu.Unlock()
+	setPrefix(INFO)
+	logger.Println(v...)
+}
+
+// Warn prints warning log
+func Warn(v ...any) {
+	mu.Lock()
+	defer mu.Unlock()
+	setPrefix(WARNING)
+	logger.Println(v...)
+}
+
+// Error prints error log
+func Error(v ...any) {
+	mu.Lock()
+	defer mu.Unlock()
+	setPrefix(ERROR)
+	logger.Println(v...)
+}
+
+// Fatal prints error log then stop the program
+func Fatal(v ...any) {
+	mu.Lock()
+	defer mu.Unlock()
+	setPrefix(FATAL)
+	logger.Fatalln(v...)
+}

+ 20 - 0
lib/sync/atomic/bool.go

@@ -0,0 +1,20 @@
+package atomic
+
+import "sync/atomic"
+
+// Boolean is a boolean value, all actions of it is atomic
+type Boolean uint32
+
+// Get reads the value atomically
+func (b *Boolean) Get() bool {
+	return atomic.LoadUint32((*uint32)(b)) != 0
+}
+
+// Set writes the value atomically
+func (b *Boolean) Set(v bool) {
+	if v {
+		atomic.StoreUint32((*uint32)(b), 1)
+	} else {
+		atomic.StoreUint32((*uint32)(b), 0)
+	}
+}

+ 43 - 0
lib/sync/wait/wait.go

@@ -0,0 +1,43 @@
+package wait
+
+import (
+	"sync"
+	"time"
+)
+
+// Wait is similar with sync.WaitGroup which can wait with timeout
+type Wait struct {
+	wg sync.WaitGroup
+}
+
+// Add adds delta, which may be negative, to the WaitGroup counter.
+func (w *Wait) Add(delta int) {
+	w.wg.Add(delta)
+}
+
+// Done decrements the WaitGroup counter by one
+func (w *Wait) Done() {
+	w.wg.Done()
+}
+
+// Wait blocks until the WaitGroup counter is zero.
+func (w *Wait) Wait() {
+	w.wg.Wait()
+}
+
+// WaitWithTimeout blocks until the WaitGroup counter is zero or timeout
+// returns true if timeout
+func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
+	c := make(chan bool, 1)
+	go func() {
+		defer close(c)
+		w.wg.Wait()
+		c <- true
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}

+ 52 - 0
main.go

@@ -0,0 +1,52 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/5/26 11:33
+// Desc:
+
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/runningwater/go-redis/config"
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/tcp"
+)
+
+const configFile string = "redis.conf"
+
+var defaultProperties = &config.ServerProperties{
+	Bind: "0.0.0.0",
+	Port: 6379,
+}
+
+func fileExists(fileName string) bool {
+	info, err := os.Stat(fileName)
+	return err == nil && !info.IsDir()
+}
+
+func main() {
+	logger.Setup(&logger.Settings{
+		Path:       "logs",
+		Name:       "go-redis",
+		Ext:        "log",
+		TimeFormat: "2006-01-02",
+	})
+	if fileExists(configFile) {
+		config.SetupConfig(configFile)
+	} else {
+		config.Properties = defaultProperties
+	}
+
+	logger.Info("start server")
+
+	err := tcp.ListenAndServeWithSignal(
+		&tcp.Config{
+			Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
+		},
+		tcp.NewEcho(),
+	)
+	if err != nil {
+		logger.Error(err)
+	}
+}

+ 9 - 0
redis.conf

@@ -0,0 +1,9 @@
+bind 0.0.0.0
+protected-mode no
+port 6379
+daemonize yes
+pidfile /var/run/redis_6379.pid
+logfile "/var/log/redis_6379.log"
+dir "/var/lib/redis/6379"
+maxmemory 1gb
+maxmemory-policy allkeys-lru

+ 86 - 0
tcp/echo.go

@@ -0,0 +1,86 @@
+// Author: simon (ynwdlxm@163.com)
+// Date: 2025/5/26 10:52
+// Desc:
+
+package tcp
+
+import (
+	"bufio"
+	"context"
+	"io"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/runningwater/go-redis/lib/logger"
+	"github.com/runningwater/go-redis/lib/sync/atomic"
+	"github.com/runningwater/go-redis/lib/sync/wait"
+)
+
+// EchoClient 客户端信息
+type EchoClient struct {
+	Conn    net.Conn
+	Waiting wait.Wait
+}
+
+func (e *EchoClient) Close() error {
+	e.Waiting.WaitWithTimeout(10 * time.Second)
+	_ = e.Conn.Close()
+	return nil
+}
+
+type Echo struct {
+	activeConn sync.Map
+	closing    atomic.Boolean
+}
+
+func NewEcho() *Echo {
+	return &Echo{
+		activeConn: sync.Map{},
+		closing:    0,
+	}
+}
+
+func (e *Echo) Handle(ctx context.Context, conn net.Conn) {
+	if e.closing.Get() {
+		_ = conn.Close()
+	}
+	// 记录客户端信息
+	client := &EchoClient{
+		Conn: conn,
+	}
+	e.activeConn.Store(client, struct{}{})
+
+	reader := bufio.NewReader(conn)
+	for {
+		msg, err := reader.ReadString('\n')
+		if err != nil {
+			if err == io.EOF {
+				logger.Info("Connection close")
+				e.activeConn.Delete(client)
+			} else {
+				logger.Warn(err)
+			}
+			return
+		}
+
+		client.Waiting.Add(1)
+		b := []byte(msg)
+		_, _ = conn.Write(b)
+		client.Waiting.Done()
+	}
+}
+
+func (e *Echo) Close() error {
+	logger.Info("handler shutting down")
+	e.closing.Set(true)
+
+	// 剔除所有连接
+	e.activeConn.Range(func(key, value any) bool {
+		client := key.(*EchoClient)
+		_ = client.Conn.Close()
+		return true
+	})
+
+	return nil
+}

+ 77 - 0
tcp/server.go

@@ -0,0 +1,77 @@
+package tcp
+
+import (
+	"context"
+	"net"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+
+	"github.com/runningwater/go-redis/interface/tcp"
+	"github.com/runningwater/go-redis/lib/logger"
+)
+
+type Config struct {
+	Address string
+}
+
+func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
+
+	closeChan := make(chan struct{})
+	sigChan := make(chan os.Signal)
+	signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
+	go func() {
+		sig := <-sigChan
+		switch sig {
+		case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
+			closeChan <- struct{}{}
+		}
+	}()
+
+	listener, err := net.Listen("tcp", cfg.Address)
+	if err != nil {
+		return err
+	}
+	logger.Info("start listening")
+
+	return ListenAndServe(listener, handler, closeChan)
+
+}
+
+func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) error {
+
+	go func() {
+		<-closeChan
+		logger.Info("shutting down...")
+		_ = listener.Close()
+		_ = handler.Close()
+	}()
+
+	defer func() {
+		_ = listener.Close()
+		_ = handler.Close()
+	}()
+
+	ctx := context.Background()
+	var waitDone sync.WaitGroup
+
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			break
+		}
+
+		logger.Info("accept link")
+		waitDone.Add(1)
+		go func() {
+			defer func() {
+				waitDone.Done()
+			}()
+			handler.Handle(ctx, conn)
+		}()
+	} // end for
+	waitDone.Wait()
+
+	return nil
+}