// Package service registers the "apigo.cloud/git/apigo/service" plugin, which
// exposes the github.com/ssgo/s web service framework to scripts: API
// auto-loading from script files, sessions, proxy/rewrite/static routing and
// HTTP/WebSocket client helpers.
package service

import (
	"errors"
	"fmt"
	"net/http"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"time"

	"apigo.cloud/git/apigo/gojs"
	"apigo.cloud/git/apigo/plugin"
	"github.com/gorilla/websocket"
	"github.com/ssgo/discover"
	"github.com/ssgo/httpclient"
	"github.com/ssgo/log"
	"github.com/ssgo/redis"
	"github.com/ssgo/s"
	"github.com/ssgo/u"
)

// APICache is one entry of the in-memory pool of precompiled API scripts.
type APICache struct {
	// Filename is the script file the code was compiled from.
	Filename string
	// Mtime is the file's modification time in Unix milliseconds, recorded
	// when the code was compiled; the hot-reload timer compares against it.
	Mtime int64
	// Code is the result of gojs.PreCompileFile for Filename.
	Code *gojs.PreCompiledCode
}

// _apiCache maps APIConfig.String() keys to their compiled script and is
// guarded by _apiCacheLock.
var _apiCache = map[string]*APICache{}
var _apiCacheLock = sync.RWMutex{}

// APIConfig describes one API route backed by a script file.
type APIConfig struct {
	AuthLevel  int
	Method     string
	Host       string
	Path       string
	Memo       string
	ActionFile string
}

// String returns the cache key of the route, built from host, method, path
// and auth level.
func (conf *APIConfig) String() string {
	return fmt.Sprintf("%s_%s_%s_%d", conf.Host, conf.Method, conf.Path, conf.AuthLevel)
}

// apiPath is the directory scanned for API script files (config key "apiPath").
var apiPath = "api"

// apiHotLoadInterval is the hot-reload polling interval in milliseconds
// (config key "apiHotLoadInterval"); values <= 0 disable hot reload.
// NOTE(review): the config sample documents the default as 0 (disabled) while
// the default here is 5000 — confirm which one is intended.
var apiHotLoadInterval = 5000

// _data is the process-wide key/value store behind the "setData"/"getData"
// plugin objects, guarded by _dataLock.
var _data = map[string]any{}
var _dataLock = sync.RWMutex{}

// Session holds the key/value data of one session. The data is loaded when
// the session is created and written back only by Save.
type Session struct {
	id   string
	conn *redis.Redis // nil when sessions are kept in process memory
	data map[string]any
}

// sessionRedis is the redis connection used to store sessions; nil means the
// in-memory store memorySessionData is used instead.
var sessionRedis *redis.Redis

// sessionTimeout is the expiry, in seconds, passed to SETEX when a session is
// saved to redis.
var sessionTimeout = 3600

// memorySessionData is the in-memory session store, keyed by session id and
// guarded by memorySessionDataLock.
var memorySessionData = map[string]map[string]any{}
var memorySessionDataLock = sync.RWMutex{}

// NewSession loads the session with the given id, either from the in-memory
// store (no session redis configured) or from the redis key "SESS_"+id.
// A session that does not exist yet starts with empty data.
//
// NOTE(review): in memory mode the returned Session shares its data map with
// the store, so concurrent requests for the same id mutate the same map
// without locking — confirm whether that is acceptable.
func NewSession(id string, logger *log.Logger) *Session {
	session := &Session{id: id, data: map[string]any{}}
	if sessionRedis == nil {
		memorySessionDataLock.RLock()
		stored := memorySessionData[id]
		memorySessionDataLock.RUnlock()
		if stored != nil {
			session.data = stored
		}
		return session
	}

	session.conn = sessionRedis.CopyByLogger(logger)
	// A failed or empty load is deliberately ignored: the session simply
	// starts out empty.
	_ = session.conn.GET("SESS_" + id).To(&session.data)
	if session.data == nil {
		// Never leave a nil map behind; Set/MSet would panic on it.
		session.data = map[string]any{}
	}
	return session
}

// Set stores a single value in the session data.
func (session *Session) Set(key string, value any) {
	session.data[key] = value
}

// MSet stores every key/value pair of data in the session data.
func (session *Session) MSet(data map[string]any) {
	for key, value := range data {
		session.data[key] = value
	}
}

// Get returns the value stored under key, or nil if there is none.
func (session *Session) Get(key string) any {
	return session.data[key]
}

// MGet returns a new map holding the values of the requested keys; keys that
// are not set map to nil.
func (session *Session) MGet(keys ...string) map[string]any {
	out := make(map[string]any, len(keys))
	for _, key := range keys {
		out[key] = session.data[key]
	}
	return out
}

// Remove deletes the given keys from the session data.
func (session *Session) Remove(keys ...string) {
	for _, key := range keys {
		delete(session.data, key)
	}
}

// Save persists the session data. In memory mode it stamps "_time" with the
// current Unix time and stores the map; in redis mode it writes the data to
// "SESS_"+id with an expiry of sessionTimeout seconds.
func (session *Session) Save() {
	if session.conn == nil {
		session.data["_time"] = time.Now().Unix()
		memorySessionDataLock.Lock()
		memorySessionData[session.id] = session.data
		memorySessionDataLock.Unlock()
		return
	}
	session.conn.SETEX("SESS_"+session.id, sessionTimeout, session.data)
}

// ProxyByResult is the value a "proxyBy" callback returns to describe where a
// request should be proxied.
type ProxyByResult struct {
	AuthLevel int
	ToApp     *string
	ToPath    *string
	Headers   map[string]string
}

