package discover

import (
	"fmt"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"apigo.cc/go/cast"
	gohttp "apigo.cc/go/http"
	"apigo.cc/go/id"
	"apigo.cc/go/log"
	"apigo.cc/go/redis"
)

// Discoverer is a service-discovery instance. It can register the current
// node in a Redis-backed registry (server role) and/or track the nodes of
// other applications (client role).
type Discoverer struct {
	config   Config
	registry string
	app      string

	serverRedisPool *redis.Redis // used to register this node and keep it alive
	clientRedisPool *redis.Redis // used to fetch node lists of called apps
	pubsubRedisPool *redis.Redis // used to subscribe to "CH_<app>" node updates

	isServer      bool
	isClient      bool
	daemonRunning atomic.Bool
	// daemonStarted records that the heartbeat goroutine was launched, so that
	// Wait and Stop only block on daemonDoneSignal when something will close it.
	daemonStarted atomic.Bool
	myAddr        string
	logger        *log.Logger

	daemonStopSignal chan struct{} // closed by Stop to wake the daemon
	daemonDoneSignal chan struct{} // closed by the daemon when it exits

	// appLock guards calls, appNodes, appSubscribed, the Redis pools and the
	// isServer/isClient flags once the Discoverer is in use.
	appLock       sync.RWMutex
	calls         map[string]CallConfig
	appNodes      map[string]map[string]*NodeInfo
	appSubscribed map[string]bool

	appClientPools     map[string]*gohttp.Client
	appClientPoolsLock sync.RWMutex

	// Not referenced in this file; presumably consumed by the call path
	// elsewhere in the package.
	settedRoute        func(*AppClient, *http.Request)
	settedLoadBalancer LoadBalancer
}

// IsServer reports whether the current node runs in the server role.
func (d *Discoverer) IsServer() bool {
	return d.isServer
}

// IsClient reports whether the current node runs in the client role.
func (d *Discoverer) IsClient() bool {
	return d.isClient
}

// logError logs msg at error level, tagged with this node's app and address.
func (d *Discoverer) logError(msg string, extra ...any) {
	// Clamp the capacity so append always copies and can never write into a
	// backing array owned by a caller that passed a slice with `extra...`.
	fields := append(extra[:len(extra):len(extra)], "app", d.app, "addr", d.myAddr)
	d.logger.Error("Discover: "+msg, fields...)
}

// logInfo logs msg at info level, tagged with this node's app and address.
func (d *Discoverer) logInfo(msg string, extra ...any) {
	// See logError for why the capacity is clamped.
	fields := append(extra[:len(extra):len(extra)], "app", d.app, "addr", d.myAddr)
	d.logger.Info("Discover: "+msg, fields...)
}

// SetLogger replaces the logger used by this Discoverer.
func (d *Discoverer) SetLogger(logger *log.Logger) {
	d.logger = logger
}

// New creates a Discoverer without connecting to any registry.
//
// Only the first element of confs is used. A non-positive CallRetryTimes
// defaults to 10, a non-positive Weight defaults to 100, and a nil logger
// falls back to log.DefaultLogger.
func New(logger *log.Logger, confs ...Config) *Discoverer {
	var conf Config
	if len(confs) > 0 {
		conf = confs[0]
	}
	if conf.CallRetryTimes <= 0 {
		conf.CallRetryTimes = 10
	}
	if conf.Weight <= 0 {
		conf.Weight = 100
	}
	if logger == nil {
		logger = log.DefaultLogger
	}

	return &Discoverer{
		config:             conf,
		calls:              make(map[string]CallConfig),
		appNodes:           make(map[string]map[string]*NodeInfo),
		appSubscribed:      make(map[string]bool),
		appClientPools:     make(map[string]*gohttp.Client),
		settedLoadBalancer: &DefaultLoadBalancer{},
		daemonStopSignal:   make(chan struct{}),
		daemonDoneSignal:   make(chan struct{}),
		logger:             logger,
	}
}

