refactor: simplify Redis structure to group by Host and support full-host updates via simplified PubSub payload (by AI)
This commit is contained in:
parent
2d25de4a7a
commit
0984b688be
113
app.go
113
app.go
@ -11,80 +11,100 @@ import (
|
|||||||
|
|
||||||
// Config 定义 Gateway 基础配置
|
// Config 定义 Gateway 基础配置
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Redis string // Redis 注册中心/配置中心 URL
|
Redis string // Redis 配置中心 URL (与发现服务的注册中心独立,若为空则不启用动态配置)
|
||||||
Prefix string // Redis Key 的前缀
|
Prefix string // Redis Key 的前缀
|
||||||
Channel string // Redis Pub/Sub 通道名称
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var GatewayConf = Config{
|
var GatewayConf = Config{
|
||||||
Prefix: "gateway",
|
Prefix: "gateway",
|
||||||
Channel: "gateway:channel",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GatewayApp 定义网关应用
|
// GatewayApp 定义网关应用,融合了 WebServer 的原生能力
|
||||||
type GatewayApp struct {
|
type GatewayApp struct {
|
||||||
rd *redis.Redis
|
*service.WebServer
|
||||||
pubsubChannel string
|
rd *redis.Redis
|
||||||
cancelPubSub context.CancelFunc
|
pubsubChannel string
|
||||||
|
cancelPubSub context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGatewayApp 创建 Gateway
|
// NewGatewayApp 创建 Gateway
|
||||||
func NewGatewayApp() *GatewayApp {
|
func NewGatewayApp() *GatewayApp {
|
||||||
g := &GatewayApp{}
|
return &GatewayApp{
|
||||||
g.pubsubChannel = GatewayConf.Channel
|
WebServer: service.NewWebServer(),
|
||||||
return g
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init 初始化 Gateway,从 Redis 加载并订阅更新
|
// Start 启动网关服务 (实现 starter.Service)
|
||||||
func (g *GatewayApp) Init() error {
|
func (g *GatewayApp) Start(ctx context.Context, logger *log.Logger) error {
|
||||||
|
if GatewayConf.Prefix == "" {
|
||||||
|
GatewayConf.Prefix = "gateway"
|
||||||
|
}
|
||||||
|
g.pubsubChannel = GatewayConf.Prefix + ":channel"
|
||||||
|
|
||||||
if GatewayConf.Redis != "" {
|
if GatewayConf.Redis != "" {
|
||||||
g.rd = redis.GetRedis(GatewayConf.Redis, log.DefaultLogger)
|
g.rd = redis.GetRedis(GatewayConf.Redis, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
if g.rd != nil && g.rd.Error != nil {
|
if g.rd != nil && g.rd.Error != nil {
|
||||||
log.DefaultLogger.Error("gateway redis connection failed", "error", g.rd.Error.Error())
|
logger.Error("gateway redis connection failed", "error", g.rd.Error.Error())
|
||||||
g.rd = nil
|
g.rd = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始全量加载
|
// 初始全量加载 Redis 动态配置
|
||||||
g.loadAll()
|
g.loadAll(logger)
|
||||||
|
|
||||||
// 注册 Reload 钩子 (响应 SIGHUP 或 kill 命令),兜底机制全量加载
|
// 开启 Redis 订阅
|
||||||
service.OnReload(func() error {
|
|
||||||
g.loadAll()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// 开启 Redis 订阅,处理针对特定 Host 的局部更新
|
|
||||||
if g.rd != nil {
|
if g.rd != nil {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
subCtx, cancel := context.WithCancel(context.Background())
|
||||||
g.cancelPubSub = cancel
|
g.cancelPubSub = cancel
|
||||||
go g.subscribe(ctx)
|
go g.subscribe(subCtx, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// 启动底层的 WebServer,处理所有实际的 HTTP 连接和发现注册
|
||||||
|
return g.WebServer.Start(ctx, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadAll 初始化阶段从 Redis 扫描所有网关配置并加载
|
// Stop 停止网关服务 (实现 starter.Service)
|
||||||
func (g *GatewayApp) loadAll() {
|
func (g *GatewayApp) Stop(ctx context.Context) error {
|
||||||
|
if g.cancelPubSub != nil {
|
||||||
|
g.cancelPubSub()
|
||||||
|
}
|
||||||
|
if g.rd != nil {
|
||||||
|
g.rd.Unsubscribe(g.pubsubChannel)
|
||||||
|
}
|
||||||
|
// 停止底层的 WebServer
|
||||||
|
return g.WebServer.Stop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reload 重载网关配置 (实现 starter.Reloader)
|
||||||
|
func (g *GatewayApp) Reload() error {
|
||||||
|
// 1. 触发底层 WebServer 的重载 (会重新加载本地 yaml 配置文件中的固定路由策略,并触发 OnReload 钩子)
|
||||||
|
err := g.WebServer.Reload()
|
||||||
|
|
||||||
|
// 2. 重新全量拉取 Redis 中的动态配置
|
||||||
|
logger := log.DefaultLogger
|
||||||
|
logger.Info("gateway reloading dynamic configurations from redis...")
|
||||||
|
g.loadAll(logger)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadAll 初始化或 Reload 阶段从 Redis 扫描所有网关配置并加载
|
||||||
|
func (g *GatewayApp) loadAll(logger *log.Logger) {
|
||||||
if g.rd == nil {
|
if g.rd == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.DefaultLogger.Info("gateway loading full configuration")
|
logger.Info("gateway loading full configuration")
|
||||||
|
|
||||||
// 为了简化,由于 Redis KEYS 命令在线上不推荐,更好的方式是 Gateway 启动时只加载本地固定配置,
|
|
||||||
// 动态配置按需拉取?不,网关启动必须拉取全量。
|
|
||||||
// 但如果我们改成以 host 为 Key,就需要维护一个 Set 存放所有的 host,或者直接遍历 KEYS gateway:*
|
|
||||||
// 为了最佳实践,我们在 redis 维护一个 set: `gateway:hosts` 存放所有的域名
|
|
||||||
hosts := g.rd.Do("SMEMBERS", GatewayConf.Prefix+":hosts").Strings()
|
hosts := g.rd.Do("SMEMBERS", GatewayConf.Prefix+":hosts").Strings()
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
g.loadHost(host)
|
g.loadHost(host, logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadHost 加载指定 Host 下的所有类型配置
|
// loadHost 加载指定 Host 下的所有类型配置
|
||||||
func (g *GatewayApp) loadHost(host string) {
|
func (g *GatewayApp) loadHost(host string, logger *log.Logger) {
|
||||||
if g.rd == nil || host == "" {
|
if g.rd == nil || host == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -116,31 +136,20 @@ func (g *GatewayApp) loadHost(host string) {
|
|||||||
}
|
}
|
||||||
service.ReplaceStatics(host, staticConfig)
|
service.ReplaceStatics(host, staticConfig)
|
||||||
|
|
||||||
log.DefaultLogger.Info("gateway host configuration updated via pub/sub", "host", host,
|
logger.Info("gateway host configuration updated via pub/sub", "host", host,
|
||||||
"proxies", len(proxyRules), "rewrites", len(rewriteRules), "statics", len(staticConfig))
|
"proxies", len(proxyRules), "rewrites", len(rewriteRules), "statics", len(staticConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GatewayApp) subscribe(ctx context.Context) {
|
func (g *GatewayApp) subscribe(ctx context.Context, logger *log.Logger) {
|
||||||
log.DefaultLogger.Info("gateway subscribed to redis channel", "channel", g.pubsubChannel)
|
logger.Info("gateway subscribed to redis channel", "channel", g.pubsubChannel)
|
||||||
g.rd.Subscribe(g.pubsubChannel, nil, func(data []byte) {
|
g.rd.Subscribe(g.pubsubChannel, nil, func(data []byte) {
|
||||||
host := string(data)
|
host := string(data)
|
||||||
host = strings.TrimSpace(strings.Trim(host, "\"")) // 兼容裸字符串和 JSON 字符串
|
host = strings.TrimSpace(strings.Trim(host, "\"")) // 兼容裸字符串和 JSON 字符串
|
||||||
|
|
||||||
if host != "" {
|
if host != "" {
|
||||||
// 触发对应 Host 的局部刷新
|
g.loadHost(host, logger)
|
||||||
g.loadHost(host)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop 停止应用
|
|
||||||
func (g *GatewayApp) Stop() {
|
|
||||||
if g.cancelPubSub != nil {
|
|
||||||
g.cancelPubSub()
|
|
||||||
}
|
|
||||||
if g.rd != nil {
|
|
||||||
g.rd.Unsubscribe(g.pubsubChannel)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
21
app_test.go
21
app_test.go
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"apigo.cc/go/log"
|
"apigo.cc/go/log"
|
||||||
"apigo.cc/go/redis"
|
"apigo.cc/go/redis"
|
||||||
@ -42,7 +43,6 @@ func TestGateway(t *testing.T) {
|
|||||||
// 3. 配置 Gateway
|
// 3. 配置 Gateway
|
||||||
GatewayConf.Redis = registry
|
GatewayConf.Redis = registry
|
||||||
GatewayConf.Prefix = "gateway"
|
GatewayConf.Prefix = "gateway"
|
||||||
GatewayConf.Channel = "gateway:channel"
|
|
||||||
|
|
||||||
// 初始化网关的配置到 Redis
|
// 初始化网关的配置到 Redis
|
||||||
proxyRules := []service.ProxyRule{
|
proxyRules := []service.ProxyRule{
|
||||||
@ -73,13 +73,8 @@ func TestGateway(t *testing.T) {
|
|||||||
|
|
||||||
// 4. 启动 Gateway 应用
|
// 4. 启动 Gateway 应用
|
||||||
app := NewGatewayApp()
|
app := NewGatewayApp()
|
||||||
err := app.Init()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to init gateway: %v", err)
|
|
||||||
}
|
|
||||||
defer app.Stop()
|
|
||||||
|
|
||||||
// 启动网关的 HTTP 监听
|
// 设置独立的端口给 Gateway 的 WebServer 避免与 backend 冲突
|
||||||
service.Config.App = "gateway"
|
service.Config.App = "gateway"
|
||||||
service.Config.Listen = ":0"
|
service.Config.Listen = ":0"
|
||||||
// 重置发现,确保网关独立
|
// 重置发现,确保网关独立
|
||||||
@ -89,11 +84,17 @@ func TestGateway(t *testing.T) {
|
|||||||
service.Config.Calls = map[string]service.CallConfig{
|
service.Config.Calls = map[string]service.CallConfig{
|
||||||
"test-backend": {Timeout: 1000},
|
"test-backend": {Timeout: 1000},
|
||||||
}
|
}
|
||||||
asGw := service.AsyncStart()
|
|
||||||
defer asGw.Stop()
|
ctx := context.Background()
|
||||||
|
err := app.Start(ctx, log.DefaultLogger)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start gateway: %v", err)
|
||||||
|
}
|
||||||
|
defer app.Stop(ctx)
|
||||||
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
_, gwPort, _ := net.SplitHostPort(asGw.Addr)
|
_, gwPort, _ := net.SplitHostPort(app.Addr)
|
||||||
|
|
||||||
client := &gohttp.Client{Timeout: 2 * time.Second}
|
client := &gohttp.Client{Timeout: 2 * time.Second}
|
||||||
|
|
||||||
|
|||||||
4
fix.sh
4
fix.sh
@ -1,4 +0,0 @@
|
|||||||
sed -i '' '/"apigo.cc\/go\/discover"/d' app_test.go
|
|
||||||
sed -i '' '/"apigo.cc\/go\/starter"/d' app.go
|
|
||||||
sed -i '' '/"apigo.cc\/go\/timer"/d' app.go
|
|
||||||
sed -i '' '/"time"/d' app.go
|
|
||||||
24
main.go
24
main.go
@ -3,32 +3,24 @@ package main
|
|||||||
import (
|
import (
|
||||||
"apigo.cc/go/config"
|
"apigo.cc/go/config"
|
||||||
"apigo.cc/go/log"
|
"apigo.cc/go/log"
|
||||||
"apigo.cc/go/service"
|
|
||||||
"apigo.cc/go/starter"
|
"apigo.cc/go/starter"
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// 加载默认配置
|
// 1. 加载默认配置,支持覆盖与环境变量注入
|
||||||
_ = config.Load(&GatewayConf, "gateway")
|
_ = config.Load(&GatewayConf, "gateway")
|
||||||
|
|
||||||
app := NewGatewayApp()
|
// 2. 初始化 Starter 信息
|
||||||
if err := app.Init(); err != nil {
|
|
||||||
fmt.Printf("Gateway init error: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
starter.SetAppInfo("gateway", "2.0.0")
|
starter.SetAppInfo("gateway", "2.0.0")
|
||||||
|
|
||||||
// 注册 Gateway 服务核心: service.WebServer
|
// 3. 注册 Gateway 服务
|
||||||
webServer := service.NewWebServer()
|
// GatewayApp 自身实现了 starter.Service 和 starter.Reloader 接口
|
||||||
starter.Register("gateway-web", webServer, 100, 5*time.Second, 10*time.Second)
|
app := NewGatewayApp()
|
||||||
|
starter.Register("gateway-core", app, 100, 5*time.Second, 10*time.Second)
|
||||||
|
|
||||||
// 运行
|
// 4. 运行服务生命周期,响应 start/stop/reload 等命令
|
||||||
starter.Run()
|
starter.Run()
|
||||||
|
|
||||||
app.Stop()
|
log.DefaultLogger.Info("gateway process exited")
|
||||||
log.DefaultLogger.Info("gateway exited")
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user