// apiMatcher finds the route declaration comment of a script, e.g.
//
//	// list users GET /users AuthLevel: 1 Host: a.com
//
// Submatches: 1 = text before the method, 2 = method, 3 = path, 4 = text
// after the path. Both OPTIONS and the legacy spelling OPTION are accepted.
var apiMatcher = regexp.MustCompile(`(?im)^\s*//(.*?)(GET|POST|PUT|DELETE|HEAD|OPTIONS?|WS|\*)\s+(/\S*)(.*?)$`)

// authLevelMatcher and hostMatcher extract the optional "AuthLevel: n" and
// "Host: name" annotations from the declaration comment.
var authLevelMatcher = regexp.MustCompile(`(?i)AuthLevel:\s*(\S+)`)
var hostMatcher = regexp.MustCompile(`(?i)Host:\s*(\S+)`)

// parseAPIHeader extracts the route declaration from the first matching
// comment line in content and returns it as an APIConfig whose ActionFile is
// filename. It returns nil when content contains no declaration.
func parseAPIHeader(filename, content string) *APIConfig {
	m := apiMatcher.FindStringSubmatch(content)
	if m == nil {
		return nil
	}

	method := strings.ToUpper(m[2])
	switch method {
	case "*":
		// "*" means "any method", which is registered as an empty method.
		method = ""
	case "OPTION":
		// Legacy spelling: the HTTP method is OPTIONS, so a route registered
		// as "OPTION" could never match a request.
		method = "OPTIONS"
	}

	// Everything on the line except method and path is the memo; the
	// AuthLevel/Host annotations are cut out of it once parsed.
	memo := m[1] + " " + m[4]
	authLevel := 0
	if m2 := authLevelMatcher.FindStringSubmatch(memo); m2 != nil {
		authLevel = u.Int(m2[1])
		memo = strings.Replace(memo, m2[0], "", 1)
	}
	host := ""
	if m2 := hostMatcher.FindStringSubmatch(memo); m2 != nil {
		host = m2[1]
		memo = strings.Replace(memo, m2[0], "", 1)
	}

	return &APIConfig{
		Path:       m[3],
		Method:     method,
		AuthLevel:  authLevel,
		Host:       host,
		Memo:       strings.TrimSpace(memo),
		ActionFile: filename,
	}
}

// autoLoadAPI walks root recursively and registers every ".js" file that
// carries a route declaration comment. Entries whose name starts with "." or
// "_" are skipped. Declarations with method WS are registered as websocket
// endpoints, all others as regular APIs.
func autoLoadAPI(root string) {
	for _, f := range u.ReadDirN(root) {
		if strings.HasPrefix(f.Name, ".") || strings.HasPrefix(f.Name, "_") {
			continue
		}
		filename := filepath.Join(root, f.Name)
		if f.IsDir {
			autoLoadAPI(filename)
			continue
		}
		if !strings.HasSuffix(f.Name, ".js") {
			continue
		}
		conf := parseAPIHeader(filename, u.ReadFileN(filename))
		if conf == nil {
			continue
		}
		if conf.Method == "WS" {
			registerWebsocket(conf)
		} else {
			register(conf)
		}
	}
}

// makeAPICache precompiles filename and stores the result in _apiCache under
// apiKey. When compilation fails the error is logged and any existing cache
// entry is left untouched.
func makeAPICache(apiKey, filename string) {
	code, err := gojs.PreCompileFile(filename, s.ServerLogger)
	if err != nil {
		s.ServerLogger.Error(err.Error(), "filename", filename)
		return
	}

	// The file may disappear between compiling and stat-ing it; an mtime of 0
	// makes the hot-reload timer pick the file up again once it is back.
	var mtime int64
	if fileInfo := u.GetFileInfo(filename); fileInfo != nil {
		mtime = fileInfo.ModTime.UnixMilli()
	}

	entry := &APICache{
		Filename: filename,
		Mtime:    mtime,
		Code:     code,
	}
	_apiCacheLock.Lock()
	_apiCache[apiKey] = entry
	_apiCacheLock.Unlock()
}

// lookupAPICache returns the cache entry for apiKey, or nil if there is none.
func lookupAPICache(apiKey string) *APICache {
	_apiCacheLock.RLock()
	defer _apiCacheLock.RUnlock()
	return _apiCache[apiKey]
}

// register compiles conf.ActionFile and registers an API handler that runs
// the cached script on every request. The handler returns nil when no
// compiled code is available for the route.
func register(conf *APIConfig) {
	apiKey := conf.String()
	makeAPICache(apiKey, conf.ActionFile)
	s.RestfulWithOptions(conf.AuthLevel, conf.Method, conf.Path, func(args map[string]any, headers map[string]string, request *s.Request, response *s.Response, caller *discover.Caller, session *Session, logger *log.Logger) any {
		// Look the code up per request so hot-reloaded scripts take effect.
		apiCache := lookupAPICache(apiKey)
		if apiCache == nil {
			return nil
		}
		return runAPI(apiCache.Code, args, headers, request, response, caller, session, logger)
	}, conf.Memo, s.WebServiceOptions{Host: conf.Host})
}

// registerWebsocket compiles conf.ActionFile and registers a websocket
// handler that runs the cached script for every connection. The handler
// returns nil when no compiled code is available for the route.
func registerWebsocket(conf *APIConfig) {
	apiKey := conf.String()
	makeAPICache(apiKey, conf.ActionFile)
	s.RegisterSimpleWebsocketWithOptions(conf.AuthLevel, conf.Path, func(args map[string]any, headers map[string]string, request *s.Request, client *websocket.Conn, caller *discover.Caller, session *Session, logger *log.Logger) any {
		// Look the code up per connection so hot-reloaded scripts take effect.
		apiCache := lookupAPICache(apiKey)
		if apiCache == nil {
			return nil
		}
		return runWS(apiCache.Code, args, headers, request, client, caller, session, logger)
	}, conf.Memo, s.WebServiceOptions{Host: conf.Host})
}