// Start creates a Discoverer, registers the current node (app at addr) in the
// registry and starts the heartbeat daemon. When the configuration lists
// called apps it also subscribes to them as a client.
//
// Registration is skipped when app or registry is empty. A failed
// registration is logged and no daemon is started; the Discoverer is still
// returned.
func Start(registry, app, addr string, logger *log.Logger, confs ...Config) *Discoverer {
	d := New(logger, confs...)
	d.registry = registry
	d.app = app
	d.myAddr = addr
	d.isServer = d.app != "" && d.config.Weight > 0

	if d.isServer && d.registry != "" {
		d.serverRedisPool = redis.GetRedis(d.registry, d.logger)
		if d.serverRedisPool.Error != nil {
			d.logError(d.serverRedisPool.Error.Error())
		}

		// Register this node.
		if d.registerWith(d.serverRedisPool) {
			d.logInfo("registered")
			d.daemonRunning.Store(true)
			d.daemonStopSignal = make(chan struct{})
			d.daemonDoneSignal = make(chan struct{})
			d.daemonStarted.Store(true)
			go d.daemon()
		} else {
			d.logError("register failed")
		}
	}

	// addApp swaps d.config.Calls for a fresh map, so iterate a snapshot.
	calls := d.config.Calls
	if len(calls) > 0 {
		for callApp, c := range calls {
			d.addApp(callApp, c, false)
		}
		d.startSub()
	}
	return d
}

// Open creates a Discoverer that acts only as a client of the registry.
func Open(registry string, logger *log.Logger, confs ...Config) *Discoverer {
	d := New(logger, confs...)
	d.registry = registry

	// addApp swaps d.config.Calls for a fresh map, so iterate a snapshot.
	calls := d.config.Calls
	for callApp, c := range calls {
		d.addApp(callApp, c, false)
	}
	d.startSub()
	return d
}

// registerWith writes this node's weight into the app hash, sets its 10-second
// liveness key and publishes the node on the app channel. It reports whether
// the initial HSET succeeded; the later commands are best-effort.
func (d *Discoverer) registerWith(pool *redis.Redis) bool {
	if pool.Do("HSET", d.app, d.myAddr, d.config.Weight).Error != nil {
		return false
	}
	pool.Do("SETEX", d.app+"_"+d.myAddr, 10, "1")
	pool.PUBLISH("CH_"+d.app, fmt.Sprintf("%s %d", d.myAddr, d.config.Weight))
	return true
}

// daemon is the heartbeat loop. Every 5 seconds it refreshes this node's
// liveness key, re-registering the node if its hash entry disappeared. It
// exits when daemonStopSignal is closed or daemonRunning becomes false, and
// closes daemonDoneSignal on the way out.
func (d *Discoverer) daemon() {
	d.logInfo("daemon thread started")
	defer func() {
		d.logInfo("daemon thread stopped")
		close(d.daemonDoneSignal)
	}()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for d.daemonRunning.Load() {
		select {
		case <-d.daemonStopSignal:
			return
		case <-ticker.C:
			if !d.daemonRunning.Load() {
				return
			}

			// Snapshot under the lock: Stop clears these fields concurrently,
			// and re-reading the field after a nil check could dereference nil.
			d.appLock.RLock()
			pool, isServer := d.serverRedisPool, d.isServer
			d.appLock.RUnlock()
			if !isServer || pool == nil {
				continue
			}

			if pool.Do("HEXISTS", d.app, d.myAddr).Bool() {
				pool.Do("SETEX", d.app+"_"+d.myAddr, 10, "1")
				continue
			}
			d.logInfo("lost app registered info, re-registering")
			d.registerWith(pool)
		}
	}
}

// startSub lazily creates the client and pub/sub Redis pools, subscribes to
// every app recorded in appSubscribed and marks this Discoverer as a client.
// It does nothing when no registry is configured, and always returns true.
func (d *Discoverer) startSub() bool {
	if d.registry == "" {
		return true
	}

	d.appLock.Lock()
	if d.clientRedisPool == nil {
		d.clientRedisPool = redis.GetRedis(d.registry, d.logger)
	}
	if d.pubsubRedisPool == nil {
		pubsub := redis.GetRedis(d.registry, d.logger.New(id.MakeID(12)))
		d.pubsubRedisPool = pubsub

		// Subscribe to every app that has been added so far.
		for app := range d.appSubscribed {
			d.subscribeApp(app)
		}

		// Subscriptions must be configured while holding the lock, but the
		// pool is started after releasing it to avoid deadlocking with
		// callbacks that take appLock. The local variable is used because
		// Stop may reset d.pubsubRedisPool once the lock is released.
		d.appLock.Unlock()
		pubsub.Start()
		d.appLock.Lock()
	}
	d.isClient = true
	d.appLock.Unlock()
	return true
}

