Compare commits

..

No commits in common. "v1.0.4" and "master" have entirely different histories.

14 changed files with 293 additions and 639 deletions

View File

@ -8,65 +8,58 @@ import (
// AppClient 用于管理单个请求的重试和负载均衡状态 // AppClient 用于管理单个请求的重试和负载均衡状态
type AppClient struct { type AppClient struct {
discoverer *Discoverer excludes map[string]bool
excludes map[string]bool // 本次请求已排除的节点 tryTimes int
attempts int // 本次请求的重试次数 Logger *log.Logger
Logger *log.Logger // 用于日志记录的 Logger App string
App string // 目标应用名称 Method string
Method string // 请求方法 Path string
Path string // 请求路径 Data *map[string]any
Data map[string]any // 请求数据 Headers *map[string]string
Headers map[string]string // 请求头
} }
// logError 记录 Discover 客户端错误 func (ac *AppClient) logError(error string, extra ...any) {
func (ac *AppClient) logError(msg string, extra ...any) {
if ac.Logger == nil { if ac.Logger == nil {
ac.Logger = log.DefaultLogger ac.Logger = log.DefaultLogger
} }
ac.Logger.Error("Discover Client: "+msg, extra...) ac.Logger.Error("Discover Client: "+error, extra...)
} }
// Next 获取下一个可用节点
func (ac *AppClient) Next(app string, request *http.Request) *NodeInfo { func (ac *AppClient) Next(app string, request *http.Request) *NodeInfo {
return ac.NextWithNode(app, "", request) return ac.NextWithNode(app, "", request)
} }
// CheckApp 检查并尝试添加应用
func (ac *AppClient) CheckApp(app string) bool { func (ac *AppClient) CheckApp(app string) bool {
nodes := ac.discoverer.GetAppNodes(app) nodes := getAppNodes(app)
if nodes == nil { if nodes == nil {
conf := ac.discoverer.GetConfig() if !addApp(app, "", true) {
if !ac.discoverer.AddExternalApp(app, "") { ac.logError("app not found", "app", app, "calls", Config.Calls)
ac.logError("app not found", "app", app, "calls", conf.Calls)
return false return false
} }
} }
return true return true
} }
// NextWithNode 获取下一个可用节点,支持指定节点
func (ac *AppClient) NextWithNode(app, withNode string, request *http.Request) *NodeInfo { func (ac *AppClient) NextWithNode(app, withNode string, request *http.Request) *NodeInfo {
if ac.excludes == nil { if ac.excludes == nil {
ac.excludes = make(map[string]bool) ac.excludes = make(map[string]bool)
} }
allNodes := ac.discoverer.GetAppNodes(app) allNodes := getAppNodes(app)
if len(allNodes) == 0 { if len(allNodes) == 0 {
ac.logError("node not found", "app", app) ac.logError("node not found", "app", app)
return nil return nil
} }
ac.attempts++ ac.tryTimes++
if withNode != "" { if withNode != "" {
ac.excludes[withNode] = true ac.excludes[withNode] = true
return allNodes[withNode] return allNodes[withNode]
} }
conf := ac.discoverer.GetConfig() readyNodes := make([]*NodeInfo, 0)
readyNodes := make([]*NodeInfo, 0, len(allNodes))
for _, node := range allNodes { for _, node := range allNodes {
if ac.excludes[node.Addr] || node.FailedTimes.Load() >= int32(conf.CallRetryTimes) { if ac.excludes[node.Addr] || node.FailedTimes >= Config.CallRetryTimes {
continue continue
} }
readyNodes = append(readyNodes, node) readyNodes = append(readyNodes, node)
@ -83,14 +76,14 @@ func (ac *AppClient) NextWithNode(app, withNode string, request *http.Request) *
var node *NodeInfo var node *NodeInfo
if len(readyNodes) > 0 { if len(readyNodes) > 0 {
node = ac.discoverer.settedLoadBalancer.Next(ac, readyNodes, request) node = settedLoadBalancer.Next(ac, readyNodes, request)
if node != nil { if node != nil {
ac.excludes[node.Addr] = true ac.excludes[node.Addr] = true
} }
} }
if node == nil { if node == nil {
ac.logError("no available node", "app", app, "attempts", ac.attempts) ac.logError("no available node", "app", app, "tryTimes", ac.tryTimes)
} }
return node return node

View File

@ -1,21 +1,9 @@
# CHANGELOG # CHANGELOG
## v1.0.3 (2026-05-05)
- 架构深度优化:将 HTTP 客户端连接池(`appClientPools`)移入 `Discoverer` 实例,实现完全的资源隔离。
- 并发安全增强:引入读写锁保护 `Config` 结构,防止高并发下的配置读写冲突。
- 生命周期管理优化:使用 `atomic.Bool` 管理 `daemonRunning` 状态,确保线程安全。
- 资源回收机制:在 `Stop()` 方法中新增 HTTP 连接池清理逻辑(调用 `Destroy` 释放闲置连接),防止内存与句柄泄漏。
- 接口严谨性:将 `Discoverer` 内部字段(如 `config`)设为私有,通过 `GetConfig`/`SetConfig` 统一访问。
## v1.0.2
- 架构重构:支持多 Discoverer 实例,消灭包级全局状态。
- 兼容性:保留包级 API 转发至 `DefaultDiscoverer`
## v1.0.1
- 优化代码规范:修复变量名冲突,改进命名语义。
- 性能优化:优化 `AppClient` 类型,减少寻址开销。
- 故障隔离:实现本地隔离机制,不再篡改全局 Redis 状态。
- 压力缓解:心跳间隔优化至 5 秒。
## v1.0.0 ## v1.0.0
- 初始版本:从 `ssgo/discover` 迁移并重构。 - 从 `ssgo/discover` 迁移至 `apigo.cc/go/discover`
- 采用全新的 `apigo.cc/go` 基础设施log, redis, http, cast, u
- 优化了注册中心同步机制,使用 `redis.Subscribe` 简化 PubSub 处理。
- 增强了负载均衡算法,引入更精确的得分计算。
- 统一了 Header 定义,对齐 `go/http` 标准。
- 移除所有 `panic`,通过 `error` 返回和日志记录确保系统稳定性。

View File

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -13,17 +14,20 @@ import (
"apigo.cc/go/log" "apigo.cc/go/log"
) )
func (d *Discoverer) getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client { var appClientPools = make(map[string]*gohttp.Client)
d.appClientPoolsLock.RLock() var appClientPoolsLock sync.RWMutex
c := d.appClientPools[app]
d.appClientPoolsLock.RUnlock() func getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client {
appClientPoolsLock.RLock()
c := appClientPools[app]
appClientPoolsLock.RUnlock()
if c != nil { if c != nil {
return c return c
} }
d.appClientPoolsLock.Lock() appClientPoolsLock.Lock()
defer d.appClientPoolsLock.Unlock() defer appClientPoolsLock.Unlock()
c = d.appClientPools[app] c = appClientPools[app]
if c != nil { if c != nil {
return c return c
} }
@ -33,68 +37,52 @@ func (d *Discoverer) getHttpClient(app string, timeout time.Duration, h2c bool)
} else { } else {
c = gohttp.NewClient(timeout) c = gohttp.NewClient(timeout)
} }
d.appClientPools[app] = c appClientPools[app] = c
return c return c
} }
// Caller 用于发起服务间调用
type Caller struct { type Caller struct {
discoverer *Discoverer Request *http.Request
Request *http.Request // 原始请求,用于透传 Header NoBody bool
NoBody bool // 是否不发送请求体 logger *log.Logger
logger *log.Logger // 用于日志记录的 Logger
} }
// NewCaller 创建一个新的调用器
func NewCaller(request *http.Request, logger *log.Logger) *Caller { func NewCaller(request *http.Request, logger *log.Logger) *Caller {
return DefaultDiscoverer.NewCaller(request, logger) return &Caller{Request: request, logger: logger}
} }
// NewCaller 创建一个新的调用器实例 func (c *Caller) logError(error string, extra ...any) {
func (d *Discoverer) NewCaller(request *http.Request, logger *log.Logger) *Caller {
return &Caller{discoverer: d, Request: request, logger: logger}
}
// logError 记录 Discover 调用器错误
func (c *Caller) logError(msg string, extra ...any) {
if c.logger == nil { if c.logger == nil {
c.logger = log.DefaultLogger c.logger = log.DefaultLogger
} }
c.logger.Error("Discover Caller: "+msg, extra...) c.logger.Error("Discover Caller: "+error, extra...)
} }
// Get 发起 GET 请求
func (c *Caller) Get(app, path string, headers ...string) *gohttp.Result { func (c *Caller) Get(app, path string, headers ...string) *gohttp.Result {
return c.Do("GET", app, path, nil, headers...) return c.Do("GET", app, path, nil, headers...)
} }
// Post 发起 POST 请求
func (c *Caller) Post(app, path string, data any, headers ...string) *gohttp.Result { func (c *Caller) Post(app, path string, data any, headers ...string) *gohttp.Result {
return c.Do("POST", app, path, data, headers...) return c.Do("POST", app, path, data, headers...)
} }
// Put 发起 PUT 请求
func (c *Caller) Put(app, path string, data any, headers ...string) *gohttp.Result { func (c *Caller) Put(app, path string, data any, headers ...string) *gohttp.Result {
return c.Do("PUT", app, path, data, headers...) return c.Do("PUT", app, path, data, headers...)
} }
// Delete 发起 DELETE 请求
func (c *Caller) Delete(app, path string, data any, headers ...string) *gohttp.Result { func (c *Caller) Delete(app, path string, data any, headers ...string) *gohttp.Result {
return c.Do("DELETE", app, path, data, headers...) return c.Do("DELETE", app, path, data, headers...)
} }
// Head 发起 HEAD 请求
func (c *Caller) Head(app, path string, headers ...string) *gohttp.Result { func (c *Caller) Head(app, path string, headers ...string) *gohttp.Result {
return c.Do("HEAD", app, path, nil, headers...) return c.Do("HEAD", app, path, nil, headers...)
} }
// Do 发起通用请求
func (c *Caller) Do(method, app, path string, data any, headers ...string) *gohttp.Result { func (c *Caller) Do(method, app, path string, data any, headers ...string) *gohttp.Result {
r, _ := c.DoWithNode(method, app, "", path, data, headers...) r, _ := c.DoWithNode(method, app, "", path, data, headers...)
return r return r
} }
// Open 发起 WebSocket 连接
func (c *Caller) Open(app, path string, headers ...string) *websocket.Conn { func (c *Caller) Open(app, path string, headers ...string) *websocket.Conn {
r, _ := c.doWithNode(false, "WS", app, "", path, nil, headers...) r, _ := c.doWithNode(false, "WS", app, "", path, nil, headers...)
if v, ok := r.(*websocket.Conn); ok { if v, ok := r.(*websocket.Conn); ok {
@ -103,7 +91,6 @@ func (c *Caller) Open(app, path string, headers ...string) *websocket.Conn {
return nil return nil
} }
// DoWithNode 发起请求并返回结果及节点地址
func (c *Caller) DoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) { func (c *Caller) DoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) {
r, nodeAddr := c.doWithNode(false, method, app, withNode, path, data, headers...) r, nodeAddr := c.doWithNode(false, method, app, withNode, path, data, headers...)
if v, ok := r.(*gohttp.Result); ok { if v, ok := r.(*gohttp.Result); ok {
@ -112,7 +99,6 @@ func (c *Caller) DoWithNode(method, app, withNode, path string, data any, header
return nil, nodeAddr return nil, nodeAddr
} }
// ManualDoWithNode 发起请求(手动处理响应)并返回结果及节点地址
func (c *Caller) ManualDoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) { func (c *Caller) ManualDoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) {
r, nodeAddr := c.doWithNode(true, method, app, withNode, path, data, headers...) r, nodeAddr := c.doWithNode(true, method, app, withNode, path, data, headers...)
if v, ok := r.(*gohttp.Result); ok { if v, ok := r.(*gohttp.Result); ok {
@ -121,16 +107,15 @@ func (c *Caller) ManualDoWithNode(method, app, withNode, path string, data any,
return nil, nodeAddr return nil, nodeAddr
} }
func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, data any, headers ...string) (any, string) { func (c *Caller) doWithNode(manualDo bool, method, app, withNode, path string, data any, headers ...string) (any, string) {
callerHeaders := make(map[string]string) callerHeaders := make(map[string]string)
for i := 1; i < len(headers); i += 2 { for i := 1; i < len(headers); i += 2 {
callerHeaders[headers[i-1]] = headers[i] callerHeaders[headers[i-1]] = headers[i]
} }
conf := c.discoverer.GetConfig() if isServer {
if c.discoverer.isServer { callerHeaders[HeaderFromApp] = Config.App
callerHeaders[HeaderFromApp] = conf.App callerHeaders[HeaderFromNode] = myAddr
callerHeaders[HeaderFromNode] = c.discoverer.myAddr
} }
callData := make(map[string]any) callData := make(map[string]any)
@ -142,17 +127,16 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
} }
appClient := AppClient{ appClient := AppClient{
discoverer: c.discoverer,
Logger: c.logger, Logger: c.logger,
App: app, App: app,
Method: method, Method: method,
Path: path, Path: path,
Data: callData, Data: &callData,
Headers: callerHeaders, Headers: &callerHeaders,
} }
if c.discoverer.settedRoute != nil { if settedRoute != nil {
c.discoverer.settedRoute(&appClient, c.Request) settedRoute(&appClient, c.Request)
app = appClient.App app = appClient.App
method = appClient.Method method = appClient.Method
path = appClient.Path path = appClient.Path
@ -162,7 +146,7 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
return &gohttp.Result{Error: fmt.Errorf("app %s not found", app)}, "" return &gohttp.Result{Error: fmt.Errorf("app %s not found", app)}, ""
} }
callInfo := c.discoverer.getCallInfo(app) callInfo := getCallInfo(app)
if callInfo != nil && callInfo.Token != "" { if callInfo != nil && callInfo.Token != "" {
callerHeaders["Access-Token"] = callInfo.Token callerHeaders["Access-Token"] = callInfo.Token
} }
@ -178,14 +162,14 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
break break
} }
node.UsedTimes.Add(1) node.UsedTimes++
startTime := time.Now() startTime := time.Now()
scheme := "http" scheme := "http"
if callInfo != nil && callInfo.SSL { if callInfo != nil && callInfo.SSL {
scheme = "https" scheme = "https"
} }
hc := c.discoverer.getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL) hc := getHttpClient(app, callInfo.Timeout, callInfo.HttpVersion == 2 && !callInfo.SSL)
hc.NoBody = c.NoBody hc.NoBody = c.NoBody
var res *gohttp.Result var res *gohttp.Result
@ -210,13 +194,13 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
res = &gohttp.Result{Error: err, Response: resp} res = &gohttp.Result{Error: err, Response: resp}
} else { } else {
if c.Request != nil { if c.Request != nil {
if manual { if manualDo {
res = hc.ManualDoByRequest(c.Request, method, url, data, settedHeaders...) res = hc.ManualDoByRequest(c.Request, method, url, data, settedHeaders...)
} else { } else {
res = hc.DoByRequest(c.Request, method, url, data, settedHeaders...) res = hc.DoByRequest(c.Request, method, url, data, settedHeaders...)
} }
} else { } else {
if manual { if manualDo {
res = hc.ManualDo(method, url, data, settedHeaders...) res = hc.ManualDo(method, url, data, settedHeaders...)
} else { } else {
res = hc.Do(method, url, data, settedHeaders...) res = hc.Do(method, url, data, settedHeaders...)
@ -225,11 +209,10 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
} }
responseTime := time.Since(startTime) responseTime := time.Since(startTime)
usedTimeMs := float32(responseTime.Nanoseconds()) / 1e6 settedLoadBalancer.Response(&appClient, node, res.Error, res.Response, responseTime)
c.discoverer.settedLoadBalancer.Response(&appClient, node, res.Error, res.Response, responseTime)
if res.Error != nil || (res.Response != nil && res.Response.StatusCode >= 502 && res.Response.StatusCode <= 504) { if res.Error != nil || (res.Response != nil && res.Response.StatusCode >= 502 && res.Response.StatusCode <= 504) {
node.FailedTimes.Add(1) node.FailedTimes++
errStr := "" errStr := ""
if res.Error != nil { if res.Error != nil {
errStr = res.Error.Error() errStr = res.Error.Error()
@ -237,17 +220,18 @@ func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, dat
errStr = res.Response.Status errStr = res.Response.Status
} }
c.logError(errStr, "app", app, "node", node.Addr, "path", path, "attempts", appClient.attempts) c.logError(errStr, "app", app, "node", node.Addr, "path", path, "tryTimes", appClient.tryTimes)
appClient.Log(node.Addr, usedTimeMs, fmt.Errorf("%s", errStr))
if node.FailedTimes.Load() >= int32(conf.CallRetryTimes) { if node.FailedTimes >= Config.CallRetryTimes {
c.discoverer.logError("node isolated locally due to high failures", "app", app, "node", node.Addr) logError("node removed due to high failures", "app", app, "node", node.Addr)
if clientRedisPool != nil {
clientRedisPool.Do("HDEL", app, node.Addr)
clientRedisPool.PUBLISH("CH_"+app, fmt.Sprintf("%s 0", node.Addr))
}
} }
continue continue
} }
node.FailedTimes.Store(0)
appClient.Log(node.Addr, usedTimeMs, nil)
if strings.ToUpper(method) == "WS" { if strings.ToUpper(method) == "WS" {
return wsConn, node.Addr return wsConn, node.Addr
} }

View File

@ -1,37 +1,14 @@
package discover package discover
import ( // Config 存储发现服务的全局配置
"sync" var Config = struct {
)
// ConfigStruct 存储发现服务的配置
type ConfigStruct struct {
Registry string // 注册中心地址,如 redis://:@127.0.0.1:6379/15 Registry string // 注册中心地址,如 redis://:@127.0.0.1:6379/15
App string // 当前应用名称 App string // 当前应用名称
Weight int // 权重,默认为 100 Weight int // 权重,默认为 100
Calls map[string]string // 调用的应用列表及其配置 Calls map[string]string // 调用的应用列表及其配置
CallRetryTimes int // 调用重试次数 CallRetryTimes int // 调用重试次数
IpPrefix string // 指定使用的 IP 网段 IpPrefix string // 指定使用的 IP 网段
} }{
// Config 存储发现服务的全局配置(兼容旧代码)
var Config = ConfigStruct{
Weight: 100, Weight: 100,
CallRetryTimes: 10, CallRetryTimes: 10,
} }
var configLock sync.RWMutex
// SetConfig 安全地设置全局配置
func SetConfig(conf ConfigStruct) {
configLock.Lock()
defer configLock.Unlock()
Config = conf
}
// GetConfig 安全地获取全局配置
func GetConfig() ConfigStruct {
configLock.RLock()
defer configLock.RUnlock()
return Config
}

View File

@ -1,27 +1,35 @@
package discover package discover
import (
gohttp "apigo.cc/go/http"
)
const ( const (
HeaderFromApp = gohttp.HeaderFromApp HeaderFromApp = "X-Discover-From-App"
HeaderFromNode = gohttp.HeaderFromNode HeaderFromNode = "X-Discover-From-Node"
HeaderClientIP = gohttp.HeaderClientIP HeaderClientIp = "X-Client-Ip"
HeaderForwardedFor = gohttp.HeaderForwardedFor HeaderForwardedFor = "X-Forwarded-For"
HeaderUserID = gohttp.HeaderUserID HeaderUserId = "X-User-Id"
HeaderDeviceID = gohttp.HeaderDeviceID HeaderDeviceId = "X-Device-Id"
HeaderClientAppName = gohttp.HeaderClientAppName HeaderClientAppName = "X-Client-App-Name"
HeaderClientAppVersion = gohttp.HeaderClientAppVersion HeaderClientAppVersion = "X-Client-App-Version"
HeaderSessionID = gohttp.HeaderSessionID HeaderSessionId = "X-Session-Id"
HeaderRequestID = gohttp.HeaderRequestID HeaderRequestId = "X-Request-Id"
HeaderHost = gohttp.HeaderHost HeaderHost = "X-Host"
HeaderScheme = gohttp.HeaderScheme HeaderScheme = "X-Scheme"
HeaderUserAgent = gohttp.HeaderUserAgent HeaderUserAgent = "User-Agent"
) )
var RelayHeaders = gohttp.RelayHeaders var RelayHeaders = []string{
HeaderClientIp,
HeaderForwardedFor,
HeaderUserId,
HeaderDeviceId,
HeaderClientAppName,
HeaderClientAppVersion,
HeaderSessionId,
HeaderRequestId,
HeaderHost,
HeaderScheme,
HeaderUserAgent,
}
const DefaultRegistry = "127.0.0.1:6379::15" const DefaultRegistry = "127.0.0.1:6379::15"
const EnvRegistry = "DISCOVER_REGISTRY" const EnvRegistry = "DISCOVER_REGISTRY"

View File

@ -9,45 +9,36 @@ import (
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"syscall" "syscall"
"time" "time"
"apigo.cc/go/cast" "apigo.cc/go/cast"
"apigo.cc/go/config" "apigo.cc/go/config"
gohttp "apigo.cc/go/http"
"apigo.cc/go/id" "apigo.cc/go/id"
"apigo.cc/go/log" "apigo.cc/go/log"
"apigo.cc/go/redis" "apigo.cc/go/redis"
) )
// Discoverer 发现服务实例 var (
type Discoverer struct {
config ConfigStruct
configLock sync.RWMutex
serverRedisPool *redis.Redis serverRedisPool *redis.Redis
clientRedisPool *redis.Redis clientRedisPool *redis.Redis
pubsubRedisPool *redis.Redis pubsubRedisPool *redis.Redis
isServer bool isServer = false
isClient bool isClient = false
daemonRunning atomic.Bool daemonRunning = false
myAddr string myAddr = ""
logger *log.Logger _logger = log.DefaultLogger
inited bool _inited = false
daemonStopChan chan bool daemonStopChan chan bool
appLock sync.RWMutex appLock sync.RWMutex
calls map[string]*callInfoType _calls = map[string]*callInfoType{}
appNodes map[string]map[string]*NodeInfo _appNodes = map[string]map[string]*NodeInfo{}
appSubscribed map[string]bool appSubscribed = map[string]bool{}
appClientPools map[string]*gohttp.Client settedRoute func(*AppClient, *http.Request) = nil
appClientPoolsLock sync.RWMutex settedLoadBalancer LoadBalancer = &DefaultLoadBalancer{}
)
settedRoute func(*AppClient, *http.Request)
settedLoadBalancer LoadBalancer
}
type callInfoType struct { type callInfoType struct {
Timeout time.Duration Timeout time.Duration
@ -56,192 +47,138 @@ type callInfoType struct {
SSL bool SSL bool
} }
// DefaultDiscoverer 默认的全局发现服务实例 func IsServer() bool { return isServer }
var DefaultDiscoverer = NewDiscoverer() func IsClient() bool { return isClient }
// NewDiscoverer 创建一个新的发现服务实例 func logError(error string, extra ...any) {
func NewDiscoverer() *Discoverer { _logger.Error("Discover: "+error, append(extra, "app", Config.App, "addr", myAddr)...)
return &Discoverer{
config: ConfigStruct{
Weight: 100,
CallRetryTimes: 10,
},
logger: log.DefaultLogger,
calls: make(map[string]*callInfoType),
appNodes: make(map[string]map[string]*NodeInfo),
appSubscribed: make(map[string]bool),
appClientPools: make(map[string]*gohttp.Client),
settedLoadBalancer: &DefaultLoadBalancer{},
}
} }
// GetConfig 安全地获取配置 func logInfo(info string, extra ...any) {
func (d *Discoverer) GetConfig() ConfigStruct { _logger.Info("Discover: "+info, append(extra, "app", Config.App, "addr", myAddr)...)
d.configLock.RLock()
defer d.configLock.RUnlock()
return d.config
} }
// SetConfig 安全地设置配置 func SetLogger(logger *log.Logger) {
func (d *Discoverer) SetConfig(conf ConfigStruct) { _logger = logger
d.configLock.Lock()
defer d.configLock.Unlock()
d.config = conf
} }
// IsServer 返回当前节点是否作为服务端运行 func Init() {
func (d *Discoverer) IsServer() bool { return d.isServer } appLock.Lock()
defer appLock.Unlock()
// IsClient 返回当前节点是否作为客户端运行 if _inited {
func (d *Discoverer) IsClient() bool { return d.isClient }
func (d *Discoverer) logError(msg string, extra ...any) {
conf := d.GetConfig()
d.logger.Error("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...)
}
func (d *Discoverer) logInfo(msg string, extra ...any) {
conf := d.GetConfig()
d.logger.Info("Discover: "+msg, append(extra, "app", conf.App, "addr", d.myAddr)...)
}
// SetLogger 设置 Discover 使用的全局 Logger
func (d *Discoverer) SetLogger(logger *log.Logger) {
d.logger = logger
}
// Init 初始化 Discover 配置
func (d *Discoverer) Init() {
d.appLock.Lock()
defer d.appLock.Unlock()
if d.inited {
return return
} }
d.inited = true _inited = true
_ = config.Load(&Config, "discover")
conf := d.GetConfig() if Config.CallRetryTimes <= 0 {
// 如果是默认实例,尝试加载配置 Config.CallRetryTimes = 10
if d == DefaultDiscoverer { }
_ = config.Load(&conf, "discover") if Config.Weight <= 0 {
d.SetConfig(conf) Config.Weight = 100
SetConfig(conf) // 保持全局 Config 变量同步 }
if Config.Registry == "" {
Config.Registry = DefaultRegistry
} }
if conf.CallRetryTimes <= 0 { _logger = log.New(id.MakeID(12))
conf.CallRetryTimes = 10
}
if conf.Weight <= 0 {
conf.Weight = 100
}
if conf.Registry == "" {
conf.Registry = DefaultRegistry
}
d.SetConfig(conf)
if d.logger == log.DefaultLogger || d.logger == nil {
d.logger = log.New(id.MakeID(12))
}
} }
// Start 启动服务发现,指定当前节点的外部访问地址 func Start(addr string) bool {
func (d *Discoverer) Start(addr string) bool { Init()
d.Init() myAddr = addr
d.myAddr = addr
conf := d.GetConfig() isServer = Config.App != "" && Config.Weight > 0
d.isServer = conf.App != "" && conf.Weight > 0 if isServer && Config.Registry != "" {
if d.isServer && conf.Registry != "" { serverRedisPool = redis.GetRedis(Config.Registry, _logger)
d.serverRedisPool = redis.GetRedis(conf.Registry, d.logger) if serverRedisPool.Error != nil {
if d.serverRedisPool.Error != nil { logError(serverRedisPool.Error.Error())
d.logError(d.serverRedisPool.Error.Error())
} }
// 注册节点 // 注册节点
if d.serverRedisPool.Do("HSET", conf.App, addr, conf.Weight).Error == nil { if serverRedisPool.Do("HSET", Config.App, addr, Config.Weight).Error == nil {
d.serverRedisPool.Do("SETEX", conf.App+"_"+addr, 10, "1") serverRedisPool.Do("SETEX", Config.App+"_"+addr, 10, "1")
d.logInfo("registered") logInfo("registered")
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", addr, conf.Weight)) serverRedisPool.PUBLISH("CH_"+Config.App, fmt.Sprintf("%s %d", addr, Config.Weight))
d.daemonRunning.Store(true) daemonRunning = true
d.daemonStopChan = make(chan bool) daemonStopChan = make(chan bool)
go d.daemon() go daemon()
} else { } else {
d.logError("register failed") logError("register failed")
} }
} }
calls := d.getCalls() calls := getCalls()
if len(calls) > 0 { if len(calls) > 0 {
for app, c := range calls { for app, conf := range calls {
d.addApp(app, c, false) addApp(app, conf, false)
} }
if !d.startSub() { if !startSub() {
return false return false
} }
} }
return true return true
} }
func (d *Discoverer) daemon() { func daemon() {
d.logInfo("daemon thread started") logInfo("daemon thread started")
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(time.Second)
defer ticker.Stop() defer ticker.Stop()
for d.daemonRunning.Load() { for daemonRunning {
<-ticker.C <-ticker.C
if !d.daemonRunning.Load() { if !daemonRunning {
break break
} }
conf := d.GetConfig() if isServer && serverRedisPool != nil {
if d.isServer && d.serverRedisPool != nil { if !serverRedisPool.Do("HEXISTS", Config.App, myAddr).Bool() {
if !d.serverRedisPool.Do("HEXISTS", conf.App, d.myAddr).Bool() { logInfo("lost app registered info, re-registering")
d.logInfo("lost app registered info, re-registering") if serverRedisPool.Do("HSET", Config.App, myAddr, Config.Weight).Error == nil {
if d.serverRedisPool.Do("HSET", conf.App, d.myAddr, conf.Weight).Error == nil { serverRedisPool.Do("SETEX", Config.App+"_"+myAddr, 10, "1")
d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1") serverRedisPool.PUBLISH("CH_"+Config.App, fmt.Sprintf("%s %d", myAddr, Config.Weight))
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, conf.Weight))
} }
} else { } else {
d.serverRedisPool.Do("SETEX", conf.App+"_"+d.myAddr, 10, "1") serverRedisPool.Do("SETEX", Config.App+"_"+myAddr, 10, "1")
} }
} }
} }
d.logInfo("daemon thread stopped") logInfo("daemon thread stopped")
if d.daemonStopChan != nil { if daemonStopChan != nil {
d.daemonStopChan <- true daemonStopChan <- true
} }
} }
func (d *Discoverer) startSub() bool { func startSub() bool {
conf := d.GetConfig() if Config.Registry == "" {
if conf.Registry == "" {
return true return true
} }
d.appLock.Lock() appLock.Lock()
if d.clientRedisPool == nil { if clientRedisPool == nil {
d.clientRedisPool = redis.GetRedis(conf.Registry, d.logger) clientRedisPool = redis.GetRedis(Config.Registry, _logger)
} }
if d.pubsubRedisPool == nil { if pubsubRedisPool == nil {
d.pubsubRedisPool = redis.GetRedis(conf.Registry, d.logger.New(id.MakeID(12))) pubsubRedisPool = redis.GetRedis(Config.Registry, _logger.New(id.MakeID(12)))
// 订阅所有已注册的应用 // 订阅所有已注册的应用
for app := range d.appSubscribed { for app := range appSubscribed {
d.subscribeAppUnderLock(app) subscribeAppUnderLock(app)
} }
// 必须在释放锁之前完成配置,但在释放锁之后启动,避免死锁 // 必须在释放锁之前完成配置,但在释放锁之后启动,避免死锁
d.appLock.Unlock() appLock.Unlock()
d.pubsubRedisPool.Start() pubsubRedisPool.Start()
d.appLock.Lock() appLock.Lock()
} }
d.isClient = true isClient = true
d.appLock.Unlock() appLock.Unlock()
return true return true
} }
func (d *Discoverer) subscribeAppUnderLock(app string) { func subscribeAppUnderLock(app string) {
d.pubsubRedisPool.Subscribe("CH_"+app, func() { pubsubRedisPool.Subscribe("CH_"+app, func() {
d.fetchApp(app) fetchApp(app)
}, func(data []byte) { }, func(data []byte) {
a := strings.Split(string(data), " ") a := strings.Split(string(data), " ")
addr := a[0] addr := a[0]
@ -249,51 +186,39 @@ func (d *Discoverer) subscribeAppUnderLock(app string) {
if len(a) == 2 { if len(a) == 2 {
weight = cast.Int(a[1]) weight = cast.Int(a[1])
} }
d.logInfo("received node update", "app", app, "addr", addr, "weight", weight) logInfo("received node update", "app", app, "addr", addr, "weight", weight)
d.pushNode(app, addr, weight) pushNode(app, addr, weight)
}) })
} }
// Stop 停止 Discover 并从注册中心注销当前节点 func Stop() {
func (d *Discoverer) Stop() { appLock.Lock()
d.appLock.Lock() if isClient && pubsubRedisPool != nil {
if d.isClient && d.pubsubRedisPool != nil { pubsubRedisPool.Stop()
d.pubsubRedisPool.Stop() isClient = false
d.isClient = false
} }
conf := d.GetConfig() if isServer {
if d.isServer { daemonRunning = false
d.daemonRunning.Store(false) if serverRedisPool != nil {
if d.serverRedisPool != nil { serverRedisPool.Do("HDEL", Config.App, myAddr)
d.serverRedisPool.Do("HDEL", conf.App, d.myAddr) serverRedisPool.Do("DEL", Config.App+"_"+myAddr)
d.serverRedisPool.Do("DEL", conf.App+"_"+d.myAddr) serverRedisPool.PUBLISH("CH_"+Config.App, fmt.Sprintf("%s %d", myAddr, 0))
d.serverRedisPool.PUBLISH("CH_"+conf.App, fmt.Sprintf("%s %d", d.myAddr, 0))
} }
d.isServer = false isServer = false
} }
d.appLock.Unlock() appLock.Unlock()
// 释放 HTTP 连接池
d.appClientPoolsLock.Lock()
for _, client := range d.appClientPools {
client.Destroy()
}
d.appClientPools = make(map[string]*gohttp.Client)
d.appClientPoolsLock.Unlock()
} }
// Wait 等待守护进程退出 func Wait() {
func (d *Discoverer) Wait() { if daemonStopChan != nil {
if d.daemonStopChan != nil { <-daemonStopChan
<-d.daemonStopChan daemonStopChan = nil
d.daemonStopChan = nil
} }
} }
// EasyStart 自动根据环境变量和本地网卡信息启动 Discover func EasyStart() (string, int) {
func (d *Discoverer) EasyStart() (string, int) { Init()
d.Init()
port := 0 port := 0
if listen := os.Getenv("DISCOVER_LISTEN"); listen != "" { if listen := os.Getenv("DISCOVER_LISTEN"); listen != "" {
if _, p, err := net.SplitHostPort(listen); err == nil { if _, p, err := net.SplitHostPort(listen); err == nil {
@ -305,14 +230,13 @@ func (d *Discoverer) EasyStart() (string, int) {
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil { if err != nil {
d.logError("failed to listen", "err", err) logError("failed to listen", "err", err)
return "", 0 return "", 0
} }
addrInfo := ln.Addr().(*net.TCPAddr) addrInfo := ln.Addr().(*net.TCPAddr)
_ = ln.Close() _ = ln.Close()
port = addrInfo.Port port = addrInfo.Port
conf := d.GetConfig()
ip := addrInfo.IP ip := addrInfo.IP
if !ip.IsGlobalUnicast() { if !ip.IsGlobalUnicast() {
addrs, _ := net.InterfaceAddrs() addrs, _ := net.InterfaceAddrs()
@ -322,7 +246,7 @@ func (d *Discoverer) EasyStart() (string, int) {
if ip4 == nil || !ip4.IsGlobalUnicast() { if ip4 == nil || !ip4.IsGlobalUnicast() {
continue continue
} }
if conf.IpPrefix != "" && strings.HasPrefix(ip4.String(), conf.IpPrefix) { if Config.IpPrefix != "" && strings.HasPrefix(ip4.String(), Config.IpPrefix) {
ip = ip4 ip = ip4
break break
} }
@ -334,7 +258,7 @@ func (d *Discoverer) EasyStart() (string, int) {
} }
addr := fmt.Sprintf("%s:%d", ip.String(), port) addr := fmt.Sprintf("%s:%d", ip.String(), port)
if !d.Start(addr) { if !Start(addr) {
return "", 0 return "", 0
} }
@ -342,66 +266,48 @@ func (d *Discoverer) EasyStart() (string, int) {
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() { go func() {
<-sigChan <-sigChan
d.Stop() Stop()
}() }()
return ip.String(), port return ip.String(), port
} }
// AddExternalApp 动态添加需要发现的外部应用 func AddExternalApp(app, callConf string) bool {
func (d *Discoverer) AddExternalApp(app, callConf string) bool { if addApp(app, callConf, true) {
if d.addApp(app, callConf, true) { if !isClient {
if !d.isClient { startSub()
d.startSub()
} else { } else {
d.appLock.Lock() appLock.Lock()
d.subscribeAppUnderLock(app) subscribeAppUnderLock(app)
d.appLock.Unlock() appLock.Unlock()
d.fetchApp(app) // 同步拉取一次
} }
return true return true
} }
return false return false
} }
// SetNode 手动设置某个服务的节点信息 func SetNode(app, addr string, weight int) {
func (d *Discoverer) SetNode(app, addr string, weight int) { pushNode(app, addr, weight)
d.pushNode(app, addr, weight)
} }
func (d *Discoverer) getCallInfo(app string) *callInfoType { func getCallInfo(app string) *callInfoType {
d.appLock.RLock() appLock.RLock()
defer d.appLock.RUnlock() defer appLock.RUnlock()
return d.calls[app] return _calls[app]
} }
var numberMatcher = regexp.MustCompile(`^\d+(s|ms|us|µs|ns?)?$`) var numberMatcher = regexp.MustCompile(`^\d+(s|ms|us|µs|ns?)?$`)
func (d *Discoverer) addApp(app, callConf string, fetch bool) bool { func addApp(app, callConf string, fetch bool) bool {
d.appLock.Lock() appLock.Lock()
conf := d.GetConfig() if Config.Calls == nil {
Config.Calls = make(map[string]string)
// 1. 写时复制Copy-on-Write创建一个全新的 Map 避免影响读操作
newCalls := make(map[string]string)
for k, v := range conf.Calls {
newCalls[k] = v
} }
if Config.Calls[app] == callConf && _appNodes[app] != nil {
if newCalls[app] == callConf && d.appNodes[app] != nil { appLock.Unlock()
d.appLock.Unlock()
return false return false
} }
Config.Calls[app] = callConf
newCalls[app] = callConf
conf.Calls = newCalls // 将新的 Map 赋值给 ConfigStruct
// 2. 更新实例配置
d.SetConfig(conf)
// 3. 如果是默认的全局实例,保持包级全局配置同步
if d == DefaultDiscoverer {
SetConfig(conf)
}
callInfo := &callInfoType{ callInfo := &callInfoType{
Timeout: 10 * time.Second, Timeout: 10 * time.Second,
@ -433,23 +339,23 @@ func (d *Discoverer) addApp(app, callConf string, fetch bool) bool {
} }
} }
d.calls[app] = callInfo _calls[app] = callInfo
if d.appNodes[app] == nil { if _appNodes[app] == nil {
d.appNodes[app] = make(map[string]*NodeInfo) _appNodes[app] = make(map[string]*NodeInfo)
} }
d.appSubscribed[app] = true appSubscribed[app] = true
d.appLock.Unlock() appLock.Unlock()
if fetch && d.isClient { if fetch {
d.fetchApp(app) fetchApp(app)
} }
return true return true
} }
func (d *Discoverer) fetchApp(app string) { func fetchApp(app string) {
d.appLock.RLock() appLock.RLock()
pool := d.clientRedisPool pool := clientRedisPool
d.appLock.RUnlock() appLock.RUnlock()
if pool == nil { if pool == nil {
return return
} }
@ -464,131 +370,80 @@ func (d *Discoverer) fetchApp(app string) {
} }
} }
currentNodes := d.getAppNodes(app) currentNodes := getAppNodes(app)
if currentNodes != nil { if currentNodes != nil {
for addr := range currentNodes { for addr := range currentNodes {
if _, ok := results[addr]; !ok { if _, ok := results[addr]; !ok {
d.pushNode(app, addr, 0) pushNode(app, addr, 0)
} }
} }
} }
for addr, res := range results { for addr, res := range results {
d.pushNode(app, addr, res.Int()) pushNode(app, addr, res.Int())
} }
} }
func (d *Discoverer) getAppNodes(app string) map[string]*NodeInfo { func getAppNodes(app string) map[string]*NodeInfo {
d.appLock.RLock() appLock.RLock()
defer d.appLock.RUnlock() defer appLock.RUnlock()
if d.appNodes[app] == nil { if _appNodes[app] == nil {
return nil return nil
} }
nodes := make(map[string]*NodeInfo) nodes := make(map[string]*NodeInfo)
for k, v := range d.appNodes[app] { for k, v := range _appNodes[app] {
nodes[k] = v nodes[k] = v
} }
return nodes return nodes
} }
func (d *Discoverer) getCalls() map[string]string { func getCalls() map[string]string {
conf := d.GetConfig() appLock.RLock()
defer appLock.RUnlock()
calls := make(map[string]string) calls := make(map[string]string)
for k, v := range conf.Calls { for k, v := range Config.Calls {
calls[k] = v calls[k] = v
} }
return calls return calls
} }
// GetAppNodes 获取某个应用的所有节点列表 func GetAppNodes(app string) map[string]*NodeInfo {
func (d *Discoverer) GetAppNodes(app string) map[string]*NodeInfo { return getAppNodes(app)
return d.getAppNodes(app)
} }
func (d *Discoverer) pushNode(app, addr string, weight int) { func pushNode(app, addr string, weight int) {
d.appLock.Lock() appLock.Lock()
defer d.appLock.Unlock() defer appLock.Unlock()
if weight <= 0 { if weight <= 0 {
if d.appNodes[app] != nil { if _appNodes[app] != nil {
delete(d.appNodes[app], addr) delete(_appNodes[app], addr)
} }
return return
} }
if d.appNodes[app] == nil { if _appNodes[app] == nil {
d.appNodes[app] = make(map[string]*NodeInfo) _appNodes[app] = make(map[string]*NodeInfo)
} }
if node, ok := d.appNodes[app][addr]; ok { if node, ok := _appNodes[app][addr]; ok {
if node.Weight != weight { if node.Weight != weight {
used := node.UsedTimes.Load() node.UsedTimes = uint64(float64(node.UsedTimes) / float64(node.Weight) * float64(weight))
node.UsedTimes.Store(uint64(float64(used) / float64(node.Weight) * float64(weight)))
node.Weight = weight node.Weight = weight
} }
} else { } else {
var avgUsed uint64 = 0 var avgUsed uint64 = 0
if len(d.appNodes[app]) > 0 { if len(_appNodes[app]) > 0 {
var totalScore float64 var totalScore float64
for _, n := range d.appNodes[app] { for _, n := range _appNodes[app] {
totalScore += float64(n.UsedTimes.Load()) / float64(n.Weight) totalScore += float64(n.UsedTimes) / float64(n.Weight)
} }
avgUsed = uint64(totalScore / float64(len(d.appNodes[app])) * float64(weight)) avgUsed = uint64(totalScore / float64(len(_appNodes[app])) * float64(weight))
} }
node := &NodeInfo{ _appNodes[app][addr] = &NodeInfo{
Addr: addr, Addr: addr,
Weight: weight, Weight: weight,
} UsedTimes: avgUsed,
node.UsedTimes.Store(avgUsed)
d.appNodes[app][addr] = node
} }
} }
// 以下是包级别 API通过转发给 DefaultDiscoverer 实现兼容性
func IsServer() bool { return DefaultDiscoverer.IsServer() }
func IsClient() bool { return DefaultDiscoverer.IsClient() }
func logError(msg string, extra ...any) {
DefaultDiscoverer.logError(msg, extra...)
}
func logInfo(msg string, extra ...any) {
DefaultDiscoverer.logInfo(msg, extra...)
}
func SetLogger(logger *log.Logger) {
DefaultDiscoverer.SetLogger(logger)
}
func Init() {
DefaultDiscoverer.Init()
}
func Start(addr string) bool {
return DefaultDiscoverer.Start(addr)
}
func Stop() {
DefaultDiscoverer.Stop()
}
func Wait() {
DefaultDiscoverer.Wait()
}
func EasyStart() (string, int) {
return DefaultDiscoverer.EasyStart()
}
func AddExternalApp(app, callConf string) bool {
return DefaultDiscoverer.AddExternalApp(app, callConf)
}
func SetNode(app, addr string, weight int) {
DefaultDiscoverer.SetNode(app, addr, weight)
}
func GetAppNodes(app string) map[string]*NodeInfo {
return DefaultDiscoverer.GetAppNodes(app)
} }