// hotReloadAPIs recompiles every cached script whose file modification time
// is newer than the one recorded in the cache. It stops early once
// *isRunning turns false.
func hotReloadAPIs(isRunning *bool) {
	// Work on a snapshot so the lock is not held while stat-ing and compiling.
	_apiCacheLock.RLock()
	checkList := make(map[string]*APICache, len(_apiCache))
	for apiKey, apiCache := range _apiCache {
		checkList[apiKey] = apiCache
	}
	_apiCacheLock.RUnlock()

	if !*isRunning {
		return
	}
	for apiKey, apiCache := range checkList {
		if fileInfo := u.GetFileInfo(apiCache.Filename); fileInfo != nil {
			if fileInfo.ModTime.UnixMilli() > apiCache.Mtime {
				s.ServerLogger.Info("api file changed, reloading...", "filename", apiCache.Filename)
				makeAPICache(apiKey, apiCache.Filename)
			}
		}
		if !*isRunning {
			break
		}
	}
}

// preStart runs before the server starts: it loads the plugin configuration
// from config.yml, registers all APIs found under apiPath and, when
// apiHotLoadInterval is positive, starts the hot-reload timer (the interval
// is raised to at least 100ms).
func preStart() {
	gojs.LoadPluginsConfig("config.yml")
	autoLoadAPI(apiPath)
	if apiHotLoadInterval <= 0 {
		return
	}
	if apiHotLoadInterval < 100 {
		apiHotLoadInterval = 100
	}
	s.NewTimerServer("_apiHotLoad", time.Millisecond*time.Duration(apiHotLoadInterval), hotReloadAPIs, nil, nil)
}

// configSample is the commented sample configuration published with the plugin.
const configSample = `listen: 80 # 监听端口(|隔开多个监听)(,隔开多个选项)例如 80,http|443|443:h2|127.0.0.1:8080,h2c
ssl: # SSL证书配置,key为域名,value为cert和key的文件路径
  a.com: # 证书匹配的域名
    certFile: /path/to/ssl.pem # cert文件路径
    keyFile: /path/to/ssl.key # key文件路径
registry: # 服务发现使用的redis连接,例如:redis://:@127.0.0.1:6379/15
app: # 注册成为一个服务,指定一个名称即可让其他服务使用这个名称来调用,默认不注册为服务
weight: 100 # 服务节点的权重,默认值:100
accessTokens: # 请求接口时使用指定的Access-Token进行验证,值为token对应的auth-level
  huDWA-78we2-89uji1da: 1 # 该token将获得level为1的权限
  89dsj-aADSsn-Uds1dad: 2 # 该token将获得level为2的权限
calls: # 定义将会调用的服务,可以指定调用时使用的Access-Token、请求使用的协议、超时时间(:间隔)
  app1: huDWA-78we2-89uji1da # 携带指定的Access-Token请求服务app1(h2c协议,超时时间为10s)
  app2: 1:500ms # 不携带Access-Token请求服务app2(http协议,超时时间为500ms)
  app3: s:30s:89dsj-aADSsn-Uds1dad # 携带指定Access-Token请求服务app3(https协议,超时时间为30s)
callRetryTimes: 10 # 节点连续失败超过指定次数将会被注销(只要有其他节点能工作不会影响正常访问),默认值:10
ipPrefix: # 用discover注册发现服务时指定使用的IP网段,默认排除 172.17.(Docker)
keepaliveTimeout: 15000 # 连接允许空闲的最大时间,单位ms,默认值:15000
noLogHeaders: Accept,Accept-Encoding,Cache-Control,Pragma,Connection,Upgrade-Insecure-Requests # 不记录请求头中包含的这些字段,多个字段用逗号分隔
logInputArrayNum: 10 # 请求字段中容器类型(数组、Map)在日志打印个数限制 默认为10个,多余的数据将不再日志中记录
logInputFieldSize: 500 # 请求字段中单个字段在日志打印长度限制 默认为500个字符,多余的数据将不再日志中记录
logOutputArrayNum: 3 # 响应字段中容器类型(数组、Map)在日志打印个数限制 默认为3个,多余的数据将不再日志中记录
logOutputFieldSize: 100 # 响应字段中单个字段在日志打印长度限制 默认为100个字符,多余的数据将不再日志中记录
compress: false # 是否启用压缩,默认不启用
compressMinSize: 1024 # 小于设定值的应答内容将不进行压缩,默认值:1024
compressMaxSize: 4096000 # 大于设定值的应答内容将不进行压缩,默认值:4096000
redirectTimeout: 10000 # proxy和discover发起请求时的超时时间,单位ms,默认值:10000
acceptXRealIpWithoutRequestId: false # 是否允许头部没有携带请求ID的X-Real-IP信息,默认不允许(防止伪造客户端IP)
statisticTime: false # 是否开启请求时间统计,默认不开启
statisticTimeInterval: 10000 # 统计时间间隔,单位ms,默认值:10000
maxUploadSize: 104857600 # 最大上传文件大小(multipart/form-data请求的总空间),单位字节,默认值:104857600(100M)
cpu: 0 # CPU占用的核数,默认为0,即不做限制
memory: 0 # 内存(单位M),默认为0,即不做限制
cpuMonitor: false # 在日志中记录CPU使用情况,默认不开启
memoryMonitor: false # 在日志中记录内存使用情况,默认不开启
cpuLimitValue: 100 # CPU超过最高占用值(10-100)超过次数将自动重启(如果CpuMonitor开启的话),默认100
memoryLimitValue: 95 # 内存超过最高占用值(10-100)超过次数将自动重启(如果MemoryMonitor开启的话),默认95
cpuLimitTimes: 6 # CPU超过最高占用值超过次数(1-100)将报警(如果CpuMonitor开启的话),默认6(即30秒内连续6次)
memoryLimitTimes: 6 # 内存超过最高占用值超过次数(1-100)将报警(如果MemoryMonitor开启的话),默认6(即30秒内连续6次)
cookieScope: host # 启用Session时Cookie的有效范围,host|domain|topDomain,默认值为host
idServer: redis://:@127.0.0.1:6379/15 # 用uniqueId、id来生成唯一ID(雪花算法)时所需的redis服务器连接,例如:redis://:@127.0.0.1:6379/15,如果不指定将不能实现跨服务的全局唯一
sessionServer: redis://:@127.0.0.1:6379/14 # 用来存储Session所需的redis服务器连接,例如:redis://:@127.0.0.1:6379/14,如果不指定将使用内存存储来实现Session共享
sessionTimeout: 3600 # session过期时间,单位秒,默认值:3600
apiPath: api # API接口文件的目录,默认为当前目录下的 api/
apiHotLoadInterval: 0 # API文件热加载,当API接口检测到文件变化自动加载,单位毫秒,默认值:0(不开启)
`