// subscribeApp subscribes to node updates of app on channel "CH_<app>".
// Callers are expected to hold appLock. When the pub/sub pool does not exist
// yet, the app is only recorded so that startSub subscribes it later.
func (d *Discoverer) subscribeApp(app string) {
	if d.pubsubRedisPool == nil {
		d.appSubscribed[app] = true
		return
	}

	d.pubsubRedisPool.Subscribe("CH_"+app, func() {
		d.fetchApp(app)
	}, func(data []byte) {
		// Messages have the form "<addr> <weight>"; anything that is not
		// exactly two fields is treated as weight 0, which removes the node.
		a := strings.Split(string(data), " ")
		addr := a[0]
		weight := 0
		if len(a) == 2 {
			weight = cast.Int(a[1])
		}
		d.logInfo("received node update", "app", app, "addr", addr, "weight", weight)
		d.pushNode(app, addr, weight)
	})
}

// subscribeAppWithLock is subscribeApp for callers that do not hold appLock.
func (d *Discoverer) subscribeAppWithLock(app string) {
	d.appLock.Lock()
	defer d.appLock.Unlock()
	d.subscribeApp(app)
}

// Stop shuts the Discoverer down: it stops the heartbeat daemon and the
// pub/sub pool, removes the current node from the registry, destroys the HTTP
// client pools and resets the node and subscription state.
func (d *Discoverer) Stop() {
	d.appLock.Lock()
	// 1. Snapshot the state we need and flip the role flags early.
	isClient := d.isClient
	pubsub := d.pubsubRedisPool
	d.isClient = false

	isServer := d.isServer
	serverPool := d.serverRedisPool
	myAddr := d.myAddr
	daemonDone := d.daemonDoneSignal
	if isServer {
		d.daemonRunning.Store(false)
		if d.daemonStopSignal != nil {
			close(d.daemonStopSignal)
		}
		d.isServer = false
	}
	// Release the lock before any slow work so pushNode callbacks, which take
	// appLock, cannot deadlock against us.
	d.appLock.Unlock()

	// 2. Slow network and shutdown work, performed without the lock.
	if isClient && pubsub != nil {
		pubsub.Stop()
	}

	if isServer {
		// Let an in-flight heartbeat finish first; otherwise it could
		// re-register the node right after it is removed below.
		if d.daemonStarted.Load() && daemonDone != nil {
			<-daemonDone
		}
		if serverPool != nil {
			serverPool.Do("HDEL", d.app, myAddr)
			serverPool.Do("DEL", d.app+"_"+myAddr)
			serverPool.PUBLISH("CH_"+d.app, fmt.Sprintf("%s %d", myAddr, 0))
		}
	}

	// 3. Release the HTTP connection pools.
	d.appClientPoolsLock.Lock()
	for _, client := range d.appClientPools {
		client.Destroy()
	}
	d.appClientPools = make(map[string]*gohttp.Client)
	d.appClientPoolsLock.Unlock()

	// 4. Reset state so the Discoverer can be started again.
	d.appLock.Lock()
	d.serverRedisPool = nil
	d.clientRedisPool = nil
	d.pubsubRedisPool = nil
	d.appNodes = make(map[string]map[string]*NodeInfo)
	d.appSubscribed = make(map[string]bool)
	d.appLock.Unlock()
}

// Wait blocks until the heartbeat daemon has exited. It returns immediately
// when no daemon was ever started (client-only use, or registration failed),
// because nothing would close the done channel in that case.
func (d *Discoverer) Wait() {
	if !d.daemonStarted.Load() {
		return
	}
	if d.daemonDoneSignal != nil {
		<-d.daemonDoneSignal
	}
}

// AddExternalApp adds an application to discover at runtime, subscribes to its
// node updates and synchronously fetches its current node list.
func (d *Discoverer) AddExternalApp(app string, callConf CallConfig) bool {
	if !d.addApp(app, callConf, true) {
		return false
	}

	// isClient is written under appLock by startSub and Stop.
	d.appLock.RLock()
	isClient := d.isClient
	d.appLock.RUnlock()

	if isClient {
		d.subscribeAppWithLock(app)
	} else {
		d.startSub()
	}
	d.fetchApp(app) // Pull the node list once, synchronously.
	return true
}

// getCallInfo returns the call configuration registered for app, if any.
func (d *Discoverer) getCallInfo(app string) (CallConfig, bool) {
	d.appLock.RLock()
	defer d.appLock.RUnlock()
	info, exists := d.calls[app]
	return info, exists
}