View File

@ -46,10 +46,8 @@ func TestDiscover(t *testing.T) {
defer server.Close() defer server.Close()
// 配置 Discover // 配置 Discover
conf := discover.DefaultDiscoverer.GetConfig() discover.Config.App = "test-app"
conf.App = "test-app" discover.Config.Registry = "redis://127.0.0.1:6379/15"
conf.Registry = "redis://127.0.0.1:6379/15"
discover.DefaultDiscoverer.SetConfig(conf)
// 启动 Discover // 启动 Discover
if !discover.Start("127.0.0.1:18001") { if !discover.Start("127.0.0.1:18001") {
@ -140,24 +138,3 @@ func TestEasyStart(t *testing.T) {
fmt.Printf("EasyStart: %s:%d\n", ip, port) fmt.Printf("EasyStart: %s:%d\n", ip, port)
discover.Stop() discover.Stop()
} }
func BenchmarkDiscover(b *testing.B) {
discover.Config.App = "bench-app"
discover.SetNode("bench-app", "127.0.0.1:8080", 100)
discover.SetNode("bench-app", "127.0.0.1:8081", 100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 模拟一个不需要实际网络请求的调用过程,只测试 Discover 内部逻辑(负载均衡、节点选择等)
// 我们通过 Mock 或直接调用内部方法来实现
appClient := discover.AppClient{
App: "bench-app",
Method: "GET",
Path: "/",
}
node := appClient.Next("bench-app", nil)
if node == nil {
b.Fatal("no node")
}
}
}

View File

@ -7,12 +7,7 @@ import (
// SetLoadBalancer 设置全局负载均衡策略 // SetLoadBalancer 设置全局负载均衡策略
func SetLoadBalancer(lb LoadBalancer) { func SetLoadBalancer(lb LoadBalancer) {
DefaultDiscoverer.SetLoadBalancer(lb) settedLoadBalancer = lb
}
// SetLoadBalancer 设置负载均衡策略
func (d *Discoverer) SetLoadBalancer(lb LoadBalancer) {
d.settedLoadBalancer = lb
} }
// LoadBalancer 负载均衡接口 // LoadBalancer 负载均衡接口
@ -27,17 +22,20 @@ type LoadBalancer interface {
// DefaultLoadBalancer 默认负载均衡器(简单权重轮询/得分最小者优先) // DefaultLoadBalancer 默认负载均衡器(简单权重轮询/得分最小者优先)
type DefaultLoadBalancer struct{} type DefaultLoadBalancer struct{}
// Response 在默认负载均衡器中不再执行写操作,减少锁竞争
func (lb *DefaultLoadBalancer) Response(appClient *AppClient, node *NodeInfo, err error, response *http.Response, responseTime time.Duration) { func (lb *DefaultLoadBalancer) Response(appClient *AppClient, node *NodeInfo, err error, response *http.Response, responseTime time.Duration) {
node.Data.Store("score", float64(node.UsedTimes)/float64(node.Weight))
} }
// Next 根据得分UsedTimes / Weight选择得分最小的节点
func (lb *DefaultLoadBalancer) Next(appClient *AppClient, nodes []*NodeInfo, request *http.Request) *NodeInfo { func (lb *DefaultLoadBalancer) Next(appClient *AppClient, nodes []*NodeInfo, request *http.Request) *NodeInfo {
var minScore float64 = -1 var minScore float64 = -1
var minNode *NodeInfo var minNode *NodeInfo
for _, node := range nodes { for _, node := range nodes {
// 动态计算得分,避免使用 sync.Map 存储,减少内存分配和锁竞争 scoreValue, ok := node.Data.Load("score")
score := float64(node.UsedTimes.Load()) / float64(node.Weight) if !ok {
scoreValue = float64(node.UsedTimes) / float64(node.Weight)
node.Data.Store("score", scoreValue)
}
score := scoreValue.(float64)
if minNode == nil || score < minScore { if minNode == nil || score < minScore {
minScore = score minScore = score
minNode = node minNode = node

40
Log.go
View File

@ -1,40 +0,0 @@
package discover
import (
"apigo.cc/go/log"
)
const LogTypeDiscover = "discover"
type DiscoverLog struct {
log.BaseLog
App string
Method string
Path string
Node string
Attempts int
UsedTime float32
Error string
}
func (ac *AppClient) Log(node string, usedTime float32, err error) {
if ac.Logger == nil {
ac.Logger = log.DefaultLogger
}
if !ac.Logger.CheckLevel(log.INFO) {
return
}
entry := log.GetEntry[DiscoverLog]()
// 框架会自动调用 FillBase不再手动调用
entry.App = ac.App
entry.Method = ac.Method
entry.Path = ac.Path
entry.Node = node
entry.Attempts = ac.attempts
entry.UsedTime = usedTime
if err != nil {
entry.Error = err.Error()
}
ac.Logger.Log(entry)
}

View File

@ -1,78 +0,0 @@
package discover_test
import (
"fmt"
"net"
"net/http"
"testing"
"time"
"apigo.cc/go/discover"
)
func TestMultipleDiscoverer(t *testing.T) {
// 启动两个模拟服务
l1, _ := net.Listen("tcp", "127.0.0.1:18011")
mux1 := http.NewServeMux()
mux1.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("OK1")) })
server1 := &http.Server{Handler: mux1}
go func() { _ = server1.Serve(l1) }()
defer server1.Close()
l2, _ := net.Listen("tcp", "127.0.0.1:18012")
mux2 := http.NewServeMux()
mux2.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("OK2")) })
server2 := &http.Server{Handler: mux2}
go func() { _ = server2.Serve(l2) }()
defer server2.Close()
registry := "redis://127.0.0.1:6379/15"
// 实例 1
d1 := discover.NewDiscoverer()
c1conf := d1.GetConfig()
c1conf.App = "app1"
c1conf.Registry = registry
d1.SetConfig(c1conf)
if !d1.Start("127.0.0.1:18011") {
t.Skip("redis not available")
}
defer d1.Stop()
// 实例 2
d2 := discover.NewDiscoverer()
c2conf := d2.GetConfig()
c2conf.App = "app2"
c2conf.Registry = registry
d2.SetConfig(c2conf)
if !d2.Start("127.0.0.1:18012") {
t.Skip("redis not available")
}
defer d2.Stop()
// 实例 1 发现并调用自己
d1.AddExternalApp("app1", "1")
time.Sleep(200 * time.Millisecond) // 等待同步
c1 := d1.NewCaller(nil, nil)
res1 := c1.Get("app1", "/")
if res1.Error != nil || res1.String() != "OK1" {
t.Errorf("d1 call app1 failed: %v, %s", res1.Error, res1.String())
}
// 实例 2 发现并调用 实例 1
d2.AddExternalApp("app1", "1")
time.Sleep(200 * time.Millisecond) // 等待同步
c2 := d2.NewCaller(nil, nil)
res2 := c2.Get("app1", "/")
if res2.Error != nil || res2.String() != "OK1" {
t.Errorf("d2 call app1 failed: %v, %s", res2.Error, res2.String())
}
// 验证独立性d1 不应该能直接调用 app2 (除非手动 AddExternalApp)
res3 := c1.Get("app2", "/")
if res3.Error == nil {
t.Error("d1 should not find app2 without AddExternalApp")
}
fmt.Println("Multiple Discoverer instances verified")
}

View File

@ -2,14 +2,13 @@ package discover
import ( import (
"sync" "sync"
"sync/atomic"
) )
// NodeInfo 存储服务节点信息 // NodeInfo 存储服务节点信息
type NodeInfo struct { type NodeInfo struct {
Addr string // 节点地址 Addr string // 节点地址
Weight int // 节点权重 Weight int // 节点权重
UsedTimes atomic.Uint64 // 已使用次数 UsedTimes uint64 // 已使用次数
FailedTimes atomic.Int32 // 失败次数 FailedTimes int // 失败次数
Data sync.Map // 运行时自定义数据 Data sync.Map // 运行时自定义数据
} }

View File

@ -2,12 +2,7 @@ package discover
import "net/http" import "net/http"
// SetRoute 设置全局路由规则 // SetRoute 设置全局路由规则,可以在请求前修改 App、Method、Path 等信息
func SetRoute(route func(appClient *AppClient, request *http.Request)) { func SetRoute(route func(appClient *AppClient, request *http.Request)) {
DefaultDiscoverer.SetRoute(route) settedRoute = route
}
// SetRoute 设置路由规则
func (d *Discoverer) SetRoute(route func(appClient *AppClient, request *http.Request)) {
d.settedRoute = route
} }

View File

@ -5,7 +5,6 @@
2. **实时同步**: 验证通过 Redis PUBLISH 更新节点信息后,客户端能实时感知并更新本地节点列表。 2. **实时同步**: 验证通过 Redis PUBLISH 更新节点信息后,客户端能实时感知并更新本地节点列表。
3. **故障剔除**: 验证当节点调用持续失败时,能自动从本地列表中剔除。 3. **故障剔除**: 验证当节点调用持续失败时,能自动从本地列表中剔除。
4. **环境变量配置**: 验证 `EasyStart` 结合环境变量的启动流程。 4. **环境变量配置**: 验证 `EasyStart` 结合环境变量的启动流程。
5. **高效日志记录**: 验证 `DiscoverLog` 通过对象池和 `FillBase` 机制实现的高性能异步日志。
## 测试结果 ## 测试结果
- **Unit Tests**: `go test -v ./...` - **Unit Tests**: `go test -v ./...`
@ -13,5 +12,4 @@
- `TestEasyStart`: PASS - `TestEasyStart`: PASS
## Benchmark ## Benchmark
- `BenchmarkDiscover`: ~560 ns/op (Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz) - 待补充Discover 主要性能开销在负载均衡算法选择,单次选择耗时极低)。
- 负载均衡选择节点耗时极低,适合高并发场景。

2
go.mod
View File

@ -7,7 +7,7 @@ require (
apigo.cc/go/config v1.0.4 apigo.cc/go/config v1.0.4
apigo.cc/go/http v1.0.3 apigo.cc/go/http v1.0.3
apigo.cc/go/id v1.0.4 apigo.cc/go/id v1.0.4
apigo.cc/go/log v1.1.0 apigo.cc/go/log v1.0.2
apigo.cc/go/redis v1.0.2 apigo.cc/go/redis v1.0.2
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
) )