// applyConfig copies the plugin configuration into the s and discover
// package configs and into this package's own settings. Keys that are absent
// (nil) leave the current value untouched.
func applyConfig(conf map[string]any) {
	if v := conf["listen"]; v != nil {
		s.Config.Listen = u.String(v)
	}
	if v := conf["ssl"]; v != nil {
		u.Convert(v, &s.Config.SSL)
	}
	if v := conf["registry"]; v != nil {
		discover.Config.Registry = u.String(v)
	}
	if v := conf["app"]; v != nil {
		discover.Config.App = u.String(v)
	}
	if v := conf["weight"]; v != nil {
		discover.Config.Weight = u.Int(v)
	}
	if v := conf["accessTokens"]; v != nil {
		u.Convert(v, &s.Config.AccessTokens)
	}
	if v := conf["calls"]; v != nil {
		u.Convert(v, &discover.Config.Calls)
	}
	if v := conf["callRetryTimes"]; v != nil {
		discover.Config.CallRetryTimes = u.Int(v)
	}
	if v := conf["ipPrefix"]; v != nil {
		// The prefix applies to both the server and service discovery.
		s.Config.IpPrefix = u.String(v)
		discover.Config.IpPrefix = u.String(v)
	}
	if v := conf["keepaliveTimeout"]; v != nil {
		s.Config.KeepaliveTimeout = u.Int(v)
	}
	if v := conf["noLogHeaders"]; v != nil {
		s.Config.NoLogHeaders = u.String(v)
	}
	if v := conf["logInputArrayNum"]; v != nil {
		s.Config.LogInputArrayNum = u.Int(v)
	}
	if v := conf["logInputFieldSize"]; v != nil {
		s.Config.LogInputFieldSize = u.Int(v)
	}
	if v := conf["logOutputArrayNum"]; v != nil {
		s.Config.LogOutputArrayNum = u.Int(v)
	}
	if v := conf["logOutputFieldSize"]; v != nil {
		s.Config.LogOutputFieldSize = u.Int(v)
	}
	if v := conf["compress"]; v != nil {
		s.Config.Compress = u.Bool(v)
	}
	if v := conf["compressMinSize"]; v != nil {
		s.Config.CompressMinSize = u.Int(v)
	}
	if v := conf["compressMaxSize"]; v != nil {
		s.Config.CompressMaxSize = u.Int(v)
	}
	if v := conf["redirectTimeout"]; v != nil {
		s.Config.RedirectTimeout = u.Int(v)
	}
	if v := conf["acceptXRealIpWithoutRequestId"]; v != nil {
		s.Config.AcceptXRealIpWithoutRequestId = u.Bool(v)
	}
	if v := conf["statisticTime"]; v != nil {
		s.Config.StatisticTime = u.Bool(v)
	}
	if v := conf["statisticTimeInterval"]; v != nil {
		s.Config.StatisticTimeInterval = u.Int(v)
	}
	if v := conf["maxUploadSize"]; v != nil {
		s.Config.MaxUploadSize = u.Int64(v)
	}
	if v := conf["cpu"]; v != nil {
		s.Config.Cpu = u.Int(v)
	}
	if v := conf["memory"]; v != nil {
		s.Config.Memory = u.Int(v)
	}
	if v := conf["cpuMonitor"]; v != nil {
		s.Config.CpuMonitor = u.Bool(v)
	}
	if v := conf["memoryMonitor"]; v != nil {
		s.Config.MemoryMonitor = u.Bool(v)
	}
	if v := conf["cpuLimitValue"]; v != nil {
		s.Config.CpuLimitValue = u.Uint(v)
	}
	if v := conf["memoryLimitValue"]; v != nil {
		s.Config.MemoryLimitValue = u.Uint(v)
	}
	if v := conf["cpuLimitTimes"]; v != nil {
		s.Config.CpuLimitTimes = u.Uint(v)
	}
	if v := conf["memoryLimitTimes"]; v != nil {
		s.Config.MemoryLimitTimes = u.Uint(v)
	}
	if v := conf["cookieScope"]; v != nil {
		s.Config.CookieScope = u.String(v)
	}
	if v := conf["idServer"]; v != nil {
		s.Config.IdServer = u.String(v)
	}
	if v := conf["sessionServer"]; v != nil {
		sessionRedis = redis.GetRedis(u.String(v), nil)
	}
	if v := conf["sessionTimeout"]; v != nil {
		sessionTimeout = u.Int(v)
	}
	if v := conf["apiPath"]; v != nil {
		apiPath = u.String(v)
	}
	if v := conf["apiHotLoadInterval"]; v != nil {
		apiHotLoadInterval = u.Int(v)
	}
}

// contextLogger returns the *log.Logger injected into ctx. Like the inline
// assertions it replaces, it panics if no logger has been injected.
func contextLogger(ctx *plugin.Context) *log.Logger {
	return ctx.GetInject("*log.Logger").(*log.Logger)
}

