diff --git a/AppClient.go b/AppClient.go index 19f5bec..804520a 100644 --- a/AppClient.go +++ b/AppClient.go @@ -36,8 +36,9 @@ func (ac *AppClient) Next(app string, request *http.Request) *NodeInfo { func (ac *AppClient) CheckApp(app string) bool { nodes := ac.discoverer.GetAppNodes(app) if nodes == nil { + conf := ac.discoverer.GetConfig() if !ac.discoverer.AddExternalApp(app, "") { - ac.logError("app not found", "app", app, "calls", ac.discoverer.Config.Calls) + ac.logError("app not found", "app", app, "calls", conf.Calls) return false } } @@ -62,9 +63,10 @@ func (ac *AppClient) NextWithNode(app, withNode string, request *http.Request) * return allNodes[withNode] } + conf := ac.discoverer.GetConfig() readyNodes := make([]*NodeInfo, 0, len(allNodes)) for _, node := range allNodes { - if ac.excludes[node.Addr] || node.FailedTimes.Load() >= int32(ac.discoverer.Config.CallRetryTimes) { + if ac.excludes[node.Addr] || node.FailedTimes.Load() >= int32(conf.CallRetryTimes) { continue } readyNodes = append(readyNodes, node) diff --git a/Caller.go b/Caller.go index 0d75591..f3d7a70 100644 --- a/Caller.go +++ b/Caller.go @@ -5,7 +5,6 @@ import ( "net/http" "reflect" "strings" - "sync" "time" "github.com/gorilla/websocket" @@ -14,20 +13,17 @@ import ( "apigo.cc/go/log" ) -var appClientPools = make(map[string]*gohttp.Client) -var appClientPoolsLock sync.RWMutex - -func getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client { - appClientPoolsLock.RLock() - c := appClientPools[app] - appClientPoolsLock.RUnlock() +func (d *Discoverer) getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client { + d.appClientPoolsLock.RLock() + c := d.appClientPools[app] + d.appClientPoolsLock.RUnlock() if c != nil { return c } - appClientPoolsLock.Lock() - defer appClientPoolsLock.Unlock() - c = appClientPools[app] + d.appClientPoolsLock.Lock() + defer d.appClientPoolsLock.Unlock() + c = d.appClientPools[app] if c != nil { return c } @@ -37,7 +33,7 @@ func getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client { } else { c = gohttp.NewClient(timeout) } - appClientPools[app] = c + d.appClientPools[app] = c return c } @@ -131,8 +127,9 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat callerHeaders[headers[i-1]] = headers[i] } + conf := c.discoverer.GetConfig() if c.discoverer.isServer { - callerHeaders[HeaderFromApp] = c.discoverer.Config.App + callerHeaders[HeaderFromApp] = conf.App callerHeaders[HeaderFromNode] = c.discoverer.myAddr } @@ -188,7 +185,7 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat scheme = "https" } - hc := getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL) + hc := c.discoverer.getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL) hc.NoBody = c.NoBody var res *gohttp.Result @@ -243,7 +240,7 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat c.logError(errStr, "app", app, "node", node.Addr, "path", path, "attempts", appClient.attempts) appClient.Log(node.Addr, usedTimeMs, fmt.Errorf("%s", errStr)) - if node.FailedTimes.Load() >= int32(c.discoverer.Config.CallRetryTimes) { + if node.FailedTimes.Load() >= int32(conf.CallRetryTimes) { c.discoverer.logError("node isolated locally due to high failures", "app", app, "node", node.Addr) } continue diff --git a/Config.go b/Config.go index aea514e..67bf9b3 100644 --- a/Config.go +++ b/Config.go @@ -1,5 +1,9 @@ package discover +import ( + "sync" +) + // ConfigStruct 存储发现服务的配置 type ConfigStruct struct { Registry string // 注册中心地址,如 redis://:@127.0.0.1:6379/15 @@ -15,3 +19,19 @@ var Config = ConfigStruct{ Weight: 100, CallRetryTimes: 10, } + +var configLock sync.RWMutex + +// SetConfig 安全地设置全局配置 +func SetConfig(conf ConfigStruct) { + configLock.Lock() + defer configLock.Unlock() + Config = conf +} + +// GetConfig 安全地获取全局配置 +func GetConfig() ConfigStruct { + configLock.RLock() + defer configLock.RUnlock() + return Config +} diff --git a/Discover.go b/Discover.go index 8de97de..7d8d60f 100644 --- a/Discover.go +++ b/Discover.go @@ -9,11 +9,13 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "syscall" "time" "apigo.cc/go/cast" "apigo.cc/go/config" + gohttp "apigo.cc/go/http" "apigo.cc/go/id" "apigo.cc/go/log" "apigo.cc/go/redis" @@ -21,14 +23,15 @@ import ( // Discoverer 发现服务实例 type Discoverer struct { - Config ConfigStruct + config ConfigStruct + configLock sync.RWMutex serverRedisPool *redis.Redis clientRedisPool *redis.Redis pubsubRedisPool *redis.Redis isServer bool isClient bool - daemonRunning bool + daemonRunning atomic.Bool myAddr string logger *log.Logger inited bool @@ -39,6 +42,9 @@ type Discoverer struct { appNodes map[string]map[string]*NodeInfo appSubscribed map[string]bool + appClientPools map[string]*gohttp.Client + appClientPoolsLock sync.RWMutex + settedRoute func(*AppClient, *http.Request) settedLoadBalancer LoadBalancer } @@ -56,7 +62,7 @@ var DefaultDiscoverer = NewDiscoverer() // NewDiscoverer 创建一个新的发现服务实例 func NewDiscoverer() *Discoverer { return &Discoverer{ - Config: ConfigStruct{ + config: ConfigStruct{ Weight: 100, CallRetryTimes: 10, }, @@ -64,10 +70,25 @@ func NewDiscoverer() *Discoverer { calls: make(map[string]*callInfoType), appNodes: make(map[string]map[string]*NodeInfo), appSubscribed: make(map[string]bool), + appClientPools: make(map[string]*gohttp.Client), settedLoadBalancer: &DefaultLoadBalancer{}, } } +// GetConfig 安全地获取配置 +func (d *Discoverer) GetConfig() ConfigStruct { + d.configLock.RLock() + defer d.configLock.RUnlock() + return d.config +} + +// SetConfig 安全地设置配置 +func (d *Discoverer) SetConfig(conf ConfigStruct) { + d.configLock.Lock() + defer d.configLock.Unlock() + d.config = conf +} + // IsServer 返回当前节点是否作为服务端运行 func (d *Discoverer) IsServer() bool { return d.isServer } @@ -75,11 +96,13 @@ func (d *Discoverer) IsServer() bool { return d.isServer } func (d *Discoverer) IsClient() bool { return d.isClient } func (d *Discoverer) logError(msg string, extra ...any) { - d.logger.Error("Discover: "+msg, append(extra, "app", d.Config.App, "addr", d.myAddr)...) + conf := d.GetConfig() + d.logger.Error("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...) } func (d *Discoverer) logInfo(msg string, extra ...any) { - d.logger.Info("Discover: "+msg, append(extra, "app", d.Config.App, "addr", d.myAddr)...) + conf := d.GetConfig() + d.logger.Info("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...) } // SetLogger 设置 Discover 使用的全局 Logger @@ -95,21 +118,25 @@ func (d *Discoverer) Init() { return } d.inited = true + + conf := d.GetConfig() // 如果是默认实例,尝试加载配置 if d == DefaultDiscoverer { - _ = config.Load(&d.Config, "discover") - Config = d.Config // 保持 Config 变量同步 + _ = config.Load(&conf, "discover") + d.SetConfig(conf) + SetConfig(conf) // 保持全局 Config 变量同步 } - if d.Config.CallRetryTimes <= 0 { - d.Config.CallRetryTimes = 10 + if conf.CallRetryTimes <= 0 { + conf.CallRetryTimes = 10 } - if d.Config.Weight <= 0 { - d.Config.Weight = 100 + if conf.Weight <= 0 { + conf.Weight = 100 } - if d.Config.Registry == "" { - d.Config.Registry = DefaultRegistry + if conf.Registry == "" { + conf.Registry = DefaultRegistry } + d.SetConfig(conf) if d.logger == log.DefaultLogger || d.logger == nil { d.logger = log.New(id.MakeID(12)) @@ -121,19 +148,20 @@ func (d *Discoverer) Start(addr string) bool { d.Init() d.myAddr = addr - d.isServer = d.Config.App != "" && d.Config.Weight > 0 - if d.isServer && d.Config.Registry != "" { - d.serverRedisPool = redis.GetRedis(d.Config.Registry, d.logger) + conf := d.GetConfig() + d.isServer = conf.App != "" && conf.Weight > 0 + if d.isServer && conf.Registry != "" { + d.serverRedisPool = redis.GetRedis(conf.Registry, d.logger) if d.serverRedisPool.Error != nil { d.logError(d.serverRedisPool.Error.Error()) } // 注册节点 - if d.serverRedisPool.Do("HSET", d.Config.App, addr, d.Config.Weight).Error == nil { - d.serverRedisPool.Do("SETEX", d.Config.App+"_"+addr, 10, "1") + if d.serverRedisPool.Do("HSET", conf.App, addr, conf.Weight).Error == nil { + d.serverRedisPool.Do("SETEX", conf.App+"_"+addr, 10, "1") d.logInfo("registered") - d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", addr, d.Config.Weight)) - d.daemonRunning = true + d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", addr, conf.Weight)) + d.daemonRunning.Store(true) d.daemonStopChan = make(chan bool) go d.daemon() } else { @@ -143,8 +171,8 @@ func (d *Discoverer) Start(addr string) bool { calls := d.getCalls() if len(calls) > 0 { - for app, conf := range calls { - d.addApp(app, conf, false) + for app, c := range calls { + d.addApp(app, c, false) } if !d.startSub() { return false @@ -158,21 +186,22 @@ func (d *Discoverer) daemon() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - for d.daemonRunning { + for d.daemonRunning.Load() { <-ticker.C - if !d.daemonRunning { + if !d.daemonRunning.Load() { break } + conf := d.GetConfig() if d.isServer && d.serverRedisPool != nil { - if !d.serverRedisPool.Do("HEXISTS", d.Config.App, d.myAddr).Bool() { + if !d.serverRedisPool.Do("HEXISTS", conf.App, d.myAddr).Bool() { d.logInfo("lost app registered info, re-registering") - if d.serverRedisPool.Do("HSET", d.Config.App, d.myAddr, d.Config.Weight).Error == nil { - d.serverRedisPool.Do("SETEX", d.Config.App+"_"+d.myAddr, 10, "1") - d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", d.myAddr, d.Config.Weight)) + if d.serverRedisPool.Do("HSET", conf.App, d.myAddr, conf.Weight).Error == nil { + d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1") + d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, conf.Weight)) } } else { - d.serverRedisPool.Do("SETEX", d.Config.App+"_"+d.myAddr, 10, "1") + d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1") } } } @@ -183,17 +212,18 @@ func (d *Discoverer) daemon() { } func (d *Discoverer) startSub() bool { - if d.Config.Registry == "" { + conf := d.GetConfig() + if conf.Registry == "" { return true } d.appLock.Lock() if d.clientRedisPool == nil { - d.clientRedisPool = redis.GetRedis(d.Config.Registry, d.logger) + d.clientRedisPool = redis.GetRedis(conf.Registry, d.logger) } if d.pubsubRedisPool == nil { - d.pubsubRedisPool = redis.GetRedis(d.Config.Registry, d.logger.New(id.MakeID(12))) + d.pubsubRedisPool = redis.GetRedis(conf.Registry, d.logger.New(id.MakeID(12))) // 订阅所有已注册的应用 for app := range d.appSubscribed { d.subscribeAppUnderLock(app) @@ -232,16 +262,25 @@ func (d *Discoverer) Stop() { d.isClient = false } + conf := d.GetConfig() if d.isServer { - d.daemonRunning = false + d.daemonRunning.Store(false) if d.serverRedisPool != nil { - d.serverRedisPool.Do("HDEL", d.Config.App, d.myAddr) - d.serverRedisPool.Do("DEL", d.Config.App+"_"+d.myAddr) - d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", d.myAddr, 0)) + d.serverRedisPool.Do("HDEL", conf.App, d.myAddr) + d.serverRedisPool.Do("DEL", conf.App+"_"+d.myAddr) + d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, 0)) } d.isServer = false } d.appLock.Unlock() + + // 释放 HTTP 连接池 + d.appClientPoolsLock.Lock() + for _, client := range d.appClientPools { + client.Destroy() + } + d.appClientPools = make(map[string]*gohttp.Client) + d.appClientPoolsLock.Unlock() } // Wait 等待守护进程退出 @@ -273,6 +312,7 @@ func (d *Discoverer) EasyStart() (string, int) { _ = ln.Close() port = addrInfo.Port + conf := d.GetConfig() ip := addrInfo.IP if !ip.IsGlobalUnicast() { addrs, _ := net.InterfaceAddrs() @@ -282,7 +322,7 @@ func (d *Discoverer) EasyStart() (string, int) { if ip4 == nil || !ip4.IsGlobalUnicast() { continue } - if d.Config.IpPrefix != "" && strings.HasPrefix(ip4.String(), d.Config.IpPrefix) { + if conf.IpPrefix != "" && strings.HasPrefix(ip4.String(), conf.IpPrefix) { ip = ip4 break } @@ -339,14 +379,16 @@ var numberMatcher = regexp.MustCompile(`^\d+(s|ms|us|µs|ns?)?$`) func (d *Discoverer) addApp(app, callConf string, fetch bool) bool { d.appLock.Lock() - if d.Config.Calls == nil { - d.Config.Calls = make(map[string]string) + conf := d.GetConfig() + if conf.Calls == nil { + conf.Calls = make(map[string]string) } - if d.Config.Calls[app] == callConf && d.appNodes[app] != nil { + if conf.Calls[app] == callConf && d.appNodes[app] != nil { d.appLock.Unlock() return false } - d.Config.Calls[app] = callConf + conf.Calls[app] = callConf + d.SetConfig(conf) callInfo := &callInfoType{ Timeout: 10 * time.Second, @@ -437,10 +479,9 @@ func (d *Discoverer) getAppNodes(app string) map[string]*NodeInfo { } func (d *Discoverer) getCalls() map[string]string { - d.appLock.RLock() - defer d.appLock.RUnlock() + conf := d.GetConfig() calls := make(map[string]string) - for k, v := range d.Config.Calls { + for k, v := range conf.Calls { calls[k] = v } return calls diff --git a/Discover_test.go b/Discover_test.go index ea7f256..773d4b6 100644 --- a/Discover_test.go +++ b/Discover_test.go @@ -46,8 +46,10 @@ func TestDiscover(t *testing.T) { defer server.Close() // 配置 Discover - discover.DefaultDiscoverer.Config.App = "test-app" - discover.DefaultDiscoverer.Config.Registry = "redis://127.0.0.1:6379/15" + conf := discover.DefaultDiscoverer.GetConfig() + conf.App = "test-app" + conf.Registry = "redis://127.0.0.1:6379/15" + discover.DefaultDiscoverer.SetConfig(conf) // 启动 Discover if !discover.Start("127.0.0.1:18001") { diff --git a/MultiInstance_test.go b/MultiInstance_test.go index 1d643c8..3572949 100644 --- a/MultiInstance_test.go +++ b/MultiInstance_test.go @@ -30,8 +30,10 @@ func TestMultipleDiscoverer(t *testing.T) { // 实例 1 d1 := discover.NewDiscoverer() - d1.Config.App = "app1" - d1.Config.Registry = registry + c1conf := d1.GetConfig() + c1conf.App = "app1" + c1conf.Registry = registry + d1.SetConfig(c1conf) if !d1.Start("127.0.0.1:18011") { t.Skip("redis not available") } @@ -39,8 +41,10 @@ func TestMultipleDiscoverer(t *testing.T) { // 实例 2 d2 := discover.NewDiscoverer() - d2.Config.App = "app2" - d2.Config.Registry = registry + c2conf := d2.GetConfig() + c2conf.App = "app2" + c2conf.Registry = registry + d2.SetConfig(c2conf) if !d2.Start("127.0.0.1:18012") { t.Skip("redis not available") }