diff --git a/app.go b/app.go index 45e6598..6b52d8c 100644 --- a/app.go +++ b/app.go @@ -11,80 +11,100 @@ import ( // Config 定义 Gateway 基础配置 type Config struct { - Redis string // Redis 注册中心/配置中心 URL - Prefix string // Redis Key 的前缀 - Channel string // Redis Pub/Sub 通道名称 + Redis string // Redis 配置中心 URL (与发现服务的注册中心独立,若为空则不启用动态配置) + Prefix string // Redis Key 的前缀 } var GatewayConf = Config{ - Prefix: "gateway", - Channel: "gateway:channel", + Prefix: "gateway", } -// GatewayApp 定义网关应用 +// GatewayApp 定义网关应用,融合了 WebServer 的原生能力 type GatewayApp struct { - rd *redis.Redis - pubsubChannel string - cancelPubSub context.CancelFunc + *service.WebServer + rd *redis.Redis + pubsubChannel string + cancelPubSub context.CancelFunc } // NewGatewayApp 创建 Gateway func NewGatewayApp() *GatewayApp { - g := &GatewayApp{} - g.pubsubChannel = GatewayConf.Channel - return g + return &GatewayApp{ + WebServer: service.NewWebServer(), + } } -// Init 初始化 Gateway,从 Redis 加载并订阅更新 -func (g *GatewayApp) Init() error { +// Start 启动网关服务 (实现 starter.Service) +func (g *GatewayApp) Start(ctx context.Context, logger *log.Logger) error { + if GatewayConf.Prefix == "" { + GatewayConf.Prefix = "gateway" + } + g.pubsubChannel = GatewayConf.Prefix + ":channel" + if GatewayConf.Redis != "" { - g.rd = redis.GetRedis(GatewayConf.Redis, log.DefaultLogger) + g.rd = redis.GetRedis(GatewayConf.Redis, logger) } if g.rd != nil && g.rd.Error != nil { - log.DefaultLogger.Error("gateway redis connection failed", "error", g.rd.Error.Error()) + logger.Error("gateway redis connection failed", "error", g.rd.Error.Error()) g.rd = nil } - // 初始全量加载 - g.loadAll() + // 初始全量加载 Redis 动态配置 + g.loadAll(logger) - // 注册 Reload 钩子 (响应 SIGHUP 或 kill 命令),兜底机制全量加载 - service.OnReload(func() error { - g.loadAll() - return nil - }) - - // 开启 Redis 订阅,处理针对特定 Host 的局部更新 + // 开启 Redis 订阅 if g.rd != nil { - ctx, cancel := context.WithCancel(context.Background()) + subCtx, cancel := context.WithCancel(context.Background()) g.cancelPubSub = cancel - go g.subscribe(ctx) + go g.subscribe(subCtx, logger) } - return nil + // 启动底层的 WebServer,处理所有实际的 HTTP 连接和发现注册 + return g.WebServer.Start(ctx, logger) } -// loadAll 初始化阶段从 Redis 扫描所有网关配置并加载 -func (g *GatewayApp) loadAll() { +// Stop 停止网关服务 (实现 starter.Service) +func (g *GatewayApp) Stop(ctx context.Context) error { + if g.cancelPubSub != nil { + g.cancelPubSub() + } + if g.rd != nil { + g.rd.Unsubscribe(g.pubsubChannel) + } + // 停止底层的 WebServer + return g.WebServer.Stop(ctx) +} + +// Reload 重载网关配置 (实现 starter.Reloader) +func (g *GatewayApp) Reload() error { + // 1. 触发底层 WebServer 的重载 (会重新加载本地 yaml 配置文件中的固定路由策略,并触发 OnReload 钩子) + err := g.WebServer.Reload() + + // 2. 重新全量拉取 Redis 中的动态配置 + logger := log.DefaultLogger + logger.Info("gateway reloading dynamic configurations from redis...") + g.loadAll(logger) + + return err +} + +// loadAll 初始化或 Reload 阶段从 Redis 扫描所有网关配置并加载 +func (g *GatewayApp) loadAll(logger *log.Logger) { if g.rd == nil { return } - log.DefaultLogger.Info("gateway loading full configuration") + logger.Info("gateway loading full configuration") - // 为了简化,由于 Redis KEYS 命令在线上不推荐,更好的方式是 Gateway 启动时只加载本地固定配置, - // 动态配置按需拉取?不,网关启动必须拉取全量。 - // 但如果我们改成以 host 为 Key,就需要维护一个 Set 存放所有的 host,或者直接遍历 KEYS gateway:* - // 为了最佳实践,我们在 redis 维护一个 set: `gateway:hosts` 存放所有的域名 hosts := g.rd.Do("SMEMBERS", GatewayConf.Prefix+":hosts").Strings() for _, host := range hosts { - g.loadHost(host) + g.loadHost(host, logger) } } // loadHost 加载指定 Host 下的所有类型配置 -func (g *GatewayApp) loadHost(host string) { +func (g *GatewayApp) loadHost(host string, logger *log.Logger) { if g.rd == nil || host == "" { return } @@ -116,31 +136,20 @@ func (g *GatewayApp) loadHost(host string) { } service.ReplaceStatics(host, staticConfig) - log.DefaultLogger.Info("gateway host configuration updated via pub/sub", "host", host, + logger.Info("gateway host configuration updated via pub/sub", "host", host, "proxies", len(proxyRules), "rewrites", len(rewriteRules), "statics", len(staticConfig)) } -func (g *GatewayApp) subscribe(ctx context.Context) { - log.DefaultLogger.Info("gateway subscribed to redis channel", "channel", g.pubsubChannel) +func (g *GatewayApp) subscribe(ctx context.Context, logger *log.Logger) { + logger.Info("gateway subscribed to redis channel", "channel", g.pubsubChannel) g.rd.Subscribe(g.pubsubChannel, nil, func(data []byte) { host := string(data) host = strings.TrimSpace(strings.Trim(host, "\"")) // 兼容裸字符串和 JSON 字符串 - + if host != "" { - // 触发对应 Host 的局部刷新 - g.loadHost(host) + g.loadHost(host, logger) } }) - + <-ctx.Done() } - -// Stop 停止应用 -func (g *GatewayApp) Stop() { - if g.cancelPubSub != nil { - g.cancelPubSub() - } - if g.rd != nil { - g.rd.Unsubscribe(g.pubsubChannel) - } -} diff --git a/app_test.go b/app_test.go index b6defd9..8d6bce2 100644 --- a/app_test.go +++ b/app_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "apigo.cc/go/log" "apigo.cc/go/redis" @@ -42,7 +43,6 @@ func TestGateway(t *testing.T) { // 3. 配置 Gateway GatewayConf.Redis = registry GatewayConf.Prefix = "gateway" - GatewayConf.Channel = "gateway:channel" // 初始化网关的配置到 Redis proxyRules := []service.ProxyRule{ @@ -73,13 +73,8 @@ func TestGateway(t *testing.T) { // 4. 启动 Gateway 应用 app := NewGatewayApp() - err := app.Init() - if err != nil { - t.Fatalf("Failed to init gateway: %v", err) - } - defer app.Stop() - - // 启动网关的 HTTP 监听 + + // 设置独立的端口给 Gateway 的 WebServer 避免与 backend 冲突 service.Config.App = "gateway" service.Config.Listen = ":0" // 重置发现,确保网关独立 @@ -89,11 +84,17 @@ func TestGateway(t *testing.T) { service.Config.Calls = map[string]service.CallConfig{ "test-backend": {Timeout: 1000}, } - asGw := service.AsyncStart() - defer asGw.Stop() + + ctx := context.Background() + err := app.Start(ctx, log.DefaultLogger) + if err != nil { + t.Fatalf("Failed to start gateway: %v", err) + } + defer app.Stop(ctx) + time.Sleep(200 * time.Millisecond) - _, gwPort, _ := net.SplitHostPort(asGw.Addr) + _, gwPort, _ := net.SplitHostPort(app.Addr) client := &gohttp.Client{Timeout: 2 * time.Second} diff --git a/fix.sh b/fix.sh deleted file mode 100644 index 6bf4109..0000000 --- a/fix.sh +++ /dev/null @@ -1,4 +0,0 @@ -sed -i '' '/"apigo.cc\/go\/discover"/d' app_test.go -sed -i '' '/"apigo.cc\/go\/starter"/d' app.go -sed -i '' '/"apigo.cc\/go\/timer"/d' app.go -sed -i '' '/"time"/d' app.go diff --git a/main.go b/main.go index d2d20e7..5a5e35c 100644 --- a/main.go +++ b/main.go @@ -3,32 +3,24 @@ package main import ( "apigo.cc/go/config" "apigo.cc/go/log" - "apigo.cc/go/service" "apigo.cc/go/starter" - "fmt" - "os" "time" ) func main() { - // 加载默认配置 + // 1. 加载默认配置,支持覆盖与环境变量注入 _ = config.Load(&GatewayConf, "gateway") - app := NewGatewayApp() - if err := app.Init(); err != nil { - fmt.Printf("Gateway init error: %v\n", err) - os.Exit(1) - } - + // 2. 初始化 Starter 信息 starter.SetAppInfo("gateway", "2.0.0") - // 注册 Gateway 服务核心: service.WebServer - webServer := service.NewWebServer() - starter.Register("gateway-web", webServer, 100, 5*time.Second, 10*time.Second) + // 3. 注册 Gateway 服务 + // GatewayApp 自身实现了 starter.Service 和 starter.Reloader 接口 + app := NewGatewayApp() + starter.Register("gateway-core", app, 100, 5*time.Second, 10*time.Second) - // 运行 + // 4. 运行服务生命周期,响应 start/stop/reload 等命令 starter.Run() - - app.Stop() - log.DefaultLogger.Info("gateway exited") + + log.DefaultLogger.Info("gateway process exited") }