// init registers the plugin with its script-facing objects, its sample
// configuration and its configuration loader.
func init() {
	plugin.Register(plugin.Plugin{
		Id:   "apigo.cloud/git/apigo/service",
		Name: "web service framework by github.com/ssgo/s",
		Objects: map[string]any{
			// start runs the pre-start steps and then starts the server via s.Start.
			"start": func() {
				preStart()
				s.Start()
			},
			// asyncStart runs the pre-start steps, starts the server via
			// s.AsyncStart and returns a client bound to it.
			"asyncStart": func() *AsyncServer {
				preStart()
				as := s.AsyncStart()
				return &AsyncServer{
					Addr:          as.Addr,
					Proto:         as.Proto,
					ProtoName:     as.ProtoName,
					as:            as,
					globalHeaders: nil,
				}
			},
			// addTask registers a timer task; intervalMS is in milliseconds.
			"addTask": func(name string, intervalMS uint, onRunning func(running *bool), onStart func(), onStop func()) {
				s.NewTimerServer(name, time.Duration(intervalMS)*time.Millisecond, onRunning, onStart, onStop)
			},
			"onStop": func(cb func()) {
				s.AddShutdownHook(cb)
			},
			// startSession installs an auth checker that creates a Session
			// for every request and hands it to authenticator; the session
			// is also returned as the checker's object.
			"startSession": func(authenticator func(authLevel int, url *string, args map[string]any, request *s.Request, response *s.Response, session *Session) (pass bool), ctx plugin.Context) {
				s.SetAuthChecker(func(authLevel int, logger *log.Logger, url *string, args map[string]any, request *s.Request, response *s.Response, options *s.WebServiceOptions) (pass bool, object any) {
					session := NewSession(request.GetSessionId(), logger)
					return authenticator(authLevel, url, args, request, response, session), session
				})
			},
			"setAuthenticator": func(authenticator func(authLevel int, logger *log.Logger, url *string, args map[string]any, request *s.Request, response *s.Response, options *s.WebServiceOptions) (pass bool, object any), ctx plugin.Context) {
				s.SetAuthChecker(authenticator)
			},
			"setInFilter": func(cb func(args *map[string]any, request *s.Request, response *s.Response) (out any)) {
				s.SetInFilter(func(args *map[string]any, request *s.Request, response *s.Response, logger *log.Logger) (out any) {
					return cb(args, request, response)
				})
			},
			// setOutFilter adapts cb to the out-filter signature; the
			// replacement output passed on is always nil.
			"setOutFilter": func(cb func(args map[string]any, request *s.Request, response *s.Response, out any) (isOver bool)) {
				s.SetOutFilter(func(args map[string]any, request *s.Request, response *s.Response, out any, logger *log.Logger) (newOut any, isOver bool) {
					return nil, cb(args, request, response, out)
				})
			},
			"proxy": func(authLevel int, path string, toApp, toPath string) {
				s.Proxy(authLevel, path, toApp, toPath)
			},
			"proxyBy": func(cb func(request *s.Request) ProxyByResult) {
				s.SetProxyBy(func(request *s.Request) (authLevel int, toApp, toPath *string, headers map[string]string) {
					r := cb(request)
					return r.AuthLevel, r.ToApp, r.ToPath, r.Headers
				})
			},
			"rewrite": func(path string, toPath string) {
				s.Rewrite(path, toPath)
			},
			// rewriteBy rewrites the request only when cb returns a
			// non-empty path.
			"rewriteBy": func(cb func(request *s.Request) (toPath string)) {
				s.SetRewriteBy(func(request *s.Request) (toPath string, rewrite bool) {
					toPath = cb(request)
					return toPath, toPath != ""
				})
			},
			"static": func(requestPath string, filePath string) {
				s.Static(requestPath, filePath)
			},
			"staticByHost": func(hostname string, requestPath string, filePath string) {
				s.StaticByHost(requestPath, filePath, hostname)
			},
			"register": func(conf *APIConfig) {
				register(conf)
			},
			"registerWebsocket": func(conf *APIConfig) {
				registerWebsocket(conf)
			},
			// The get* accessors return the per-request values stored in the
			// script context by runAPI/runWS, or nil when absent.
			"getArgs": func(ctx *plugin.Context) map[string]any {
				if v, ok := ctx.GetData("args").(map[string]any); ok {
					return v
				}
				return nil
			},
			"getHeaders": func(ctx *plugin.Context) map[string]string {
				if v, ok := ctx.GetData("headers").(map[string]string); ok {
					return v
				}
				return nil
			},
			"getRequest": func(ctx *plugin.Context) *s.Request {
				if v, ok := ctx.GetData("request").(*s.Request); ok {
					return v
				}
				return nil
			},
			"getResponse": func(ctx *plugin.Context) *s.Response {
				if v, ok := ctx.GetData("response").(*s.Response); ok {
					return v
				}
				return nil
			},
			// getClient wraps the websocket connection of the current
			// request in a new WS on every call.
			"getClient": func(ctx *plugin.Context) *WS {
				if v, ok := ctx.GetData("client").(*websocket.Conn); ok {
					return &WS{
						conn:   v,
						closed: false,
						logger: contextLogger(ctx),
					}
				}
				return nil
			},
			// getCaller returns the request's discover client, or a fresh
			// one with an empty caller when the context holds none.
			"getCaller": func(ctx *plugin.Context) *DiscoverClient {
				if v, ok := ctx.GetData("caller").(*DiscoverClient); ok {
					return v
				}
				return &DiscoverClient{
					caller:        &discover.Caller{},
					logger:        contextLogger(ctx),
					globalHeaders: make(map[string]string),
				}
			},
			"getSession": func(ctx *plugin.Context) *Session {
				if v, ok := ctx.GetData("session").(*Session); ok {
					return v
				}
				return nil
			},
			// setData/getData access the process-wide _data store.
			"setData": func(ctx *plugin.Context, key string, value any) {
				_dataLock.Lock()
				defer _dataLock.Unlock()
				_data[key] = value
			},
			"getData": func(ctx *plugin.Context, key string) any {
				_dataLock.RLock()
				defer _dataLock.RUnlock()
				return _data[key]
			},
			"addDiscoverApp": func(ctx *plugin.Context, app, callConfig string) bool {
				return discover.AddExternalApp(app, callConfig)
			},
			"addDiscoverAppForTest": func(ctx *plugin.Context, app, callConfig string) bool {
				return discover.AddExternalAppManually(app, callConfig)
			},
			"setDiscoverNode": func(ctx *plugin.Context, app, addr string, weight int) {
				discover.SetNode(app, addr, weight)
			},
		},
		ConfigSample: configSample,
		Init:         applyConfig,
	})
}

