discover/Caller.go

275 lines
7.7 KiB
Go
Raw Permalink Normal View History

package discover
import (
"fmt"
"net/http"
"reflect"
"strings"
"time"
"github.com/gorilla/websocket"
"apigo.cc/go/cast"
gohttp "apigo.cc/go/http"
"apigo.cc/go/log"
)
// getHttpClient returns the HTTP client cached for app, creating and caching
// one on first use.
//
// The lookup first runs under the read lock; on a miss the write lock is taken
// and the map is checked again before a client is built, so concurrent callers
// end up sharing one client per app. timeout and h2c are only consulted when
// the client is created: h2c selects gohttp.NewClientH2C, otherwise
// gohttp.NewClient is used. Later calls for the same app get the cached client
// whatever arguments they pass.
func (d *Discoverer) getHttpClient(app string, timeout time.Duration, h2c bool) *gohttp.Client {
	// Fast path: shared read lock only.
	d.appClientPoolsLock.RLock()
	cached := d.appClientPools[app]
	d.appClientPoolsLock.RUnlock()
	if cached != nil {
		return cached
	}

	// Slow path: re-check under the write lock, another goroutine may have
	// stored a client between the two lock acquisitions.
	d.appClientPoolsLock.Lock()
	defer d.appClientPoolsLock.Unlock()
	if existing := d.appClientPools[app]; existing != nil {
		return existing
	}

	var created *gohttp.Client
	switch {
	case h2c:
		created = gohttp.NewClientH2C(timeout)
	default:
		created = gohttp.NewClient(timeout)
	}
	d.appClientPools[app] = created
	return created
}
// Caller issues service-to-service calls through a Discoverer.
type Caller struct {
discoverer *Discoverer // discoverer used for configuration, routing and node selection
Request *http.Request // original inbound request, used to pass its headers through
NoBody bool // when true, no request body is sent
logger *log.Logger // logger used for this caller's log records
}
// NewCaller creates a Caller bound to DefaultDiscoverer.
//
// request is the original inbound request (may be nil) and logger is the
// logger the Caller should use (may be nil).
func NewCaller(request *http.Request, logger *log.Logger) *Caller {
	caller := DefaultDiscoverer.NewCaller(request, logger)
	return caller
}
// NewCaller creates a Caller that performs its calls through d.
//
// request and logger are stored on the Caller as given; NoBody starts out
// false.
func (d *Discoverer) NewCaller(request *http.Request, logger *log.Logger) *Caller {
	caller := new(Caller)
	caller.discoverer = d
	caller.Request = request
	caller.logger = logger
	return caller
}
// logError records an error raised by the Caller, prefixing the message with
// "Discover Caller: ".
//
// When the Caller has no logger yet, log.DefaultLogger is stored on the Caller
// and used from then on. extra is forwarded unchanged to the logger.
func (c *Caller) logError(msg string, extra ...any) {
	logger := c.logger
	if logger == nil {
		logger = log.DefaultLogger
		c.logger = logger
	}
	logger.Error("Discover Caller: "+msg, extra...)
}
// Get sends a GET request without a payload to path on app.
//
// headers is a flat list of alternating header names and values.
func (c *Caller) Get(app, path string, headers ...string) *gohttp.Result {
	result := c.Do(http.MethodGet, app, path, nil, headers...)
	return result
}
// Post sends a POST request carrying data to path on app.
//
// headers is a flat list of alternating header names and values.
func (c *Caller) Post(app, path string, data any, headers ...string) *gohttp.Result {
	result := c.Do(http.MethodPost, app, path, data, headers...)
	return result
}
// Put sends a PUT request carrying data to path on app.
//
// headers is a flat list of alternating header names and values.
func (c *Caller) Put(app, path string, data any, headers ...string) *gohttp.Result {
	result := c.Do(http.MethodPut, app, path, data, headers...)
	return result
}
// Delete sends a DELETE request carrying data to path on app.
//
// headers is a flat list of alternating header names and values.
func (c *Caller) Delete(app, path string, data any, headers ...string) *gohttp.Result {
	result := c.Do(http.MethodDelete, app, path, data, headers...)
	return result
}
// Head sends a HEAD request without a payload to path on app.
//
// headers is a flat list of alternating header names and values.
func (c *Caller) Head(app, path string, headers ...string) *gohttp.Result {
	result := c.Do(http.MethodHead, app, path, nil, headers...)
	return result
}
// Call 发起通用的泛型请求并自动解析响应
func Call[T any](method, app, path string, data any, headers ...string) (T, error) {
return CallT[T](DefaultDiscoverer.NewCaller(nil, nil), method, app, path, data, headers...)
}
// CallT performs a request with c and decodes the response into a value of
// type T. It is a function rather than a method because Go methods cannot
// declare type parameters.
//
// The error is non-nil when the call produced no HTTP result, when the result
// carries an error, or when decoding the response into T fails.
func CallT[T any](c *Caller, method, app, path string, data any, headers ...string) (T, error) {
	var result T
	res := c.Do(method, app, path, data, headers...)
	// Do returns nil when the underlying call did not yield a *gohttp.Result
	// (DoWithNode drops any other outcome, e.g. a "WS" call that succeeded).
	// Report that as an error instead of dereferencing a nil pointer.
	if res == nil {
		return result, fmt.Errorf("no http result for %s %s %s", method, app, path)
	}
	if res.Error != nil {
		return result, res.Error
	}
	err := res.To(&result)
	return result, err
}
// Do performs a request with the given method against path on app and returns
// the result.
//
// It delegates to DoWithNode with an empty withNode value. The result is nil
// when DoWithNode returns nil.
func (c *Caller) Do(method, app, path string, data any, headers ...string) *gohttp.Result {
	// The address of the serving node is of no interest to this variant.
	result, _ := c.DoWithNode(method, app, "", path, data, headers...)
	return result
}
// Open dials a WebSocket connection to path on app.
//
// It returns nil when the underlying call did not produce a connection; in
// that case doWithNode hands back a *gohttp.Result describing the failure,
// which is dropped here.
func (c *Caller) Open(app, path string, headers ...string) *websocket.Conn {
	outcome, _ := c.doWithNode(false, "WS", app, "", path, nil, headers...)
	// A failed assertion leaves conn as a nil *websocket.Conn.
	conn, _ := outcome.(*websocket.Conn)
	return conn
}
// DoWithNode performs a request and returns the result together with the
// address of the node that served it.
//
// withNode is forwarded to the node selection. The returned result is nil when
// the underlying call yielded something other than a *gohttp.Result.
func (c *Caller) DoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) {
	outcome, nodeAddr := c.doWithNode(false, method, app, withNode, path, data, headers...)
	// A failed assertion leaves result as a nil *gohttp.Result.
	result, _ := outcome.(*gohttp.Result)
	return result, nodeAddr
}
// ManualDoWithNode performs a request in manual mode (the response is left for
// the caller to process) and returns the result together with the address of
// the node that served it.
//
// withNode is forwarded to the node selection. The returned result is nil when
// the underlying call yielded something other than a *gohttp.Result.
func (c *Caller) ManualDoWithNode(method, app, withNode, path string, data any, headers ...string) (*gohttp.Result, string) {
	outcome, nodeAddr := c.doWithNode(true, method, app, withNode, path, data, headers...)
	// A failed assertion leaves result as a nil *gohttp.Result.
	result, _ := outcome.(*gohttp.Result)
	return result, nodeAddr
}
// doWithNode is the shared implementation behind Do, Open, DoWithNode and
// ManualDoWithNode. It asks the AppClient for candidate nodes of app one at a
// time and returns as soon as one attempt succeeds.
//
// Parameters:
//   - manual: use the client's ManualDo / ManualDoByRequest variants instead
//     of Do / DoByRequest.
//   - method: HTTP method, or "WS" (case-insensitive) to dial a WebSocket.
//   - app, path: target application and request path; together with method
//     they may be rewritten by the discoverer's route hook when one is set.
//   - withNode: forwarded to AppClient.NextWithNode.
//   - data: payload handed to the HTTP client.
//   - headers: flat key/value list; a trailing key without a value is ignored.
//
// Returns the outcome and the address of the node that served it:
//   - a *websocket.Conn for a successful "WS" call,
//   - a *gohttp.Result for a successful HTTP call,
//   - a *gohttp.Result carrying an error, with an empty address, when the app
//     check fails or no node produced a successful attempt.
//
// An attempt counts as failed when the client reports an error or the response
// status is 502, 503 or 504; the next node is then tried.
func (c *Caller) doWithNode(manual bool, method, app, withNode, path string, data any, headers ...string) (any, string) {
	callerHeaders := make(map[string]string, len(headers)/2+3)
	for i := 1; i < len(headers); i += 2 {
		callerHeaders[headers[i-1]] = headers[i]
	}

	conf := c.discoverer.GetConfig()
	if c.discoverer.isServer {
		// Identify the calling service to the callee.
		callerHeaders[HeaderFromApp] = conf.App
		callerHeaders[HeaderFromNode] = c.discoverer.myAddr
	}

	// callData is the map view of the payload exposed through the AppClient;
	// the request itself is still sent with the original data value.
	callData := make(map[string]any)
	if data != nil && !c.NoBody {
		rv := cast.RealValue(reflect.ValueOf(data))
		switch rv.Kind() {
		case reflect.Map, reflect.Struct:
			cast.Convert(&callData, data)
		}
	}

	appClient := AppClient{
		discoverer: c.discoverer,
		Logger:     c.logger,
		App:        app,
		Method:     method,
		Path:       path,
		Data:       callData,
		Headers:    callerHeaders,
	}
	if c.discoverer.settedRoute != nil {
		// The route hook may redirect the call to another app, method or path.
		c.discoverer.settedRoute(&appClient, c.Request)
		app = appClient.App
		method = appClient.Method
		path = appClient.Path
	}
	if !appClient.CheckApp(app) {
		return &gohttp.Result{Error: fmt.Errorf("app %s not found", app)}, ""
	}

	// Resolve every callInfo-derived setting once, behind a single nil check.
	// With no call info the call falls back to plain http, a zero timeout and
	// no h2c instead of dereferencing a nil pointer.
	// NOTE(review): how gohttp treats a zero timeout is not visible here — confirm.
	callInfo := c.discoverer.getCallInfo(app)
	scheme := "http"
	var timeout time.Duration
	h2c := false
	if callInfo != nil {
		if callInfo.Token != "" {
			callerHeaders["Access-Token"] = callInfo.Token
		}
		if callInfo.SSL {
			scheme = "https"
		}
		timeout = callInfo.Timeout
		h2c = callInfo.HttpVersion == 2 && !callInfo.SSL
	}

	settedHeaders := make([]string, 0, len(callerHeaders)*2)
	for k, v := range callerHeaders {
		settedHeaders = append(settedHeaders, k, v)
	}

	isWS := strings.ToUpper(method) == "WS"

	for {
		node := appClient.NextWithNode(app, withNode, c.Request)
		if node == nil {
			break
		}
		node.UsedTimes.Add(1)
		startTime := time.Now()

		hc := c.discoverer.getHttpClient(app, timeout, h2c)
		// NOTE(review): the client is cached per app by getHttpClient, so this
		// write is visible to every caller sharing it — confirm this is safe
		// when callers with different NoBody settings run concurrently.
		hc.NoBody = c.NoBody

		var res *gohttp.Result
		var wsConn *websocket.Conn
		if isWS {
			wsHeader := http.Header{}
			for i := 1; i < len(settedHeaders); i += 2 {
				wsHeader.Set(settedHeaders[i-1], settedHeaders[i])
			}
			wsScheme := "ws"
			if scheme == "https" {
				wsScheme = "wss"
			}
			wsURL := fmt.Sprintf("%s://%s%s", wsScheme, node.Addr, path)
			conn, resp, err := websocket.DefaultDialer.Dial(wsURL, wsHeader)
			wsConn = conn
			res = &gohttp.Result{Error: err, Response: resp}
		} else {
			targetURL := fmt.Sprintf("%s://%s%s", scheme, node.Addr, path)
			switch {
			case c.Request != nil && manual:
				res = hc.ManualDoByRequest(c.Request, method, targetURL, data, settedHeaders...)
			case c.Request != nil:
				res = hc.DoByRequest(c.Request, method, targetURL, data, settedHeaders...)
			case manual:
				res = hc.ManualDo(method, targetURL, data, settedHeaders...)
			default:
				res = hc.Do(method, targetURL, data, settedHeaders...)
			}
		}

		responseTime := time.Since(startTime)
		usedTimeMs := float32(responseTime.Nanoseconds()) / 1e6
		c.discoverer.settedLoadBalancer.Response(&appClient, node, res.Error, res.Response, responseTime)

		gatewayFailure := res.Response != nil && res.Response.StatusCode >= 502 && res.Response.StatusCode <= 504
		if res.Error != nil || gatewayFailure {
			node.FailedTimes.Add(1)
			errStr := ""
			if res.Error != nil {
				errStr = res.Error.Error()
			} else {
				errStr = res.Response.Status
			}
			c.logError(errStr, "app", app, "node", node.Addr, "path", path, "attempts", appClient.attempts)
			appClient.Log(node.Addr, usedTimeMs, fmt.Errorf("%s", errStr))
			if node.FailedTimes.Load() >= int32(conf.CallRetryTimes) {
				c.discoverer.logError("node isolated locally due to high failures", "app", app, "node", node.Addr)
			}
			// This response is dropped and never reaches the caller, so in
			// manual mode nobody else would close its body.
			// NOTE(review): assumes the manual variants leave the body open
			// for the caller — confirm against gohttp.
			if manual && !isWS && res.Response != nil && res.Response.Body != nil {
				_ = res.Response.Body.Close() // best effort; the attempt has already failed
			}
			continue
		}

		node.FailedTimes.Store(0)
		appClient.Log(node.Addr, usedTimeMs, nil)
		if isWS {
			return wsConn, node.Addr
		}
		return res, node.Addr
	}

	return &gohttp.Result{Error: fmt.Errorf("all nodes failed for %s %s", app, path)}, ""
}