// addApp records (or overwrites) the call configuration for app and marks the
// app as subscribed. When fetch is true and the Discoverer is already a
// client, the app's nodes are fetched immediately. It always returns true.
func (d *Discoverer) addApp(app string, callConf CallConfig, fetch bool) bool {
	d.appLock.Lock()

	// Build a fresh map instead of mutating config.Calls in place, so code
	// still iterating the previous map is unaffected.
	newCalls := make(map[string]CallConfig, len(d.config.Calls)+1)
	for k, v := range d.config.Calls {
		newCalls[k] = v
	}
	newCalls[app] = callConf
	d.config.Calls = newCalls

	d.calls[app] = callConf
	if d.appNodes[app] == nil {
		d.appNodes[app] = make(map[string]*NodeInfo)
	}
	d.appSubscribed[app] = true

	// Read the flag while still holding the lock that guards its writers.
	isClient := d.isClient
	d.appLock.Unlock()

	if fetch && isClient {
		d.fetchApp(app)
	}
	return true
}

// fetchApp reloads the node list of app from the registry. Entries whose
// liveness key is gone are deleted from the registry hash, known nodes that
// are no longer listed are removed, and the rest are pushed with their
// registry weight.
func (d *Discoverer) fetchApp(app string) {
	d.appLock.RLock()
	pool := d.clientRedisPool
	d.appLock.RUnlock()
	if pool == nil {
		return
	}

	reply := pool.Do("HGETALL", app)
	if reply.Error != nil {
		// An empty result caused by a failed call must not be mistaken for
		// "the app has no nodes": keep the nodes we already know.
		d.logError("fetch app nodes failed", "app", app, "error", reply.Error.Error())
		return
	}
	results := reply.ResultMap()

	// Liveness check: prune entries whose liveness key has expired.
	for addr := range results {
		alive := pool.Do("EXISTS", app+"_"+addr)
		if alive.Error != nil {
			// Liveness is unknown; do not delete a possibly healthy node.
			continue
		}
		if !alive.Bool() {
			pool.Do("HDEL", app, addr)
			delete(results, addr)
		}
	}

	// Remove known nodes that are no longer present in the registry.
	for addr := range d.getAppNodes(app) {
		if _, ok := results[addr]; !ok {
			d.pushNode(app, addr, 0)
		}
	}

	for addr, res := range results {
		d.pushNode(app, addr, res.Int())
	}
}

// getAppNodes returns a copy of the node map of app, or nil when the app is
// unknown. The map is a copy but the *NodeInfo values are shared.
func (d *Discoverer) getAppNodes(app string) map[string]*NodeInfo {
	d.appLock.RLock()
	defer d.appLock.RUnlock()

	current := d.appNodes[app]
	if current == nil {
		return nil
	}
	nodes := make(map[string]*NodeInfo, len(current))
	for k, v := range current {
		nodes[k] = v
	}
	return nodes
}

// SetNode manually sets the weight of a node of app; a weight <= 0 removes it.
func (d *Discoverer) SetNode(app, addr string, weight int) {
	d.pushNode(app, addr, weight)
}

// GetAppNodes returns a copy of the node map of app, or nil when the app is
// unknown.
func (d *Discoverer) GetAppNodes(app string) map[string]*NodeInfo {
	return d.getAppNodes(app)
}

// pushNode applies a node update for app. A weight <= 0 removes the node. For
// an existing node whose weight changed, UsedTimes is rescaled by the ratio of
// new to old weight. A new node starts with UsedTimes set to the average
// used-per-weight of the existing nodes multiplied by its own weight.
func (d *Discoverer) pushNode(app, addr string, weight int) {
	d.appLock.Lock()
	defer d.appLock.Unlock()

	if weight <= 0 {
		if d.appNodes[app] != nil {
			delete(d.appNodes[app], addr)
		}
		return
	}

	if d.appNodes[app] == nil {
		d.appNodes[app] = make(map[string]*NodeInfo)
	}
	nodes := d.appNodes[app]

	if node, ok := nodes[addr]; ok {
		if node.Weight != weight {
			used := node.UsedTimes.Load()
			node.UsedTimes.Store(uint64(float64(used) / float64(node.Weight) * float64(weight)))
			node.Weight = weight
		}
		return
	}

	var avgUsed uint64
	if len(nodes) > 0 {
		var totalScore float64
		for _, n := range nodes {
			totalScore += float64(n.UsedTimes.Load()) / float64(n.Weight)
		}
		avgUsed = uint64(totalScore / float64(len(nodes)) * float64(weight))
	}

	node := &NodeInfo{
		Addr:   addr,
		Weight: weight,
	}
	node.UsedTimes.Store(avgUsed)
	nodes[addr] = node
}