// runScript runs precompiled code in a fresh runtime whose context holds the
// given data entries, and returns the script's result.
func runScript(code *gojs.PreCompiledCode, logger *log.Logger, data map[string]any) any {
	rt := gojs.New(&gojs.RuntimeOption{Logger: logger})
	for key, value := range data {
		rt.GoCtx.SetData(key, value)
	}
	// The run error is dropped here, as before; presumably the runtime
	// reports it through the logger it was given — TODO confirm.
	r, _ := rt.RunPreCompiled(code)
	return r
}

// runAPI executes an API script for one HTTP request. The request values are
// exposed to the script under the context keys "args", "headers", "request",
// "response", "caller" and "session".
func runAPI(code *gojs.PreCompiledCode, args map[string]any, headers map[string]string, request *s.Request, response *s.Response, caller *discover.Caller, session *Session, logger *log.Logger) any {
	return runScript(code, logger, map[string]any{
		"args":     args,
		"headers":  headers,
		"request":  request,
		"response": response,
		"caller": &DiscoverClient{
			caller:        caller,
			logger:        logger,
			globalHeaders: make(map[string]string),
		},
		"session": session,
	})
}

// runWS executes a websocket script for one connection. The connection
// values are exposed to the script under the context keys "args", "headers",
// "request", "client", "caller" and "session".
func runWS(code *gojs.PreCompiledCode, args map[string]any, headers map[string]string, request *s.Request, client *websocket.Conn, caller *discover.Caller, session *Session, logger *log.Logger) any {
	return runScript(code, logger, map[string]any{
		"args":    args,
		"headers": headers,
		"request": request,
		"client":  client,
		"caller": &DiscoverClient{
			caller:        caller,
			logger:        logger,
			globalHeaders: make(map[string]string),
		},
		"session": session,
	})
}

// makeResult converts an httpclient result into a Result and returns it
// together with the request error, if any. The Result is populated even when
// the error is non-nil.
func makeResult(logger *log.Logger, result *httpclient.Result) (*Result, error) {
	err, headers, statusCode, output := _makeResult(logger, result)
	return &Result{
		result:     result,
		Error:      err,
		StatusCode: statusCode,
		Headers:    headers,
		Data:       output,
	}, result.Error
}

// _makeResult extracts the error text, headers, status code and body of an
// httpclient result. A request error is logged. Multi-valued headers are
// joined with a space. The body is decoded with u.UnJson when the
// Content-Type contains "application/json", otherwise it is returned as a
// string. Headers, status code and output stay zero when there is no response.
func _makeResult(logger *log.Logger, result *httpclient.Result) (err string, headers map[string]string, statusCode int, output interface{}) {
	if result.Error != nil {
		err = result.Error.Error()
		logger.Error(err)
	}
	if result.Response == nil {
		return
	}

	headers = make(map[string]string, len(result.Response.Header))
	for k, v := range result.Response.Header {
		headers[k] = strings.Join(v, " ")
	}
	statusCode = result.Response.StatusCode
	if strings.Contains(result.Response.Header.Get("Content-Type"), "application/json") {
		output = u.UnJson(result.String(), nil)
	} else {
		output = result.String()
	}
	return
}

// headerPairs flattens the global headers followed by the optional
// per-request headers into the alternating key, value list the HTTP clients
// take as variadic arguments.
func headerPairs(global map[string]string, in *map[string]string) []string {
	size := len(global)
	if in != nil {
		size += len(*in)
	}
	out := make([]string, 0, size*2)
	for k, v := range global {
		out = append(out, k, v)
	}
	if in != nil {
		for k, v := range *in {
			out = append(out, k, v)
		}
	}
	return out
}

// AsyncServer is the script-facing handle of a server started with
// "asyncStart"; it can stop the server and send requests to it.
type AsyncServer struct {
	Addr          string
	Proto         string
	ProtoName     string
	as            *s.AsyncServer
	baseURL       string
	globalHeaders map[string]string
}

// OnStop registers f on the underlying server's OnStop hook.
func (c *AsyncServer) OnStop(f func()) {
	c.as.OnStop(f)
}

// Wait calls Wait on the underlying server.
func (c *AsyncServer) Wait() {
	c.as.Wait()
}

// Stop stops the underlying server.
func (c *AsyncServer) Stop() {
	c.as.Stop()
}

// makeURL prefixes url with the base URL, joining the two with exactly one
// "/" when one side already has it or neither does. URLs containing "://"
// are returned unchanged, as is every URL while no base URL is set.
func (c *AsyncServer) makeURL(url string) string {
	if c.baseURL == "" || strings.Contains(url, "://") {
		return url
	}
	baseHasSlash := strings.HasSuffix(c.baseURL, "/")
	urlHasSlash := strings.HasPrefix(url, "/")
	switch {
	case baseHasSlash && urlHasSlash:
		return c.baseURL + url[1:]
	case !baseHasSlash && !urlHasSlash:
		return c.baseURL + "/" + url
	default:
		return c.baseURL + url
	}
}

// makeHeaderArray merges the global headers and the optional per-request
// headers into an alternating key, value list.
func (c *AsyncServer) makeHeaderArray(in *map[string]string) []string {
	return headerPairs(c.globalHeaders, in)
}

