discover/Discover.go

449 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package discover
import (
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"apigo.cc/go/cast"
gohttp "apigo.cc/go/http"
"apigo.cc/go/id"
"apigo.cc/go/log"
"apigo.cc/go/redis"
)
// Discoverer discovers service instances through a Redis-backed registry and,
// when started via Start with a non-empty app, also registers the current node
// as an instance of that app.
type Discoverer struct {
	// config is the effective configuration (New applies defaults to it).
	config Config
	// registry is the registry address handed to redis.GetRedis; "" disables
	// all registry interaction in startSub.
	registry string
	// app is this node's application name; it is also the registry hash key
	// under which this node's address is stored.
	app string
	// serverRedisPool is used for registration, heartbeat and unregistration.
	serverRedisPool *redis.Redis
	// clientRedisPool is used by fetchApp to read other apps' node lists.
	clientRedisPool *redis.Redis
	// pubsubRedisPool carries the "CH_<app>" subscriptions.
	pubsubRedisPool *redis.Redis
	// isServer is set by Start (non-empty app and Weight > 0) and cleared by Stop.
	isServer bool
	// isClient is set by startSub and cleared by Stop.
	isClient bool
	// daemonRunning is set by Start just before the heartbeat daemon is
	// launched and cleared by Stop.
	daemonRunning atomic.Bool
	// myAddr is the address this node registers under.
	myAddr string
	logger *log.Logger
	// daemonStopSignal is closed by Stop to wake the daemon loop.
	daemonStopSignal chan struct{}
	// daemonDoneSignal is closed by the daemon when it exits; Wait blocks on it.
	daemonDoneSignal chan struct{}
	// appLock protects calls, appNodes, appSubscribed and the Redis handles /
	// mode flags touched by startSub, fetchApp and Stop.
	appLock sync.RWMutex
	// calls maps app name -> call configuration.
	calls map[string]CallConfig
	// appNodes maps app name -> node address -> node info.
	appNodes map[string]map[string]*NodeInfo
	// appSubscribed records apps to subscribe once the pub/sub pool exists.
	appSubscribed map[string]bool
	// appClientPools holds cached HTTP clients (presumably keyed by app —
	// populated outside this chunk); Stop destroys them all.
	appClientPools map[string]*gohttp.Client
	// appClientPoolsLock guards appClientPools.
	appClientPoolsLock sync.RWMutex
	// settedRoute is an optional per-request hook.
	// NOTE(review): assigned/used outside this chunk — confirm semantics.
	settedRoute func(*AppClient, *http.Request)
	// settedLoadBalancer selects nodes; New installs a DefaultLoadBalancer.
	settedLoadBalancer LoadBalancer
}
// IsServer reports whether this node is running in server mode. The flag is
// set by Start (non-empty app and positive weight) and cleared by Stop. It is
// read under appLock because Stop clears it while holding that lock.
func (d *Discoverer) IsServer() bool {
	d.appLock.RLock()
	defer d.appLock.RUnlock()
	return d.isServer
}
// IsClient reports whether this node is running in client mode. The flag is
// set by startSub and cleared by Stop, both under appLock, so it is read under
// the same lock.
func (d *Discoverer) IsClient() bool {
	d.appLock.RLock()
	defer d.appLock.RUnlock()
	return d.isClient
}
// logError logs msg at error level with a "Discover: " prefix. The caller's
// key/value pairs come first, followed by this node's "app" and "addr".
func (d *Discoverer) logError(msg string, extra ...any) {
	// Build a fresh slice: appending directly to the variadic parameter could
	// write into the caller's backing array if it was passed as slice... with
	// spare capacity.
	fields := make([]any, 0, len(extra)+4)
	fields = append(fields, extra...)
	fields = append(fields, "app", d.app, "addr", d.myAddr)
	d.logger.Error("Discover: "+msg, fields...)
}
// logInfo logs msg at info level with a "Discover: " prefix. The caller's
// key/value pairs come first, followed by this node's "app" and "addr".
func (d *Discoverer) logInfo(msg string, extra ...any) {
	// Fresh slice so the caller's variadic backing array is never written to.
	fields := make([]any, 0, len(extra)+4)
	fields = append(fields, extra...)
	fields = append(fields, "app", d.app, "addr", d.myAddr)
	d.logger.Info("Discover: "+msg, fields...)
}
// SetLogger replaces the logger used by this Discoverer. A nil logger falls
// back to log.DefaultLogger, mirroring New, so d.logger is never left nil for
// the logging helpers that dereference it.
func (d *Discoverer) SetLogger(logger *log.Logger) {
	if logger == nil {
		logger = log.DefaultLogger
	}
	d.logger = logger
}
// New builds a Discoverer without contacting any registry.
//
// Only the first element of confs is honoured. A non-positive CallRetryTimes
// becomes 10 and a non-positive Weight becomes 100. A nil logger is replaced
// by log.DefaultLogger. The returned value has empty bookkeeping maps, the
// default load balancer, and fresh daemon stop/done channels.
func New(logger *log.Logger, confs ...Config) *Discoverer {
	var conf Config
	if len(confs) != 0 {
		conf = confs[0]
	}

	// Apply defaults.
	if conf.CallRetryTimes <= 0 {
		conf.CallRetryTimes = 10
	}
	if conf.Weight <= 0 {
		conf.Weight = 100
	}
	if logger == nil {
		logger = log.DefaultLogger
	}

	return &Discoverer{
		logger:             logger,
		config:             conf,
		calls:              make(map[string]CallConfig),
		appNodes:           make(map[string]map[string]*NodeInfo),
		appSubscribed:      make(map[string]bool),
		appClientPools:     make(map[string]*gohttp.Client),
		settedLoadBalancer: &DefaultLoadBalancer{},
		daemonStopSignal:   make(chan struct{}),
		daemonDoneSignal:   make(chan struct{}),
	}
}
// Start creates a Discoverer and, when app is non-empty and the effective
// weight is positive, runs it in server mode. In that mode it registers addr
// in the registry as follows:
//   - HSET <app> <addr> <weight>
//   - SETEX <app>_<addr> 10 "1" (liveness key)
//   - PUBLISH CH_<app> "<addr> <weight>"
//
// It then launches the heartbeat daemon. Every app in Config.Calls is added
// for discovery, and if there is at least one, the subscription machinery is
// started. Registration failures are logged, not returned.
func Start(registry, app, addr string, logger *log.Logger, confs ...Config) *Discoverer {
	d := New(logger, confs...)
	d.registry = registry
	d.app = app
	d.myAddr = addr
	d.isServer = d.app != "" && d.config.Weight > 0

	if d.isServer && d.registry != "" {
		d.serverRedisPool = redis.GetRedis(d.registry, d.logger)
		if d.serverRedisPool.Error != nil {
			d.logError(d.serverRedisPool.Error.Error())
		}
		// Register this node.
		if res := d.serverRedisPool.Do("HSET", d.app, addr, d.config.Weight); res.Error == nil {
			d.serverRedisPool.Do("SETEX", d.app+"_"+addr, 10, "1")
			d.logInfo("registered")
			d.serverRedisPool.PUBLISH("CH_"+d.app, fmt.Sprintf("%s %d", addr, d.config.Weight))
			d.daemonRunning.Store(true)
			d.daemonStopSignal = make(chan struct{})
			d.daemonDoneSignal = make(chan struct{})
			go d.daemon()
		} else {
			// Include the cause; previously the error was silently dropped.
			d.logError("register failed", "error", res.Error)
		}
	}

	calls := d.config.Calls
	for callApp, c := range calls {
		d.addApp(callApp, c, false)
	}
	if len(calls) > 0 {
		d.startSub()
	}
	return d
}
// Open creates a Discoverer that acts only as a client of registry. It never
// registers this node. Every app in Config.Calls is added for discovery, and
// the subscription machinery is then started unconditionally.
func Open(registry string, logger *log.Logger, confs ...Config) *Discoverer {
	d := New(logger, confs...)
	d.registry = registry

	// Take the map reference up front: addApp replaces d.config.Calls with a
	// new map on every call, so iterate the original one.
	initial := d.config.Calls
	for name, callConf := range initial {
		d.addApp(name, callConf, false)
	}

	d.startSub()
	return d
}
// daemon is the heartbeat loop that Start launches after a successful
// registration. Every 5 seconds it does one of two things:
//   - If this node's field is still present in the "<app>" hash, it refreshes
//     the "<app>_<addr>" liveness key (SETEX, TTL 10).
//   - Otherwise it re-registers the node and re-publishes it on "CH_<app>".
//
// It returns when daemonStopSignal is closed or daemonRunning is cleared, and
// closes daemonDoneSignal on the way out.
func (d *Discoverer) daemon() {
	// Snapshot the state the loop needs under the lock. Stop rewrites
	// isServer/serverRedisPool under appLock (and eventually sets the pool to
	// nil), so reading those fields unlocked on every tick was a data race that
	// could dereference a nil pool.
	d.appLock.RLock()
	isServer := d.isServer
	pool := d.serverRedisPool
	app, addr, weight := d.app, d.myAddr, d.config.Weight
	stop, done := d.daemonStopSignal, d.daemonDoneSignal
	d.appLock.RUnlock()

	d.logInfo("daemon thread started")
	defer func() {
		d.logInfo("daemon thread stopped")
		close(done)
	}()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	liveKey := app + "_" + addr
	for d.daemonRunning.Load() {
		select {
		case <-stop:
			return
		case <-ticker.C:
		}
		// Stop may have run while we were waiting. Do not touch the registry
		// once it has begun unregistering this node.
		if !d.daemonRunning.Load() {
			return
		}
		if !isServer || pool == nil {
			continue
		}
		if pool.Do("HEXISTS", app, addr).Bool() {
			pool.Do("SETEX", liveKey, 10, "1")
			continue
		}
		d.logInfo("lost app registered info, re-registering")
		if pool.Do("HSET", app, addr, weight).Error == nil {
			pool.Do("SETEX", liveKey, 10, "1")
			pool.PUBLISH("CH_"+app, fmt.Sprintf("%s %d", addr, weight))
		}
	}
}
// startSub switches this Discoverer into client mode. It does nothing when no
// registry is configured. Otherwise it:
//   - lazily creates the client pool used by fetchApp;
//   - lazily creates the pub/sub pool, subscribes every app recorded in
//     appSubscribed, and starts the pub/sub pool;
//   - sets isClient.
//
// It currently always returns true.
func (d *Discoverer) startSub() bool {
	if d.registry == "" {
		return true
	}

	d.appLock.Lock()
	if d.clientRedisPool == nil {
		d.clientRedisPool = redis.GetRedis(d.registry, d.logger)
	}
	if d.pubsubRedisPool == nil {
		// Created with a logger derived via d.logger.New(<fresh id>),
		// presumably so it is a distinct instance from clientRedisPool.
		pubsub := redis.GetRedis(d.registry, d.logger.New(id.Get12BytesUltraPerSecond()))
		d.pubsubRedisPool = pubsub
		// Subscribe every app registered so far.
		for app := range d.appSubscribed {
			d.subscribeApp(app)
		}
		// Configure under the lock, but start outside it: the subscription
		// callbacks (fetchApp/pushNode) acquire appLock. Use the local handle,
		// because once the lock is released a concurrent Stop may reset
		// d.pubsubRedisPool.
		d.appLock.Unlock()
		pubsub.Start()
		d.appLock.Lock()
	}
	d.isClient = true
	d.appLock.Unlock()
	return true
}
// subscribeApp subscribes to the "CH_<app>" channel. Callers hold appLock
// (startSub and subscribeAppWithLock do).
//
// If the pub/sub pool does not exist yet, the app is only recorded in
// appSubscribed, so that startSub subscribes it later.
//
// Messages are "<addr> <weight>"; a message without exactly two parts is
// treated as weight 0, which removes the node. The first callback triggers a
// full fetchApp (presumably on (re)subscription — confirm against the redis
// package).
func (d *Discoverer) subscribeApp(app string) {
	pubsub := d.pubsubRedisPool
	if pubsub == nil {
		d.appSubscribed[app] = true
		return
	}

	onReady := func() {
		d.fetchApp(app)
	}
	onMessage := func(data []byte) {
		parts := strings.Split(string(data), " ")
		addr, weight := parts[0], 0
		if len(parts) == 2 {
			weight = cast.Int(parts[1])
		}
		d.logInfo("received node update", "app", app, "addr", addr, "weight", weight)
		d.pushNode(app, addr, weight)
	}
	pubsub.Subscribe("CH_"+app, onReady, onMessage)
}
// subscribeAppWithLock is subscribeApp for callers that do not already hold
// appLock. It takes the write lock for the duration of the call.
func (d *Discoverer) subscribeAppWithLock(app string) {
	mu := &d.appLock
	mu.Lock()
	defer mu.Unlock()
	d.subscribeApp(app)
}
// Stop shuts the Discoverer down and unregisters this node from the registry.
// The steps are:
//  1. Under appLock: snapshot state, clear isClient/isServer, and signal the
//     heartbeat daemon to stop.
//  2. Without the lock: stop the pub/sub pool, wait for the daemon to exit,
//     then HDEL/DEL this node and publish it with weight 0.
//  3. Destroy all cached HTTP clients.
//  4. Reset the Redis handles and per-app node/subscription state.
func (d *Discoverer) Stop() {
	d.appLock.Lock()
	// 1. Snapshot what we need and flip the flags early.
	isClient := d.isClient
	pubsub := d.pubsubRedisPool
	d.isClient = false
	isServer := d.isServer
	serverPool := d.serverRedisPool
	myAddr := d.myAddr
	var daemonDone chan struct{}
	if isServer {
		// Swap reports whether Start actually launched the daemon. Only a
		// running daemon ever closes daemonDoneSignal, so only then is it safe
		// to wait for it below.
		if d.daemonRunning.Swap(false) {
			daemonDone = d.daemonDoneSignal
		}
		if d.daemonStopSignal != nil {
			close(d.daemonStopSignal)
		}
		d.isServer = false
	}
	// Release the lock before the slow work. Pub/sub callbacks (pushNode) take
	// appLock, so stopping pubsub while holding it could deadlock.
	d.appLock.Unlock()

	// 2. Network and shutdown work, done without the lock.
	if isClient && pubsub != nil {
		pubsub.Stop()
	}
	// Wait for the daemon before unregistering. A heartbeat already in flight
	// would otherwise see the missing hash field and re-register this node.
	if daemonDone != nil {
		<-daemonDone
	}
	if isServer && serverPool != nil {
		serverPool.Do("HDEL", d.app, myAddr)
		serverPool.Do("DEL", d.app+"_"+myAddr)
		serverPool.PUBLISH("CH_"+d.app, fmt.Sprintf("%s %d", myAddr, 0))
	}

	// 3. Release the HTTP client pools.
	d.appClientPoolsLock.Lock()
	for _, client := range d.appClientPools {
		client.Destroy()
	}
	d.appClientPools = make(map[string]*gohttp.Client)
	d.appClientPoolsLock.Unlock()

	// 4. Reset state.
	d.appLock.Lock()
	d.serverRedisPool = nil
	d.clientRedisPool = nil
	d.pubsubRedisPool = nil
	d.appNodes = make(map[string]map[string]*NodeInfo)
	d.appSubscribed = make(map[string]bool)
	d.appLock.Unlock()
}
// Wait blocks until the heartbeat daemon closes daemonDoneSignal, and returns
// immediately if that channel is nil.
//
// NOTE: New always allocates this channel, and only a started daemon closes
// it. For a Discoverer whose daemon never ran (client-only, or registration
// failed), Wait therefore blocks indefinitely.
func (d *Discoverer) Wait() {
	done := d.daemonDoneSignal
	if done == nil {
		return
	}
	<-done
}
// AddExternalApp adds app (with its call configuration) for discovery at
// runtime. If this Discoverer is not yet a client, it starts the subscription
// machinery, which also subscribes app. Otherwise it subscribes app directly.
// The app's node list is then fetched once, synchronously. It returns false
// only if addApp rejects the app.
func (d *Discoverer) AddExternalApp(app string, callConf CallConfig) bool {
	// fetch=false: the node list is pulled once below, after subscribing.
	// Letting addApp fetch as well queried the registry twice for the same app.
	if !d.addApp(app, callConf, false) {
		return false
	}

	// isClient is written under appLock (startSub/Stop); read it the same way.
	d.appLock.RLock()
	isClient := d.isClient
	d.appLock.RUnlock()

	if isClient {
		d.subscribeAppWithLock(app)
	} else {
		d.startSub()
	}
	d.fetchApp(app) // synchronous initial pull
	return true
}
// getCallInfo returns the call configuration recorded for app, and whether
// one exists, read under appLock.
func (d *Discoverer) getCallInfo(app string) (CallConfig, bool) {
	d.appLock.RLock()
	conf, found := d.calls[app]
	d.appLock.RUnlock()
	return conf, found
}
// addApp records (or overwrites) the call configuration for app, ensures an
// (empty) node table exists for it, and marks it for subscription.
//
// d.config.Calls is replaced by a fresh copy instead of being modified in
// place, so maps previously obtained from it (such as the caller-supplied
// Config.Calls) are never mutated.
//
// When fetch is true and this Discoverer is already a client, the node list
// is pulled from the registry right away. It always returns true.
func (d *Discoverer) addApp(app string, callConf CallConfig, fetch bool) bool {
	d.appLock.Lock()
	newCalls := make(map[string]CallConfig, len(d.config.Calls)+1)
	for k, v := range d.config.Calls {
		newCalls[k] = v
	}
	newCalls[app] = callConf
	d.config.Calls = newCalls
	d.calls[app] = callConf
	if d.appNodes[app] == nil {
		d.appNodes[app] = make(map[string]*NodeInfo)
	}
	d.appSubscribed[app] = true
	// Capture isClient while still holding the lock; Stop clears it under appLock.
	isClient := d.isClient
	d.appLock.Unlock()

	if fetch && isClient {
		d.fetchApp(app)
	}
	return true
}
// fetchApp reconciles the local node table for app with the registry. It:
//   - reads the "<app>" hash (addr -> weight);
//   - drops entries whose "<app>_<addr>" liveness key is gone, also HDEL'ing
//     them from the registry;
//   - removes local nodes no longer listed;
//   - upserts the rest.
//
// It is a no-op until startSub has created the client pool. On registry
// errors the current table is kept rather than being treated as "no nodes".
func (d *Discoverer) fetchApp(app string) {
	d.appLock.RLock()
	pool := d.clientRedisPool
	d.appLock.RUnlock()
	if pool == nil {
		return
	}

	all := pool.Do("HGETALL", app)
	if all.Error != nil {
		// A failed read would otherwise look like an empty list and evict
		// every known node.
		d.logError("fetch app nodes failed", "app", app, "error", all.Error)
		return
	}
	results := all.ResultMap()

	// Liveness check.
	for addr := range results {
		alive := pool.Do("EXISTS", app+"_"+addr)
		if alive.Error != nil {
			// Liveness unknown: do not purge a possibly healthy node from the
			// shared registry because of a transient error.
			continue
		}
		if !alive.Bool() {
			pool.Do("HDEL", app, addr)
			delete(results, addr)
		}
	}

	// Remove nodes that disappeared from the registry (range over nil is a no-op).
	for addr := range d.getAppNodes(app) {
		if _, ok := results[addr]; !ok {
			d.pushNode(app, addr, 0)
		}
	}
	for addr, res := range results {
		d.pushNode(app, addr, res.Int())
	}
}
// getAppNodes returns a shallow copy of app's node table taken under appLock,
// or nil when app has no table at all (an existing but empty table yields an
// empty, non-nil map).
//
// The *NodeInfo values are shared with the live table. pushNode mutates their
// Weight under appLock, so readers of the copy should treat them as snapshots.
func (d *Discoverer) getAppNodes(app string) map[string]*NodeInfo {
	d.appLock.RLock()
	defer d.appLock.RUnlock()

	src := d.appNodes[app]
	if src == nil {
		return nil
	}
	out := make(map[string]*NodeInfo, len(src))
	for addr, node := range src {
		out[addr] = node
	}
	return out
}
// SetNode manually inserts or updates the node at addr for app with the given
// weight. A weight <= 0 removes the node instead, as handled by pushNode.
func (d *Discoverer) SetNode(app, addr string, weight int) {
	d.pushNode(app, addr, weight)
}
// GetAppNodes returns a copy of app's node table (address -> node), or nil if
// app is unknown.
func (d *Discoverer) GetAppNodes(app string) map[string]*NodeInfo {
	nodes := d.getAppNodes(app)
	return nodes
}
// pushNode applies one node update for app under appLock:
//   - weight <= 0 removes addr.
//   - For an existing node, a changed weight rescales its UsedTimes by
//     newWeight/oldWeight, so its usage-per-weight score is preserved.
//   - A new node is seeded with the average usage-per-weight of the existing
//     nodes, times its own weight, so it does not start at zero usage relative
//     to its peers.
func (d *Discoverer) pushNode(app, addr string, weight int) {
	d.appLock.Lock()
	defer d.appLock.Unlock()

	nodes := d.appNodes[app]
	if weight <= 0 {
		// delete on a nil map is a no-op.
		delete(nodes, addr)
		return
	}
	if nodes == nil {
		nodes = make(map[string]*NodeInfo)
		d.appNodes[app] = nodes
	}

	if existing, found := nodes[addr]; found {
		if existing.Weight == weight {
			return
		}
		scaled := float64(existing.UsedTimes.Load()) / float64(existing.Weight) * float64(weight)
		existing.UsedTimes.Store(uint64(scaled))
		existing.Weight = weight
		return
	}

	var initial uint64 = 0
	if count := len(nodes); count > 0 {
		var sum float64
		for _, peer := range nodes {
			sum += float64(peer.UsedTimes.Load()) / float64(peer.Weight)
		}
		initial = uint64(sum / float64(count) * float64(weight))
	}
	fresh := &NodeInfo{
		Addr:   addr,
		Weight: weight,
	}
	fresh.UsedTimes.Store(initial)
	nodes[addr] = fresh
}