From 2d25de4a7a94e0b9765d0031b6cb0cb4011b72e7 Mon Sep 17 00:00:00 2001 From: AI Engineer Date: Tue, 12 May 2026 23:53:22 +0800 Subject: [PATCH] refactor: simplify Redis structure to group by Host and support full-host updates via simplified PubSub payload (by AI) --- app.go | 119 ++++++++++++++++++++-------------------------------- app_test.go | 15 ++++--- go.mod | 6 +-- 3 files changed, 55 insertions(+), 85 deletions(-) diff --git a/app.go b/app.go index e7b11bf..45e6598 100644 --- a/app.go +++ b/app.go @@ -6,15 +6,9 @@ import ( "apigo.cc/go/redis" "apigo.cc/go/service" "context" + "strings" ) -// EventMessage 定义从 Redis 接收到的更新事件结构 -type EventMessage struct { - Action string `json:"action"` // "update" - Type string `json:"type"` // "proxy", "rewrite", "static" - Host string `json:"host"` -} - // Config 定义 Gateway 基础配置 type Config struct { Redis string // Redis 注册中心/配置中心 URL @@ -30,9 +24,6 @@ var GatewayConf = Config{ // GatewayApp 定义网关应用 type GatewayApp struct { rd *redis.Redis - proxiesKey string - rewritesKey string - staticsKey string pubsubChannel string cancelPubSub context.CancelFunc } @@ -40,9 +31,6 @@ type GatewayApp struct { // NewGatewayApp 创建 Gateway func NewGatewayApp() *GatewayApp { g := &GatewayApp{} - g.proxiesKey = GatewayConf.Prefix + ":proxies" - g.rewritesKey = GatewayConf.Prefix + ":rewrites" - g.staticsKey = GatewayConf.Prefix + ":statics" g.pubsubChannel = GatewayConf.Channel return g } @@ -77,7 +65,7 @@ func (g *GatewayApp) Init() error { return nil } -// loadAll 初始化阶段从 Redis 的 Hash 表拉取全量配置并解析 +// loadAll 初始化阶段从 Redis 扫描所有网关配置并加载 func (g *GatewayApp) loadAll() { if g.rd == nil { return @@ -85,79 +73,62 @@ func (g *GatewayApp) loadAll() { log.DefaultLogger.Info("gateway loading full configuration") - // 1. Proxies - proxiesHash := g.rd.Do("HGETALL", g.proxiesKey).StringMap() - for host, jsonStr := range proxiesHash { - var rules []service.ProxyRule - _ = cast.UnmarshalJSON([]byte(jsonStr), &rules) - service.ReplaceProxies(host, rules) - } - - // 2. Rewrites - rewritesHash := g.rd.Do("HGETALL", g.rewritesKey).StringMap() - for host, jsonStr := range rewritesHash { - var rules []service.RewriteRule - _ = cast.UnmarshalJSON([]byte(jsonStr), &rules) - service.ReplaceRewrites(host, rules) - } - - // 3. Statics - staticsHash := g.rd.Do("HGETALL", g.staticsKey).StringMap() - for host, jsonStr := range staticsHash { - var config map[string]string - _ = cast.UnmarshalJSON([]byte(jsonStr), &config) - if config != nil { - service.ReplaceStatics(host, config) - } + // 为了简化,由于 Redis KEYS 命令在线上不推荐,更好的方式是 Gateway 启动时只加载本地固定配置, + // 动态配置按需拉取?不,网关启动必须拉取全量。 + // 但如果我们改成以 host 为 Key,就需要维护一个 Set 存放所有的 host,或者直接遍历 KEYS gateway:* + // 为了最佳实践,我们在 redis 维护一个 set: `gateway:hosts` 存放所有的域名 + hosts := g.rd.Do("SMEMBERS", GatewayConf.Prefix+":hosts").Strings() + for _, host := range hosts { + g.loadHost(host) } } -// loadHost 仅加载指定 Host 下的指定类型配置 -func (g *GatewayApp) loadHost(typ string, host string) { - if g.rd == nil { +// loadHost 加载指定 Host 下的所有类型配置 +func (g *GatewayApp) loadHost(host string) { + if g.rd == nil || host == "" { return } - switch typ { - case "proxy": - jsonStr := g.rd.Do("HGET", g.proxiesKey, host).String() - var rules []service.ProxyRule - if jsonStr != "" { - _ = cast.UnmarshalJSON([]byte(jsonStr), &rules) - } - service.ReplaceProxies(host, rules) - log.DefaultLogger.Info("gateway proxy updated via pub/sub", "host", host, "count", len(rules)) - case "rewrite": - jsonStr := g.rd.Do("HGET", g.rewritesKey, host).String() - var rules []service.RewriteRule - if jsonStr != "" { - _ = cast.UnmarshalJSON([]byte(jsonStr), &rules) - } - service.ReplaceRewrites(host, rules) - log.DefaultLogger.Info("gateway rewrite updated via pub/sub", "host", host, "count", len(rules)) - case "static": - jsonStr := g.rd.Do("HGET", g.staticsKey, host).String() - var config map[string]string - if jsonStr != "" { - _ = cast.UnmarshalJSON([]byte(jsonStr), &config) - } - if config == nil { - config = make(map[string]string) - } - service.ReplaceStatics(host, config) - log.DefaultLogger.Info("gateway static updated via pub/sub", "host", host, "count", len(config)) + hashKey := GatewayConf.Prefix + ":host:" + host + hashMap := g.rd.Do("HGETALL", hashKey).StringMap() + + // 1. Proxies + var proxyRules []service.ProxyRule + if jsonStr := hashMap["proxies"]; jsonStr != "" { + _ = cast.UnmarshalJSON([]byte(jsonStr), &proxyRules) } + service.ReplaceProxies(host, proxyRules) + + // 2. Rewrites + var rewriteRules []service.RewriteRule + if jsonStr := hashMap["rewrites"]; jsonStr != "" { + _ = cast.UnmarshalJSON([]byte(jsonStr), &rewriteRules) + } + service.ReplaceRewrites(host, rewriteRules) + + // 3. Statics + var staticConfig map[string]string + if jsonStr := hashMap["statics"]; jsonStr != "" { + _ = cast.UnmarshalJSON([]byte(jsonStr), &staticConfig) + } + if staticConfig == nil { + staticConfig = make(map[string]string) + } + service.ReplaceStatics(host, staticConfig) + + log.DefaultLogger.Info("gateway host configuration updated via pub/sub", "host", host, + "proxies", len(proxyRules), "rewrites", len(rewriteRules), "statics", len(staticConfig)) } func (g *GatewayApp) subscribe(ctx context.Context) { log.DefaultLogger.Info("gateway subscribed to redis channel", "channel", g.pubsubChannel) g.rd.Subscribe(g.pubsubChannel, nil, func(data []byte) { - var msg EventMessage - if err := cast.UnmarshalJSON(data, &msg); err == nil && msg.Action == "update" { + host := string(data) + host = strings.TrimSpace(strings.Trim(host, "\"")) // 兼容裸字符串和 JSON 字符串 + + if host != "" { // 触发对应 Host 的局部刷新 - g.loadHost(msg.Type, msg.Host) - } else { - log.DefaultLogger.Warning("gateway received invalid pub/sub message", "data", string(data)) + g.loadHost(host) } }) diff --git a/app_test.go b/app_test.go index 20a1ebf..b6defd9 100644 --- a/app_test.go +++ b/app_test.go @@ -24,7 +24,7 @@ func TestGateway(t *testing.T) { } // 清理测试数据 - rd.Do("DEL", "gateway:proxies", "gateway:rewrites", "gateway:statics") + rd.Do("DEL", "gateway:host:gw.test", "gateway:hosts") // 2. 启动一个后端测试服务 test-backend service.Config.App = "test-backend" @@ -56,8 +56,9 @@ func TestGateway(t *testing.T) { proxyJson, _ := json.Marshal(proxyRules) rewriteJson, _ := json.Marshal(rewriteRules) - rd.Do("HSET", "gateway:proxies", "gw.test", string(proxyJson)) - rd.Do("HSET", "gateway:rewrites", "gw.test", string(rewriteJson)) + rd.Do("SADD", "gateway:hosts", "gw.test") + rd.Do("HSET", "gateway:host:gw.test", "proxies", string(proxyJson)) + rd.Do("HSET", "gateway:host:gw.test", "rewrites", string(rewriteJson)) // 创建临时静态目录 tmpDir, _ := os.MkdirTemp("", "gateway-static") @@ -68,7 +69,7 @@ func TestGateway(t *testing.T) { "/ui": tmpDir, } staticJson, _ := json.Marshal(staticConfig) - rd.Do("HSET", "gateway:statics", "gw.test", string(staticJson)) + rd.Do("HSET", "gateway:host:gw.test", "statics", string(staticJson)) // 4. 启动 Gateway 应用 app := NewGatewayApp() @@ -168,12 +169,10 @@ func TestGateway(t *testing.T) { {Path: "/new-direct", ToApp: "test-backend", ToPath: "/hello"}, } newProxyJson, _ := json.Marshal(newProxyRules) - rd.Do("HSET", "gateway:proxies", "gw.test", string(newProxyJson)) + rd.Do("HSET", "gateway:host:gw.test", "proxies", string(newProxyJson)) // 发布更新消息 - msg := EventMessage{Action: "update", Type: "proxy", Host: "gw.test"} - msgJson, _ := json.Marshal(msg) - rd.Do("PUBLISH", "gateway:channel", string(msgJson)) + rd.Do("PUBLISH", "gateway:channel", `"gw.test"`) // 等待接收和更新 time.Sleep(300 * time.Millisecond) diff --git a/go.mod b/go.mod index 8755b28..5e9ba23 100644 --- a/go.mod +++ b/go.mod @@ -5,23 +5,23 @@ go 1.25.0 require ( apigo.cc/go/cast v1.3.1 apigo.cc/go/config v1.3.0 - apigo.cc/go/discover v1.3.0 - apigo.cc/go/http v1.3.0 apigo.cc/go/log v1.3.1 apigo.cc/go/redis v1.3.0 apigo.cc/go/service v1.3.1 apigo.cc/go/starter v1.0.1 - apigo.cc/go/timer v1.3.0 ) require ( apigo.cc/go/crypto v1.3.0 // indirect + apigo.cc/go/discover v1.3.0 // indirect apigo.cc/go/encoding v1.3.0 // indirect apigo.cc/go/file v1.3.0 // indirect + apigo.cc/go/http v1.3.0 // indirect apigo.cc/go/id v1.3.0 // indirect apigo.cc/go/rand v1.3.0 // indirect apigo.cc/go/safe v1.3.0 // indirect apigo.cc/go/shell v1.3.0 // indirect + apigo.cc/go/timer v1.3.0 // indirect github.com/gomodule/redigo v1.9.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect golang.org/x/crypto v0.51.0 // indirect