// SetBaseURL sets a URL prefix so that later requests may pass only the path
// SetBaseURL url a URL starting with http:// or https://
func (c *AsyncServer) SetBaseURL(url string) {
	c.baseURL = url
}

// SetGlobalHeaders sets fixed HTTP headers that are added to every request
// SetGlobalHeaders headers the HTTP headers as a key-value object
func (c *AsyncServer) SetGlobalHeaders(headers map[string]string) {
	c.globalHeaders = headers
}

// Get sends a GET request
// * url a URL starting with http:// or https://, or only the path when a base URL is set
// * headers the HTTP headers as a key-value object; may be omitted (nil)
// * return the result object with Error, StatusCode, Headers and Data (decoded when the response is JSON, otherwise the body as a string), plus the request error if any
func (c *AsyncServer) Get(ctx *plugin.Context, url string, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Get(c.makeURL(url), c.makeHeaderArray(headers)...))
}

// Post sends a POST request
// * body may be of any type; the encoding (for example JSON for non-string, non-binary values) is decided by the underlying client
func (c *AsyncServer) Post(ctx *plugin.Context, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Post(c.makeURL(url), body, c.makeHeaderArray(headers)...))
}

// Put sends a PUT request
func (c *AsyncServer) Put(ctx *plugin.Context, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Put(c.makeURL(url), body, c.makeHeaderArray(headers)...))
}

// Delete sends a DELETE request
func (c *AsyncServer) Delete(ctx *plugin.Context, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Delete(c.makeURL(url), body, c.makeHeaderArray(headers)...))
}

// Head sends a HEAD request
func (c *AsyncServer) Head(ctx *plugin.Context, url string, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Head(c.makeURL(url), c.makeHeaderArray(headers)...))
}

// Do sends a request with an arbitrary method
// * method the request method, such as GET or POST
func (c *AsyncServer) Do(ctx *plugin.Context, method string, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.as.Do(method, c.makeURL(url), body, c.makeHeaderArray(headers)...))
}

// ManualDo sends a request whose response must be read by the caller, which allows implementing an SSE client
// ManualDo return the response object (the caller must read the data and close it)
func (c *AsyncServer) ManualDo(ctx *plugin.Context, method string, url string, body interface{}, headers *map[string]string) (*ManualResult, error) {
	logger := contextLogger(ctx)
	// Resolve the URL against the base URL like every other request method.
	result := c.as.ManualDo(method, c.makeURL(url), body, c.makeHeaderArray(headers)...)
	// The error is returned from result.Error below, so it is dropped here.
	r1, _ := makeResult(logger, result)
	return &ManualResult{
		Result: *r1,
	}, result.Error
}

// Open opens a websocket connection to this server
// Open return the websocket object (close it when done)
func (c *AsyncServer) Open(ctx *plugin.Context, url string, headers *map[string]string) (*WS, error) {
	logger := contextLogger(ctx)

	// Global headers first so that per-request headers override them.
	reqHeader := http.Header{}
	for k, v := range c.globalHeaders {
		reqHeader.Set(k, v)
	}
	if headers != nil {
		for k, v := range *headers {
			reqHeader.Set(k, v)
		}
	}

	urlPrefix := "ws://"
	if c.ProtoName == "https" {
		urlPrefix = "wss://"
	}
	conn, _, err := websocket.DefaultDialer.Dial(urlPrefix+c.Addr+url, reqHeader)
	if err != nil {
		logger.Error(err.Error())
		return nil, err
	}
	return &WS{conn: conn, logger: logger}, nil
}

// WS is the script-facing wrapper around a websocket connection.
type WS struct {
	conn   *websocket.Conn
	closed bool
	logger *log.Logger
}

// Read reads the next message as text
// Read return the string that was read
func (ws *WS) Read() (string, error) {
	_, buf, err := ws.conn.ReadMessage()
	return string(buf), err
}

// ReadBytes reads the next message as binary data
// ReadBytes return the bytes that were read
func (ws *WS) ReadBytes() ([]byte, error) {
	_, buf, err := ws.conn.ReadMessage()
	return buf, err
}

// ReadJSON reads the next message and decodes it as JSON
// ReadJSON return the decoded object
func (ws *WS) ReadJSON() (interface{}, error) {
	var obj interface{}
	err := ws.conn.ReadJSON(&obj)
	return obj, err
}

// Write sends a text message
// Write content the text to send
func (ws *WS) Write(content string) error {
	return ws.conn.WriteMessage(websocket.TextMessage, []byte(content))
}

// WriteBytes sends a binary message
// WriteBytes content the bytes to send
func (ws *WS) WriteBytes(content []byte) error {
	return ws.conn.WriteMessage(websocket.BinaryMessage, content)
}

// WriteJSON sends an object encoded as JSON
// WriteJSON content the object to send
func (ws *WS) WriteJSON(content interface{}) error {
	return ws.conn.WriteJSON(content)
}

//// OnClose close event
//// OnClose callback called when the peer closes the connection
//func (ws *WS) OnClose(callback func()) {
//	ws.conn.SetCloseHandler(func(code int, text string) error {
//		callback()
//		return nil
//	})
//}

// Close closes the connection; calling it again is a no-op
func (ws *WS) Close() error {
	if ws.closed {
		return nil
	}
	ws.closed = true
	return ws.conn.Close()
}

// EnableCompression enables write compression on the connection
func (ws *WS) EnableCompression() {
	ws.conn.EnableWriteCompression(true)
}

// Result is the script-facing outcome of an HTTP request.
type Result struct {
	result *httpclient.Result
	// Error is the request error text, empty on success.
	Error string
	// StatusCode is the response status code, 0 when there is no response.
	StatusCode int
	// Headers holds the response headers.
	Headers map[string]string
	// Data is the decoded JSON body, or the body as a string.
	Data interface{}
}

