优化:实现 HTTP 连接池实例隔离,增强配置并发安全,改进资源回收逻辑(by AI)

This commit is contained in:
AI Engineer 2026-05-05 14:52:32 +08:00
parent 4b47f31c80
commit 5eefc06bba
6 changed files with 133 additions and 67 deletions

View File

@ -36,8 +36,9 @@ func (ac *AppClient) Next(app string, request *http.Request) *NodeInfo {
func (ac *AppClient) CheckApp(app string) bool {
nodes := ac.discoverer.GetAppNodes(app)
if nodes == nil {
conf := ac.discoverer.GetConfig()
if !ac.discoverer.AddExternalApp(app, "") {
ac.logError("app not found", "app", app, "calls", ac.discoverer.Config.Calls)
ac.logError("app not found", "app", app, "calls", conf.Calls)
return false
}
}
@ -62,9 +63,10 @@ func (ac *AppClient) NextWithNode(app, withNode string, request *http.Request) *
return allNodes[withNode]
}
conf := ac.discoverer.GetConfig()
readyNodes := make([]*NodeInfo, 0, len(allNodes))
for _, node := range allNodes {
if ac.excludes[node.Addr] || node.FailedTimes.Load() >= int32(ac.discoverer.Config.CallRetryTimes) {
if ac.excludes[node.Addr] || node.FailedTimes.Load() >= int32(conf.CallRetryTimes) {
continue
}
readyNodes = append(readyNodes, node)

View File

@ -5,7 +5,6 @@ import (
"net/http"
"reflect"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
@ -14,20 +13,17 @@ import (
"apigo.cc/go/log"
)
var appClientPools = make(map[string]*gohttp.Client)
var appClientPoolsLock sync.RWMutex
func getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client {
appClientPoolsLock.RLock()
c := appClientPools[app]
appClientPoolsLock.RUnlock()
func (d *Discoverer) getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client {
d.appClientPoolsLock.RLock()
c := d.appClientPools[app]
d.appClientPoolsLock.RUnlock()
if c != nil {
return c
}
appClientPoolsLock.Lock()
defer appClientPoolsLock.Unlock()
c = appClientPools[app]
d.appClientPoolsLock.Lock()
defer d.appClientPoolsLock.Unlock()
c = d.appClientPools[app]
if c != nil {
return c
}
@ -37,7 +33,7 @@ func getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client {
} else {
c = gohttp.NewClient(timeout)
}
appClientPools[app] = c
d.appClientPools[app] = c
return c
}
@ -131,8 +127,9 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
callerHeaders[headers[i-1]] = headers[i]
}
conf := c.discoverer.GetConfig()
if c.discoverer.isServer {
callerHeaders[HeaderFromApp] = c.discoverer.Config.App
callerHeaders[HeaderFromApp] = conf.App
callerHeaders[HeaderFromNode] = c.discoverer.myAddr
}
@ -188,7 +185,7 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
scheme = "https"
}
hc := getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL)
hc := c.discoverer.getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL)
hc.NoBody = c.NoBody
var res *gohttp.Result
@ -243,7 +240,7 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
c.logError(errStr, "app", app, "node", node.Addr, "path", path, "attempts", appClient.attempts)
appClient.Log(node.Addr, usedTimeMs, fmt.Errorf("%s", errStr))
if node.FailedTimes.Load() >= int32(c.discoverer.Config.CallRetryTimes) {
if node.FailedTimes.Load() >= int32(conf.CallRetryTimes) {
c.discoverer.logError("node isolated locally due to high failures", "app", app, "node", node.Addr)
}
continue

View File

@ -1,5 +1,9 @@
package discover
import (
"sync"
)
// ConfigStruct 存储发现服务的配置
type ConfigStruct struct {
Registry string // 注册中心地址,如 redis://:@127.0.0.1:6379/15
@ -15,3 +19,19 @@ var Config = ConfigStruct{
Weight: 100,
CallRetryTimes: 10,
}
var configLock sync.RWMutex
// SetConfig 安全地设置全局配置
func SetConfig(conf ConfigStruct) {
configLock.Lock()
defer configLock.Unlock()
Config = conf
}
// GetConfig 安全地获取全局配置
func GetConfig() ConfigStruct {
configLock.RLock()
defer configLock.RUnlock()
return Config
}

View File

@ -9,11 +9,13 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"apigo.cc/go/cast"
"apigo.cc/go/config"
gohttp "apigo.cc/go/http"
"apigo.cc/go/id"
"apigo.cc/go/log"
"apigo.cc/go/redis"
@ -21,14 +23,15 @@ import (
// Discoverer 发现服务实例
type Discoverer struct {
Config ConfigStruct
config ConfigStruct
configLock sync.RWMutex
serverRedisPool *redis.Redis
clientRedisPool *redis.Redis
pubsubRedisPool *redis.Redis
isServer bool
isClient bool
daemonRunning bool
daemonRunning atomic.Bool
myAddr string
logger *log.Logger
inited bool
@ -39,6 +42,9 @@ type Discoverer struct {
appNodes map[string]map[string]*NodeInfo
appSubscribed map[string]bool
appClientPools map[string]*gohttp.Client
appClientPoolsLock sync.RWMutex
settedRoute func(*AppClient, *http.Request)
settedLoadBalancer LoadBalancer
}
@ -56,7 +62,7 @@ var DefaultDiscoverer = NewDiscoverer()
// NewDiscoverer 创建一个新的发现服务实例
func NewDiscoverer() *Discoverer {
return &Discoverer{
Config: ConfigStruct{
config: ConfigStruct{
Weight: 100,
CallRetryTimes: 10,
},
@ -64,10 +70,25 @@ func NewDiscoverer() *Discoverer {
calls: make(map[string]*callInfoType),
appNodes: make(map[string]map[string]*NodeInfo),
appSubscribed: make(map[string]bool),
appClientPools: make(map[string]*gohttp.Client),
settedLoadBalancer: &DefaultLoadBalancer{},
}
}
// GetConfig 安全地获取配置
func (d *Discoverer) GetConfig() ConfigStruct {
d.configLock.RLock()
defer d.configLock.RUnlock()
return d.config
}
// SetConfig 安全地设置配置
func (d *Discoverer) SetConfig(conf ConfigStruct) {
d.configLock.Lock()
defer d.configLock.Unlock()
d.config = conf
}
// IsServer 返回当前节点是否作为服务端运行
func (d *Discoverer) IsServer() bool { return d.isServer }
@ -75,11 +96,13 @@ func (d *Discoverer) IsServer() bool { return d.isServer }
func (d *Discoverer) IsClient() bool { return d.isClient }
func (d *Discoverer) logError(msg string, extra ...any) {
d.logger.Error("Discover: "+msg, append(extra, "app", d.Config.App, "addr", d.myAddr)...)
conf := d.GetConfig()
d.logger.Error("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...)
}
func (d *Discoverer) logInfo(msg string, extra ...any) {
d.logger.Info("Discover: "+msg, append(extra, "app", d.Config.App, "addr", d.myAddr)...)
conf := d.GetConfig()
d.logger.Info("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...)
}
// SetLogger 设置 Discover 使用的全局 Logger
@ -95,21 +118,25 @@ func (d *Discoverer) Init() {
return
}
d.inited = true
conf := d.GetConfig()
// 如果是默认实例,尝试加载配置
if d == DefaultDiscoverer {
_ = config.Load(&d.Config, "discover")
Config = d.Config // 保持 Config 变量同步
_ = config.Load(&conf, "discover")
d.SetConfig(conf)
SetConfig(conf) // 保持全局 Config 变量同步
}
if d.Config.CallRetryTimes <= 0 {
d.Config.CallRetryTimes = 10
if conf.CallRetryTimes <= 0 {
conf.CallRetryTimes = 10
}
if d.Config.Weight <= 0 {
d.Config.Weight = 100
if conf.Weight <= 0 {
conf.Weight = 100
}
if d.Config.Registry == "" {
d.Config.Registry = DefaultRegistry
if conf.Registry == "" {
conf.Registry = DefaultRegistry
}
d.SetConfig(conf)
if d.logger == log.DefaultLogger || d.logger == nil {
d.logger = log.New(id.MakeID(12))
@ -121,19 +148,20 @@ func (d *Discoverer) Start(addr string) bool {
d.Init()
d.myAddr = addr
d.isServer = d.Config.App != "" && d.Config.Weight > 0
if d.isServer && d.Config.Registry != "" {
d.serverRedisPool = redis.GetRedis(d.Config.Registry, d.logger)
conf := d.GetConfig()
d.isServer = conf.App != "" && conf.Weight > 0
if d.isServer && conf.Registry != "" {
d.serverRedisPool = redis.GetRedis(conf.Registry, d.logger)
if d.serverRedisPool.Error != nil {
d.logError(d.serverRedisPool.Error.Error())
}
// 注册节点
if d.serverRedisPool.Do("HSET", d.Config.App, addr, d.Config.Weight).Error == nil {
d.serverRedisPool.Do("SETEX", d.Config.App+"_"+addr, 10, "1")
if d.serverRedisPool.Do("HSET", conf.App, addr, conf.Weight).Error == nil {
d.serverRedisPool.Do("SETEX", conf.App+"_"+addr, 10, "1")
d.logInfo("registered")
d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", addr, d.Config.Weight))
d.daemonRunning = true
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", addr, conf.Weight))
d.daemonRunning.Store(true)
d.daemonStopChan = make(chan bool)
go d.daemon()
} else {
@ -143,8 +171,8 @@ func (d *Discoverer) Start(addr string) bool {
calls := d.getCalls()
if len(calls) > 0 {
for app, conf := range calls {
d.addApp(app, conf, false)
for app, c := range calls {
d.addApp(app, c, false)
}
if !d.startSub() {
return false
@ -158,21 +186,22 @@ func (d *Discoverer) daemon() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for d.daemonRunning {
for d.daemonRunning.Load() {
<-ticker.C
if !d.daemonRunning {
if !d.daemonRunning.Load() {
break
}
conf := d.GetConfig()
if d.isServer && d.serverRedisPool != nil {
if !d.serverRedisPool.Do("HEXISTS", d.Config.App, d.myAddr).Bool() {
if !d.serverRedisPool.Do("HEXISTS", conf.App, d.myAddr).Bool() {
d.logInfo("lost app registered info, re-registering")
if d.serverRedisPool.Do("HSET", d.Config.App, d.myAddr, d.Config.Weight).Error == nil {
d.serverRedisPool.Do("SETEX", d.Config.App+"_"+d.myAddr, 10, "1")
d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", d.myAddr, d.Config.Weight))
if d.serverRedisPool.Do("HSET", conf.App, d.myAddr, conf.Weight).Error == nil {
d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1")
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, conf.Weight))
}
} else {
d.serverRedisPool.Do("SETEX", d.Config.App+"_"+d.myAddr, 10, "1")
d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1")
}
}
}
@ -183,17 +212,18 @@ func (d *Discoverer) daemon() {
}
func (d *Discoverer) startSub() bool {
if d.Config.Registry == "" {
conf := d.GetConfig()
if conf.Registry == "" {
return true
}
d.appLock.Lock()
if d.clientRedisPool == nil {
d.clientRedisPool = redis.GetRedis(d.Config.Registry, d.logger)
d.clientRedisPool = redis.GetRedis(conf.Registry, d.logger)
}
if d.pubsubRedisPool == nil {
d.pubsubRedisPool = redis.GetRedis(d.Config.Registry, d.logger.New(id.MakeID(12)))
d.pubsubRedisPool = redis.GetRedis(conf.Registry, d.logger.New(id.MakeID(12)))
// 订阅所有已注册的应用
for app := range d.appSubscribed {
d.subscribeAppUnderLock(app)
@ -232,16 +262,25 @@ func (d *Discoverer) Stop() {
d.isClient = false
}
conf := d.GetConfig()
if d.isServer {
d.daemonRunning = false
d.daemonRunning.Store(false)
if d.serverRedisPool != nil {
d.serverRedisPool.Do("HDEL", d.Config.App, d.myAddr)
d.serverRedisPool.Do("DEL", d.Config.App+"_"+d.myAddr)
d.serverRedisPool.PUBLISH("CH_"+d.Config.App, fmt.Sprintf("%s %d", d.myAddr, 0))
d.serverRedisPool.Do("HDEL", conf.App, d.myAddr)
d.serverRedisPool.Do("DEL", conf.App+"_"+d.myAddr)
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, 0))
}
d.isServer = false
}
d.appLock.Unlock()
// 释放 HTTP 连接池
d.appClientPoolsLock.Lock()
for _, client := range d.appClientPools {
client.Destroy()
}
d.appClientPools = make(map[string]*gohttp.Client)
d.appClientPoolsLock.Unlock()
}
// Wait 等待守护进程退出
@ -273,6 +312,7 @@ func (d *Discoverer) EasyStart() (string, int) {
_ = ln.Close()
port = addrInfo.Port
conf := d.GetConfig()
ip := addrInfo.IP
if !ip.IsGlobalUnicast() {
addrs, _ := net.InterfaceAddrs()
@ -282,7 +322,7 @@ func (d *Discoverer) EasyStart() (string, int) {
if ip4 == nil || !ip4.IsGlobalUnicast() {
continue
}
if d.Config.IpPrefix != "" && strings.HasPrefix(ip4.String(), d.Config.IpPrefix) {
if conf.IpPrefix != "" && strings.HasPrefix(ip4.String(), conf.IpPrefix) {
ip = ip4
break
}
@ -339,14 +379,16 @@ var numberMatcher = regexp.MustCompile(`^\d+(s|ms|us|µs|ns?)?$`)
func (d *Discoverer) addApp(app, callConf string, fetch bool) bool {
d.appLock.Lock()
if d.Config.Calls == nil {
d.Config.Calls = make(map[string]string)
conf := d.GetConfig()
if conf.Calls == nil {
conf.Calls = make(map[string]string)
}
if d.Config.Calls[app] == callConf && d.appNodes[app] != nil {
if conf.Calls[app] == callConf && d.appNodes[app] != nil {
d.appLock.Unlock()
return false
}
d.Config.Calls[app] = callConf
conf.Calls[app] = callConf
d.SetConfig(conf)
callInfo := &callInfoType{
Timeout: 10 * time.Second,
@ -437,10 +479,9 @@ func (d *Discoverer) getAppNodes(app string) map[string]*NodeInfo {
}
func (d *Discoverer) getCalls() map[string]string {
d.appLock.RLock()
defer d.appLock.RUnlock()
conf := d.GetConfig()
calls := make(map[string]string)
for k, v := range d.Config.Calls {
for k, v := range conf.Calls {
calls[k] = v
}
return calls

View File

@ -46,8 +46,10 @@ func TestDiscover(t *testing.T) {
defer server.Close()
// 配置 Discover
discover.DefaultDiscoverer.Config.App = "test-app"
discover.DefaultDiscoverer.Config.Registry = "redis://127.0.0.1:6379/15"
conf := discover.DefaultDiscoverer.GetConfig()
conf.App = "test-app"
conf.Registry = "redis://127.0.0.1:6379/15"
discover.DefaultDiscoverer.SetConfig(conf)
// 启动 Discover
if !discover.Start("127.0.0.1:18001") {

View File

@ -30,8 +30,10 @@ func TestMultipleDiscoverer(t *testing.T) {
// 实例 1
d1 := discover.NewDiscoverer()
d1.Config.App = "app1"
d1.Config.Registry = registry
c1conf := d1.GetConfig()
c1conf.App = "app1"
c1conf.Registry = registry
d1.SetConfig(c1conf)
if !d1.Start("127.0.0.1:18011") {
t.Skip("redis not available")
}
@ -39,8 +41,10 @@ func TestMultipleDiscoverer(t *testing.T) {
// 实例 2
d2 := discover.NewDiscoverer()
d2.Config.App = "app2"
d2.Config.Registry = registry
c2conf := d2.GetConfig()
c2conf.App = "app2"
c2conf.Registry = registry
d2.SetConfig(c2conf)
if !d2.Start("127.0.0.1:18012") {
t.Skip("redis not available")
}