// String returns the response body as a string.
func (r *Result) String() string {
	return r.result.String()
}

// Bytes returns the response body as bytes.
func (r *Result) Bytes() []byte {
	return r.result.Bytes()
}

// ManualResult is the outcome of a ManualDo request; its body is read
// explicitly by the caller and must be closed when done.
type ManualResult struct {
	Result
	closed bool
}

// Save writes the response to filename and marks the result as closed. It
// fails if the result is already closed.
func (r *ManualResult) Save(filename string) error {
	if r.closed {
		return errors.New("http client reader closed")
	}
	r.closed = true
	return r.result.Save(filename)
}

// readChunk reads up to n bytes from the response body. It fails when the
// result is closed, when the request produced no response body, or when n is
// negative.
func (r *ManualResult) readChunk(n int) ([]byte, error) {
	if r.closed {
		return nil, errors.New("http client reader closed")
	}
	if r.result == nil || r.result.Response == nil || r.result.Response.Body == nil {
		return nil, errors.New("http client has no response body")
	}
	if n < 0 {
		return nil, errors.New("read size must not be negative")
	}
	buf := make([]byte, n)
	n1, err := r.result.Response.Body.Read(buf)
	return buf[:n1], err
}

// Read reads up to n bytes from the response body and returns them as a
// string.
func (r *ManualResult) Read(n int) (string, error) {
	buf, err := r.readChunk(n)
	return string(buf), err
}

// ReadBytes reads up to n bytes from the response body.
func (r *ManualResult) ReadBytes(n int) ([]byte, error) {
	return r.readChunk(n)
}

// Close closes the response body; calling it again, or calling it on a
// result without a response, is a no-op.
func (r *ManualResult) Close() error {
	if r.closed {
		return nil
	}
	r.closed = true
	if r.result == nil || r.result.Response == nil || r.result.Response.Body == nil {
		return nil
	}
	return r.result.Response.Body.Close()
}

// DiscoverClient is the script-facing client for calling other apps through
// service discovery.
type DiscoverClient struct {
	caller        *discover.Caller
	logger        *log.Logger
	globalHeaders map[string]string
}

//func (c *DiscoverClient) makeURL(url string) string {
//	if !strings.Contains(url, "://") && c.baseURL != "" {
//		if strings.HasSuffix(c.baseURL, "/") && strings.HasPrefix(url, "/") {
//			return c.baseURL + url[1:]
//		} else if !strings.HasSuffix(c.baseURL, "/") && !strings.HasPrefix(url, "/") {
//			return c.baseURL + "/" + url
//		}
//		return c.baseURL + url
//	}
//	return url
//}

// makeHeaderArray merges the global headers and the optional per-request
// headers into an alternating key, value list.
func (c *DiscoverClient) makeHeaderArray(in *map[string]string) []string {
	return headerPairs(c.globalHeaders, in)
}

// SetGlobalHeaders sets fixed HTTP headers that are added to every request
// SetGlobalHeaders headers the HTTP headers as a key-value object
func (c *DiscoverClient) SetGlobalHeaders(headers map[string]string) {
	c.globalHeaders = headers
}

// Get sends a GET request to an app
// * app the name of the app to call
// * url the request path
// * headers the HTTP headers as a key-value object; may be omitted (nil)
// * return the result object with Error, StatusCode, Headers and Data (decoded when the response is JSON, otherwise the body as a string), plus the request error if any
func (c *DiscoverClient) Get(ctx *plugin.Context, app, url string, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Get(app, url, c.makeHeaderArray(headers)...))
}

// Post sends a POST request to an app
// * body may be of any type; the encoding (for example JSON for non-string, non-binary values) is decided by the underlying client
func (c *DiscoverClient) Post(ctx *plugin.Context, app, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Post(app, url, body, c.makeHeaderArray(headers)...))
}

// Put sends a PUT request to an app
func (c *DiscoverClient) Put(ctx *plugin.Context, app, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Put(app, url, body, c.makeHeaderArray(headers)...))
}

// Delete sends a DELETE request to an app
func (c *DiscoverClient) Delete(ctx *plugin.Context, app, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Delete(app, url, body, c.makeHeaderArray(headers)...))
}

// Head sends a HEAD request to an app
func (c *DiscoverClient) Head(ctx *plugin.Context, app, url string, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Head(app, url, c.makeHeaderArray(headers)...))
}

// Do sends a request with an arbitrary method to an app
// * method the request method, such as GET or POST
func (c *DiscoverClient) Do(ctx *plugin.Context, method, app, url string, body interface{}, headers *map[string]string) (*Result, error) {
	logger := contextLogger(ctx)
	return makeResult(logger, c.caller.Do(method, app, url, body, c.makeHeaderArray(headers)...))
}

// ManualDo sends a request whose response must be read by the caller, which allows implementing an SSE client
// ManualDo return the response object (the caller must read the data and close it)
func (c *DiscoverClient) ManualDo(ctx *plugin.Context, method, app string, url string, body interface{}, headers *map[string]string) (*ManualResult, error) {
	logger := contextLogger(ctx)
	result := c.caller.ManualDo(method, app, url, body, c.makeHeaderArray(headers)...)
	// The error is returned from result.Error below, so it is dropped here.
	r1, _ := makeResult(logger, result)
	return &ManualResult{
		Result: *r1,
	}, result.Error
}

// Open opens a websocket connection to an app
// Open return the websocket object (close it when done)
func (c *DiscoverClient) Open(ctx *plugin.Context, app, url string, headers *map[string]string) (*WS, error) {
	logger := contextLogger(ctx)
	conn := c.caller.Open(app, url, c.makeHeaderArray(headers)...)
	if conn == nil {
		// A nil connection is the only failure signal the caller gives.
		return nil, errors.New("open websocket failed")
	}
	return &WS{conn: conn, logger: logger}, nil
}