commit d08ccdff7e222c104df0807d218adae10ecdb2bc Author: Star Date: Thu Oct 17 13:39:35 2024 +0800 add limiter, verify, session, websocket ... diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8f15271 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.* +!.gitignore +go.sum +/build +/node_modules +/package.json +/bak +node_modules +package.json diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d894367 --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2024 apigo + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..13e6c2f --- /dev/null +++ b/README.md @@ -0,0 +1,216 @@ +# 低代码的服务器端应用框架 基于 [ssgo/s](https://github.com/ssgo/s) + +快速创建一个web服务,提供 http、https、http2、h2c、websocket 服务 +支持作为静态文件服务器 +支持 rewrite 和 proxy反向代理(可以代理到discover应用或其他http服务器) +支持服务发现 [ssgo/discover](https://github.com/ssgo/discover) 快速构建一个服务网络 + +## 快速开始 + +```javascript +import s from "apigo.cc/gojs/service" + +func main() { + s.config({ + listen: '8080', + }) + s.register({ path: '/echo' }, ({ args }) => { + return args.name + }) + s.start() +} +``` + +受Javascript虚拟机机制限制简单方式是单线程,大量耗时请求压力下可能难以胜任 + +## 使用对象池实现高并发 + +#### api/echo.js + +```javascript +import s from "apigo.cc/gojs/service" + +function main() { + service.register({ path: '/echo' }, ({ args }) => { + return args.name + }) +} +``` + +#### main.js + +```javascript +import s from "apigo.cc/gojs/service" + +function main() { + s.load('api/echo.js', { min: 20, max: 1000, idle: 100 }) + s.start() +} +``` + +这种模式下服务代码会脱离主线程,使用对象池实现高并发 + +如果不配置对象池参数则不约束虚拟机数量上限,可以达到最佳性能但是对CPU和内存有一定挑战 + +设置较小的max可以有效保护CPU和内存资源但是设置过小将无法发挥服务器的性能 + +## 注册到服务发现 + +#### 在 user 服务中配置注册中心地址和应用名称,并配置访问令牌 + +```javascript +s.config({ + app: 'user', + registry: redis://:password@127.0.0.1:6379/15 + accessTokens: { + aaaaaaaa: 1, + }, +}) +s.register({ path: '/getUserInfo', authLevel: 1 }, ({ session }) => { + return session.get('id', 'name') +}) +``` + +#### 在 调用的服务中配置访问user服务使用的令牌 + +```javascript +s.config({ + registry: redis://:password@127.0.0.1:6379/15 + calls: { + user: 'aaaaaaaa', + }, +}) +s.register({ path: '/getUserInfo' }, ({ caller }) => { + return caller.get('user/getUserInfo').object() +}) +``` + +authLevel 不设置则不进行校验,如果获得 authLevel 2 则允许访问所有 2及以下级别的接口 + +如果 user 服务不配置 listen 默认使用 h2c 协议随机端口 + +如果不使用 h2c 协议,调用方配置 calls 时需指定 'http:aaaaaaaa' + +## Session + +服务默认启用 session 和 device 功能,如果不希望使用可以在配置中设置 sessionKey 或 deviceKey 为空 + +如果配置了 sessionProvider,session 将会存储在 redis 中,否则存储在内存中 + +sessionID和deviceID 同时支持HTTP头和Cookie两种传输方式,HTTP头优先,如果客户端没有传递则服务器会自动分配 + +如需使用 session 只需要在接口中直接获取即可 + +#### 下面是一个使用 session 并且使用参数有效性验证和限流器的例子 + +```javascript +import service from "apigo.cc/gojs/service" + +let verifies = { + id: v => { return /^\d+$/.test(v) }, + name: /^[a-zA-Z0-9_-\u4e00-\u9fa5\u3400-\u4db5\u3000-\u303F\u3040-\u309F\u30A0-\u30FF\u1100-\u11FF\u3130-\u318F\uAC00-\uD7AF\uD82F\uD835\uD83C\uD83D\uD83E\uD83F\uD840-\uD868\uD86A-\uD86C\uD86F-\uD872]+$/u, +} + +function main() { + service.config({ + userIdKey: 'id', + limitedMessage: { code: 429, message: "访问过于频繁" }, + authFieldMessage: { code: 403, message: "身份验证失败 [{{USER_AUTHLEVEL}}/{{TARGET_AUTHLEVEL}}]" }, + verifyFieldMessage: { code: 400, message: "参数 [{{FAILED_FIELDS}} 验证失败" }, + limiters: { + ip1s: { + from: 'ip', + time: 1000, + times: 10 + } + }, + }) + service.register({ method: 'POST', path: '/login', , limiters: ['ip1s'], verifies }, ({ args, session }) => { + session.set('id', args.id) + session.set('name', args.name) + session.setAuthLevel(1) + session.save() + return { code: 1 } + }) + service.register({ method: 'GET', path: '/userInfo', authLevel: 1, limiters: ['ip1s'] }, ({ session }) => { + return { code: 1, data: session.get('id', 'name') } + }) +} +``` + +session对象自动注入,无需任何其他操作。修改session后需要使用 session.save 来保存 + +调用 session.setAuthLevel 可以设置用户权限,当接口注册的 authLevel 大于0时可以基于 session 中的设置进行访问控制 + +配置了 userIdKey 后,会自动将 session 的用户ID记录在访问日志中,方便对用户的访问进行分析和统计 + +示例中创建了一个每个IP每秒允许10次请求的限流器并且在接口中使用了这个限流器 + +login接口配置了 id 和 name 两个参数的有效性验证规则 + +参数有效性验证配置可以支持以下类型: + +- value为string或RegExp对象时进行正则表达式校验 +- value为number时表示 字符串长度 +- value为boolean时表示 必须存在 +- value为数组时表示 必须是数组中的值 +- value为函数时调用函数进行校验 + + +## 配置文件 + +除了在代码中使用 s.config 外可以有三种配置方式 + +1、在当前目录下创建 service.yml 或 service.json 文件 + +```yaml +listen: 80|443 +ssl: + yourdomain.com: + certfile: /path/yourdomain.pem + keyfile: /path/yourdomain.pem +static: + yourdomain.com: /path/www +``` + +2、在环境配置文件 env.yml 或 env.json 中配置 + +```yaml +service: + listen: 80|443 +``` + +3、在环境变量中配置(以docker为例) + +```shell +docker run -e SERVICE_LISTEN=8080:8443 +``` + +#### 所有配置方式的优先级为 s.config > 环境变量 > env.yml > service.yml + +## websocket + +```javascript +import service from "apigo.cc/gojs/service" + +function main() { + service.register({ + method: 'WS', path: '/ws', + onMessage: ({ client, type, data }) => { + // onMessage + client.writeMessage(type, data) + }, + onClose: ({ client }) => { + // onClose + }, + }, ({ client }) => { + // onOpen + client.write('Hello, World!') + }) +} +``` + +注册接口时将 method 指定为 WS 即可创建 websocket 服务,配置 onMessage 来异步处理消息 + + +## 完整的API参考 [service.ts](https://apigo.cc/gojs/service/src/branch/main/service.ts) diff --git a/caller.go b/caller.go new file mode 100644 index 0000000..cfeaa89 --- /dev/null +++ b/caller.go @@ -0,0 +1,139 @@ +package service + +import ( + "reflect" + "strings" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/ssgo/discover" + "github.com/ssgo/httpclient" + "github.com/ssgo/u" +) + +type Caller struct { + client *discover.Caller +} + +func NewCaller(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm) + return vm.ToValue(gojs.MakeMap(Caller{client: discover.NewCaller(nil, args.Logger)})) +} + +func makeResult(r *httpclient.Result, vm *goja.Runtime) goja.Value { + if r.Error != nil { + panic(vm.NewGoError(r.Error)) + } + headers := map[string]string{} + for k, v := range r.Response.Header { + headers[k] = v[0] + } + return vm.ToValue(map[string]any{ + "status": r.Response.Status, + "statusCode": r.Response.StatusCode, + "headers": headers, + "_data": r.Bytes(), + "bytes": toBytes, + "string": toString, + "object": toObject, + }) +} + +func toBytes(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + dataValue := argsIn.This.ToObject(vm).Get("_data") + if _, ok := dataValue.Export().([]byte); ok { + return dataValue + } + return nil +} + +func toString(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + dataValue := argsIn.This.ToObject(vm).Get("_data") + if data, ok := dataValue.Export().([]byte); ok { + return vm.ToValue(string(data)) + } + return nil +} + +func toObject(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + dataValue := argsIn.This.ToObject(vm).Get("_data") + if data, ok := dataValue.Export().([]byte); ok { + obj := u.UnJsonBytes(data, nil) + v := u.FinalValue(reflect.ValueOf(obj)) + if v.IsValid() { + return vm.ToValue(v.Interface()) + } + } + return nil +} + +func (cl *Caller) makeHeaderArray(in map[string]any) []string { + out := make([]string, 0) + if in != nil { + for k, v := range in { + out = append(out, k, u.String(v)) + } + } + return out +} + +func parseAppPath(appURL string) (string, string) { + arr := strings.SplitN(appURL, "/", 2) + if len(arr) == 2 { + return arr[0], arr[1] + } + return appURL, "/" +} + +func (cl *Caller) Get(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + app, path := parseAppPath(args.Str(0)) + return makeResult(cl.client.Get(app, path, cl.makeHeaderArray(args.Map(1))...), vm) +} + +func (cl *Caller) Head(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + app, path := parseAppPath(args.Str(0)) + return makeResult(cl.client.Head(app, path, cl.makeHeaderArray(args.Map(1))...), vm) +} + +func (cl *Caller) Post(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + app, path := parseAppPath(args.Str(0)) + return makeResult(cl.client.Post(app, path, args.Any(1), cl.makeHeaderArray(args.Map(2))...), vm) +} + +func (cl *Caller) Put(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + app, path := parseAppPath(args.Str(0)) + return makeResult(cl.client.Put(app, path, args.Any(1), cl.makeHeaderArray(args.Map(2))...), vm) +} + +func (cl *Caller) Delete(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + app, path := parseAppPath(args.Str(0)) + return makeResult(cl.client.Delete(app, path, args.Any(1), cl.makeHeaderArray(args.Map(2))...), vm) +} + +func (cl *Caller) Do(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(3) + app, path := parseAppPath(args.Str(0)) + if len(argsIn.Arguments) == 3 { + argsIn.Arguments = append(argsIn.Arguments, vm.ToValue(nil)) + } + var r *httpclient.Result + if cb, ok := goja.AssertFunction(argsIn.Arguments[3]); ok { + r = cl.client.ManualDo(app, path, args.Str(1), args.Any(2), cl.makeHeaderArray(args.Map(4))...) + buf := make([]byte, 1024) + for { + n, err := r.Response.Body.Read(buf) + if err != nil { + break + } + _, _ = cb(argsIn.This, vm.ToValue(u.String(buf[0:n]))) + } + } else { + r = cl.client.Do(app, path, args.Str(1), args.Any(2), cl.makeHeaderArray(args.Map(4))...) + } + return makeResult(r, vm) +} diff --git a/gateway.go b/gateway.go new file mode 100644 index 0000000..b797626 --- /dev/null +++ b/gateway.go @@ -0,0 +1,324 @@ +package service + +import ( + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/ssgo/discover" + "github.com/ssgo/s" + "github.com/ssgo/standard" + "github.com/ssgo/u" +) + +type regexProxiesInfo struct { + Value string + Regex regexp.Regexp +} + +type regexRewriteInfo struct { + To string + Regex regexp.Regexp +} + +var _proxies = map[string]string{} +var _proxiesLock = sync.RWMutex{} +var _regexProxies = map[string]*regexProxiesInfo{} +var _regexRewrites = map[string]*regexRewriteInfo{} +var _rewritesLock = sync.RWMutex{} +var _statics = map[string]string{} +var _staticsLock = sync.RWMutex{} + +func updateStatic(in map[string]string) bool { + updated := false + for k, v := range in { + _staticsLock.RLock() + v1 := _statics[k] + _staticsLock.RUnlock() + if v == v1 { + continue + } + + s.ServerLogger.Info(u.StringIf(v1 != "", "update static set", "new static set"), "key", k, "value", v) + _staticsLock.Lock() + _statics[k] = v + _staticsLock.Unlock() + a := strings.SplitN(k, "/", 2) + if len(a) == 1 { + a = append(a, "/") + } + if a[0] == "*" { + a[0] = "" + } + s.StaticByHost(a[1], v, a[0]) + updated = true + } + return updated +} + +func updateProxy(in map[string]string) bool { + updated := false + //fmt.Println("####000") + + for k, v := range in { + //fmt.Println("####111", k, v) + _proxiesLock.RLock() + v1 := _proxies[k] + v2 := _regexProxies[k] + _proxiesLock.RUnlock() + // skip same + if v == v1 { + //fmt.Println("####222", k, v) + continue + } + ////fmt.Println("####333", k, v) + if v2 != nil && v == v2.Value { + continue + } + ////fmt.Println("####444", k, v) + + if strings.Contains(v, "(") { + // for regexp + ////fmt.Println("####555", k, v) + matcher, err := regexp.Compile("^" + v + "$") + if err != nil { + s.ServerLogger.Error("proxy regexp compile failed", "key", k, "value", v) + //log.Print("Proxy Error Compile ", err) + } else { + s.ServerLogger.Info(u.StringIf(v2 != nil, "update regexp proxy set", "new regexp proxy set"), "key", k, "value", v) + _proxiesLock.Lock() + _regexProxies[k] = ®exProxiesInfo{ + Value: v, + Regex: *matcher, + } + _proxiesLock.Unlock() + updated = true + } + } else { + // for simple + ////fmt.Println("####666", k, v) + s.ServerLogger.Info(u.StringIf(v1 != "", "update proxy set", "new proxy set"), "key", k, "value", v) + _proxiesLock.Lock() + _proxies[k] = v + _proxiesLock.Unlock() + + // add app to discover + ////fmt.Println("########2", len((*proxies))) + if !strings.Contains(v, "://") { + if discover.Config.Calls[v] == "" { + callConfig := "" + if strings.ContainsRune(v, ':') { + // support call config in proxy value + a := strings.SplitN(v, ":", 2) + v = a[0] + callConfig = a[1] + } else { + callConfig = (time.Duration(s.Config.ReadHeaderTimeout) * time.Millisecond).String() + } + // if redisPool != nil { + if discover.AddExternalApp(v, callConfig) { + updated = true + } + // } + } else { + updated = true + } + } else { + updated = true + } + } + } + //fmt.Println("####999") + return updated +} + +func updateRewrite(in map[string]string) bool { + updated := false + for k, v := range in { + _rewritesLock.RLock() + v2 := _regexRewrites[k] + _rewritesLock.RUnlock() + + // skip same + if v2 != nil && v == v2.To { + continue + } + + matcher, err := regexp.Compile("^" + k + "$") + if err != nil { + s.ServerLogger.Error("rewrite regexp compile failed", "key", k, "value", v) + } else { + s.ServerLogger.Info(u.StringIf(v2 != nil, "update regexp rewrite set", "new regexp rewrite set"), "key", k, "value", v) + _rewritesLock.Lock() + _regexRewrites[k] = ®exRewriteInfo{ + To: v, + Regex: *matcher, + } + _rewritesLock.Unlock() + updated = true + } + } + return updated +} + +func rewrite(request *s.Request) (toPath string, rewrite bool) { + list2 := map[string]*regexRewriteInfo{} + _rewritesLock.RLock() + for k, v := range _regexRewrites { + list2[k] = v + } + _rewritesLock.RUnlock() + + if len(list2) > 0 { + requestUrl := fmt.Sprint(request.Header.Get("X-Scheme"), "://", request.Host, request.RequestURI) + requestUrlWithoutScheme := fmt.Sprint(request.Host, request.RequestURI) + + for _, rr := range list2 { + finds := rr.Regex.FindAllStringSubmatch(requestUrl, 20) + if len(finds) == 0 { + finds = rr.Regex.FindAllStringSubmatch(requestUrlWithoutScheme, 20) + } + if len(finds) == 0 { + continue + } + + to := rr.To + if len(finds[0]) > 1 { + for i := 1; i < len(finds[0]); i++ { + varName := fmt.Sprintf("$%d", i) + to = strings.ReplaceAll(to, varName, finds[0][i]) + } + return to, true + } + } + } + + // 不进行代理 + return "", false +} + +func proxy(request *s.Request) (authLevel int, toApp, toPath *string, headers map[string]string) { + //fmt.Println("proxy", len(_proxies)) + outHeaders := map[string]string{ + standard.DiscoverHeaderFromApp: "gateway", + standard.DiscoverHeaderFromNode: s.GetServerAddr(), + } + + scheme := u.StringIf(request.TLS == nil, "http", "https") + host1 := "" + host2 := "" + if strings.ContainsRune(request.Host, ':') { + hostArr := strings.SplitN(request.Host, ":", 2) + host1 = hostArr[0] + host2 = request.Host + } else { + host1 = request.Host + host2 = request.Host + ":" + u.StringIf(request.TLS == nil, "80", "443") + } + + pathMatchers := make([]string, 0) + pathMatchers = append(pathMatchers, fmt.Sprint(scheme, "://", host1, request.RequestURI)) + pathMatchers = append(pathMatchers, fmt.Sprint(scheme, "://", host2, request.RequestURI)) + pathMatchers = append(pathMatchers, fmt.Sprint(host1, request.RequestURI)) + pathMatchers = append(pathMatchers, fmt.Sprint(host2, request.RequestURI)) + pathMatchers = append(pathMatchers, request.RequestURI) + + hostMatchers := make([]string, 0) + hostMatchers = append(hostMatchers, fmt.Sprint(scheme, "://", host1)) + hostMatchers = append(hostMatchers, fmt.Sprint(scheme, "://", host2)) + hostMatchers = append(hostMatchers, host1) + hostMatchers = append(hostMatchers, host2) + + list := map[string]string{} + _proxiesLock.RLock() + for k, v := range _proxies { + list[k] = v + } + _proxiesLock.RUnlock() + for p, a := range list { + //fmt.Println("check proxy ", p, a) + matchPath := "" + matchPathArr := strings.SplitN(strings.ReplaceAll(p, "://", ""), "/", 2) + if len(matchPathArr) == 2 { + matchPath = "/" + matchPathArr[1] + } + + if matchPath == "" { + for _, m := range hostMatchers { + if m == p { + //fmt.Println(" >>>>>>>>1", p, m, request.RequestURI) + return 0, fixAppName(a), &request.RequestURI, outHeaders + } + } + } else { + for _, m := range pathMatchers { + if strings.HasPrefix(m, p) { + if strings.HasPrefix(request.RequestURI, matchPath) { + p2 := request.RequestURI[len(matchPath):] + if len(p2) == 0 || p2[0] != '/' { + p2 = "/" + p2 + } + //fmt.Println(" >>>>>>>>2", p, m, p2) + return 0, fixAppName(a), &p2, outHeaders + } else { + //fmt.Println(" >>>>>>>>3", p, m, request.RequestURI) + return 0, fixAppName(a), &request.RequestURI, outHeaders + } + } + } + } + } + + // 模糊匹配 + list2 := map[string]*regexProxiesInfo{} + _proxiesLock.RLock() + for k, v := range _regexProxies { + list2[k] = v + } + _proxiesLock.RUnlock() + + if len(list2) > 0 { + requestUrl := request.Host + request.RequestURI + for _, rp := range list2 { + //fmt.Println("check regexp proxy ", rp.Regex, rp.Value) + finds := rp.Regex.FindAllStringSubmatch(requestUrl, 20) + if len(finds) > 0 && len(finds[0]) > 2 { + //fmt.Println(" >>>>>>>>2", request.RequestURI, finds[0][2]) + pos := strings.Index(request.RequestURI, finds[0][2]) + if pos > 0 { + outHeaders["Proxy-Path"] = request.RequestURI[0:pos] + } + + if !strings.Contains(finds[0][1], "://") && strings.ContainsRune(finds[0][1], ':') { + callConfig := "" + if strings.ContainsRune(finds[0][1], ':') { + // support call config in proxy value + a := strings.SplitN(finds[0][1], ":", 2) + finds[0][1] = a[0] + callConfig = a[1] + } else { + callConfig = (time.Duration(s.Config.ReadHeaderTimeout) * time.Millisecond).String() + } + // if redisPool != nil { + discover.AddExternalApp(finds[0][1], callConfig) + // } + } + return 0, &finds[0][1], &finds[0][2], outHeaders + } + } + } + + // 不进行代理 + return +} + +func fixAppName(appName string) *string { + if !strings.Contains(appName, "://") && strings.ContainsRune(appName, ':') { + a := strings.SplitN(appName, ":", 2) + return &a[0] + } else { + return &appName + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8edd816 --- /dev/null +++ b/go.mod @@ -0,0 +1,40 @@ +module apigo.cc/gojs/service + +go 1.18 + +require ( + apigo.cc/gojs v0.0.2 + apigo.cc/gojs/console v0.0.1 + apigo.cc/gojs/http v0.0.3 + apigo.cc/gojs/util v0.0.2 + github.com/gorilla/websocket v1.5.3 + github.com/ssgo/config v1.7.7 + github.com/ssgo/discover v1.7.8 + github.com/ssgo/httpclient v1.7.8 + github.com/ssgo/log v1.7.7 + github.com/ssgo/redis v1.7.7 + github.com/ssgo/s v1.7.14 + github.com/ssgo/standard v1.7.7 + github.com/ssgo/u v1.7.9 +) + +require ( + github.com/dlclark/regexp2 v1.11.4 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect + github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect + github.com/gomodule/redigo v1.9.2 // indirect + github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect + github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect + github.com/shirou/gopsutil/v3 v3.24.5 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/ssgo/tool v0.4.27 // indirect + github.com/tklauser/go-sysconf v0.3.14 // indirect + github.com/tklauser/numcpus v0.8.0 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/request.go b/request.go new file mode 100644 index 0000000..3ea2710 --- /dev/null +++ b/request.go @@ -0,0 +1,142 @@ +package service + +import ( + "io" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/ssgo/s" + "github.com/ssgo/standard" +) + +type Request struct { + req *s.Request + Proto string + Scheme string + Host string + Method string + Path string + RemoteAddr string + RealIP string + Referer string + UserAgent string + Url string + ContentLength int64 + Cookies map[string]string + Headers map[string]string + Args map[string]any + Files map[string]map[string]any + MultiFiles map[string][]map[string]any +} + +func MakeRequest(req *s.Request, args map[string]any, headers map[string]string) map[string]any { + cookies := map[string]string{} + for _, ck := range req.Cookies() { + cookies[ck.Name] = ck.Value + } + files := map[string]map[string]any{} + multiFiles := map[string][]map[string]any{} + for k, v := range args { + if v1, ok := v.(s.UploadFile); ok { + if data, err := v1.Content(); err == nil { + files[k] = map[string]any{ + "name": v1.Filename, + "size": v1.Size, + "data": data, + } + } + delete(args, k) + } else if v2, ok := v.([]s.UploadFile); ok { + multiFiles[k] = make([]map[string]any, len(v2)) + for i, v1 := range v2 { + if data, err := v1.Content(); err == nil { + multiFiles[k][i] = map[string]any{ + "name": v1.Filename, + "size": v1.Size, + "data": data, + } + } + } + delete(args, k) + } + } + return gojs.MakeMap(&Request{ + req: req, + Proto: req.Proto, + Scheme: req.Header.Get(standard.DiscoverHeaderScheme), + Host: req.Header.Get(standard.DiscoverHeaderHost), + Method: req.Method, + Path: req.RequestURI, + RemoteAddr: req.RemoteAddr, + RealIP: req.GetRealIp(), + Referer: req.Referer(), + UserAgent: req.UserAgent(), + Url: req.URL.String(), + ContentLength: req.ContentLength, + Cookies: cookies, + Headers: headers, + Args: args, + Files: files, + MultiFiles: multiFiles, + }) +} + +func (r *Request) MakeURL(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + return vm.ToValue(r.req.MakeUrl(args.Str(0))) +} + +func (r *Request) ReadAll(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + data, err := io.ReadAll(r.req.Body) + if err != nil { + panic(vm.NewGoError(err)) + } + return vm.ToValue(data) +} + +func (r *Request) Read(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + size := args.Int(0) + data := make([]byte, size) + n, err := r.req.Body.Read(data) + if err != nil { + panic(vm.NewGoError(err)) + } + return vm.ToValue(data[0:n]) +} + +func (r *Request) Close(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + err := r.req.Body.Close() + if err != nil { + panic(vm.NewGoError(err)) + } + return nil +} + +func (r *Request) Get(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + return vm.ToValue(r.req.Get(args.Str(0))) +} + +func (r *Request) Set(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + r.req.Set(args.Str(0), args.Any(1)) + return nil +} + +func (r *Request) GetHeader(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + return vm.ToValue(r.req.Header.Get(args.Str(0))) +} + +func (r *Request) SetHeader(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + r.req.Header.Set(args.Str(0), args.Str(1)) + return nil +} + +func (r *Request) SetUserID(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + r.req.SetUserId(args.Str(0)) + return nil +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..1b6293f --- /dev/null +++ b/response.go @@ -0,0 +1,121 @@ +package service + +import ( + "net/http" + "reflect" + "time" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/ssgo/s" +) + +type Response struct { + resp *s.Response + endCh chan bool + result any +} + +func (r *Response) End(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + if len(argsIn.Arguments) > 0 { + r.result = argsIn.Arguments[0].Export() + } + if r.endCh != nil { + r.endCh <- true + } + return nil +} + +func (r *Response) SetStatus(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + r.resp.WriteHeader(args.Int(0)) + return nil +} + +func (r *Response) SetCookie(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + path := "/" + domain := "" + expires := time.Time{} + maxAge := 0 + secure := false + httpOnly := false + obj := args.Obj(2) + if obj != nil { + path = obj.Str("path") + domain = obj.Str("domain") + maxAge = obj.Int("maxAge") + secure = obj.Bool("secure") + httpOnly = obj.Bool("httpOnly") + if expiresV := obj.Get("expires"); expiresV != nil { + expiresK := expiresV.ExportType().Kind() + if expiresK == reflect.String { + expires, _ = time.Parse("2006-01-02 15:04:05", expiresV.String()) + } else if expiresK == reflect.Int64 || expiresK == reflect.Int { + expires = time.Now().Add(time.Duration(expiresV.ToInteger()) * time.Millisecond) + } + } + + } + cookie := http.Cookie{ + Name: args.Str(0), + Value: args.Str(1), + Path: path, + Domain: domain, + Expires: expires, + MaxAge: maxAge, + Secure: secure, + HttpOnly: httpOnly, + } + http.SetCookie(r.resp, &cookie) + return nil +} + +func (r *Response) SetHeader(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + r.resp.Header().Set(args.Str(0), args.Str(1)) + return nil +} + +func (r *Response) AddHeader(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + r.resp.Header().Add(args.Str(0), args.Str(1)) + return nil +} + +func (r *Response) GetHeader(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + return vm.ToValue(r.resp.Header().Get(args.Str(0))) +} + +func (r *Response) Write(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + n, err := r.resp.Write(args.Bytes(0)) + if err != nil { + panic(vm.NewGoError(err)) + } + return vm.ToValue(n) +} + +func (r *Response) Flush(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + r.resp.Flush() + return nil +} + +func (r *Response) SendFile(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + r.resp.SendFile(args.Str(0), args.Str(1)) + return nil +} + +func (r *Response) DownloadFile(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(3) + r.resp.DownloadFile(args.Str(0), args.Str(1), args.Bytes(1)) + return nil +} + +func (r *Response) Location(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + r.resp.Location(args.Str(0)) + return nil +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..496cc53 --- /dev/null +++ b/service.go @@ -0,0 +1,571 @@ +package service + +import ( + _ "embed" + "encoding/json" + "errors" + "fmt" + "reflect" + "regexp" + "strings" + "sync" + "time" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/gorilla/websocket" + "github.com/ssgo/config" + "github.com/ssgo/discover" + "github.com/ssgo/log" + "github.com/ssgo/redis" + "github.com/ssgo/s" + "github.com/ssgo/standard" + "github.com/ssgo/u" +) + +//go:embed service.ts +var serviceTS string + +//go:embed README.md +var serviceMD string + +var server *s.AsyncServer + +var pools = map[string]*gojs.Pool{} + +var poolExists = map[string]bool{} +var poolActionRegistered = map[string]bool{} +var poolsLock = sync.RWMutex{} +var waitChan chan bool + +type LimiterConfig struct { + From string + Time int + Times int +} + +type Config struct { + SessionKey string + DeviceKey string + ClientKey string + UserIdKey string + SessionProvider string + SessionTimeout int64 + AuthFieldMessage string + VerifyFieldMessage string + LimitedMessage string + Limiters map[string]*LimiterConfig + LimiterRedis string + + Proxy map[string]string + Rewrite map[string]string + Static map[string]string +} + +var serviceConfig Config +var onStop goja.Callable + +var limiters = map[string]*s.Limiter{} + +func init() { + obj := map[string]any{ + "config": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + s.Init() + + // 处理配置 + args := gojs.MakeArgs(&argsIn, vm) + serviceConfig = Config{"Session", "Device", "Client", "userId", "", 3600, "auth failed", "verify failed", "too many requests", nil, "", map[string]string{}, map[string]string{}, map[string]string{}} + if errs := config.LoadConfig("service", &serviceConfig); errs != nil && len(errs) > 0 { + panic(vm.NewGoError(errs[0])) + } + config.LoadConfig("service", &discover.Config) + // var auth goja.Callable + if conf := args.Obj(0); conf != nil { + u.Convert(conf.O.Export(), &serviceConfig) + u.Convert(conf.O.Export(), &s.Config) + u.Convert(conf.O.Export(), &discover.Config) + // auth = conf.Func("auth") + onStop = conf.Func("onStop") + + if serviceConfig.SessionProvider != "" { + sessionRedis = redis.GetRedis(serviceConfig.SessionProvider, args.Logger) + } + sessionTimeout = serviceConfig.SessionTimeout + if sessionTimeout < 0 { + sessionTimeout = 0 + } + } + + // 身份验证和Session + authAccessToken := s.Config.AccessTokens != nil && len(s.Config.AccessTokens) > 0 + s.SetClientKeys(serviceConfig.DeviceKey, serviceConfig.ClientKey, serviceConfig.SessionKey) + if serviceConfig.SessionKey != "" { + s.SetAuthChecker(func(authLevel int, logger *log.Logger, url *string, args map[string]any, request *s.Request, response *s.Response, options *s.WebServiceOptions) (pass bool, object any) { + var session *Session + setAuthLevel := 0 + if serviceConfig.SessionKey != "" { + sessionID := request.GetSessionId() + if sessionID != "" { + session = NewSession(sessionID, logger) + } + if userId, ok := session.data[serviceConfig.UserIdKey]; ok { + request.SetUserId(u.String(userId)) + } + // 优先使用session中的authLevel + if authLevel > 0 { + if authLevelBySession, ok := session.data["_authLevel"]; ok { + setAuthLevel = u.Int(authLevelBySession) + } + } + // if auth != nil { + // requestParams, _ := makeRequestParams(args, nil, request, response, nil, session, logger) + // requestParams["authLevel"] = authLevel + // if r, err := auth(nil, vm.ToValue(requestParams)); err == nil { + // if r.ExportType().Kind() == reflect.Bool { + // if r.ToBoolean() { + // return true, session + // } else { + // return false, nil + // } + // } else { + // return false, r.Export() + // } + // } else { + // logger.Error(err.Error()) + // return false, nil + // } + // } else { + // return true, session + // } + } + + // 如果没有session或session中的authLevel为0,则使用Access-Token中的authLevel(服务间调用) + if authAccessToken && setAuthLevel == 0 && authLevel > 0 { + setAuthLevel = s.GetAuthTokenLevel(request.Header.Get("Access-Token")) + } + if setAuthLevel >= authLevel { + return true, session + } else { + msg := serviceConfig.AuthFieldMessage + if strings.Contains(msg, "{{") { + msg = strings.ReplaceAll(msg, "{{TARGET_AUTHLEVEL}}", u.String(authLevel)) + msg = strings.ReplaceAll(msg, "{{USER_AUTHLEVEL}}", u.String(setAuthLevel)) + } + var obj any + if json.Unmarshal([]byte(msg), &obj) == nil { + return false, obj + } else { + return false, msg + } + } + }) + } + + // 限流器 + if serviceConfig.Limiters != nil { + var limiterRedis *redis.Redis + if serviceConfig.LimiterRedis != "" { + limiterRedis = redis.GetRedis(serviceConfig.LimiterRedis, args.Logger) + } + for name, limiter := range serviceConfig.Limiters { + switch limiter.From { + case "ip": + limiter.From = "header." + standard.DiscoverHeaderClientIp + case "user": + limiter.From = "header." + standard.DiscoverHeaderUserId + case "device": + limiter.From = "header." + standard.DiscoverHeaderDeviceId + } + if limiterRedis != nil { + limiters[name] = s.NewLimiter(name, limiter.From, time.Duration(limiter.Time)*time.Millisecond, limiter.Times, limiterRedis) + } else { + limiters[name] = s.NewLocalLimiter(name, limiter.From, time.Duration(limiter.Time)*time.Millisecond, limiter.Times) + } + } + } + return nil + }, + "start": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + if server != nil { + panic(vm.NewGoError(errors.New("server already started"))) + } + + // 处理静态文件 + if len(serviceConfig.Static) > 0 { + updateStatic(serviceConfig.Static) + } + if len(serviceConfig.Rewrite) > 0 { + updateRewrite(serviceConfig.Rewrite) + s.SetRewriteBy(rewrite) + } + if len(serviceConfig.Proxy) > 0 { + updateProxy(serviceConfig.Proxy) + s.SetProxyBy(proxy) + } + + // 启动服务 + server = s.AsyncStart() + waitChan = make(chan bool, 1) + server.OnStop(func() { + if onStop != nil { + onStop(nil) + } + }) + server.OnStopped(func() { + if waitChan != nil { + waitChan <- true + } + }) + return vm.ToValue(server.Addr) + }, + "stop": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + if server == nil { + panic(vm.NewGoError(errors.New("server not started"))) + } + server.Stop() + pools = map[string]*gojs.Pool{} + server = nil + return nil + }, + "register": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + o := args.Obj(0) + action := args.Func(1) + if action == nil { + panic(vm.NewGoError(errors.New("action must be a callback function"))) + } + + authLevel := o.Int("authLevel") + host := o.Str("host") + method := strings.ToUpper(o.Str("method")) + path := o.Str("path") + memo := o.Str("memo") + var usedLimiters []*s.Limiter + for _, limiterName := range o.Array("limiters") { + if limiter1, ok := limiters[u.String(limiterName)]; ok { + usedLimiters = append(usedLimiters, limiter1) + } + } + if verifiesObj := o.Obj("verifies"); verifiesObj != nil { + verifiesSet := map[string]func(any, *goja.Runtime) bool{} + for _, field := range verifiesObj.Keys() { + v := verifiesObj.Get(field) + // 根据类型设置验证器,存储到vm中,在请求到达时进行有效性验证 + switch v.ExportType().Kind() { + case reflect.String: + verifiesSet[field] = verifyRegexp(u.String(v.Export())) + case reflect.Int, reflect.Int64: + verifiesSet[field] = verifyLen(u.Int(v.Export())) + case reflect.Bool: + verifiesSet[field] = verifyRequire() + case reflect.Slice: + list := make([]string, 0) + vm.ForOf(v, func(v1 goja.Value) bool { + list = append(list, u.String(v1.Export())) + return true + }) + verifiesSet[field] = verifyIn(list) + default: + if fn, ok := goja.AssertFunction(v); ok { + verifiesSet[field] = verifyFunc(fn, verifiesObj) + } else if obj := v.ToObject(vm); obj != nil { + // 支持传入js的正则表达式对象 + if testV := obj.Get("test"); testV != nil { + if testFn, ok := goja.AssertFunction(testV); ok { + verifiesSet[field] = verifyFunc(testFn, obj) + } + } + } + } + } + vm.GoData[fmt.Sprint("VERIFY_"+host, method, path)] = verifiesSet + } + + opt := s.WebServiceOptions{ + NoBody: o.Bool("noBody"), + NoLog200: o.Bool("noLog200"), + Host: host, + //Ext: nil, + Limiters: usedLimiters, + } + + startFile := u.String(vm.GoData["startFile"]) + poolsLock.RLock() + poolExist := poolExists[startFile] + poolsLock.RUnlock() + if poolExist { + // 从对象调用(支持并发) + actionKey := "REGISTER_" + host + method + path + vm.GoData[actionKey] = action + vm.GoData[actionKey+"This"] = args.This + if method == "WS" { + vm.GoData[actionKey+"onMessage"] = o.Func("onMessage") + vm.GoData[actionKey+"onClose"] = o.Func("onClose") + } + poolsLock.Lock() + actionRegistered := poolActionRegistered[actionKey] + if !actionRegistered { + poolActionRegistered[actionKey] = true + } + poolsLock.Unlock() + if !actionRegistered { + if strings.ToUpper(method) == "WS" { + s.RegisterWebsocketWithOptions(authLevel, path, &websocket.Upgrader{}, makeWSAction(startFile, actionKey), nil, nil, nil, true, "", opt) + } else { + s.RestfulWithOptions(authLevel, method, path, makeOuterAction(startFile, actionKey), memo, opt) + } + } + } else { + // 无对象池,直接调用(单线程) + s.RestfulWithOptions(authLevel, method, path, makeInnerAction(action, vm, args.This), memo, opt) + } + return nil + }, + "load": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + actionFile := args.Path(0) + opt := args.Obj(1) + var mi, ma, idle uint + //var num uint + mainArgs := make([]any, 0) + debug := false + if opt != nil { + mi = uint(opt.Int("min")) + ma = uint(opt.Int("max")) + idle = uint(opt.Int("idle")) + //num = uint(opt.Int("num")) + debug = opt.Bool("debug") + if mainArgs1 := opt.Array("args"); mainArgs1 != nil { + mainArgs = mainArgs1 + } + } + if !u.FileExists(actionFile) { + panic(vm.NewGoError(errors.New("actionFile must be a js file path"))) + } + actionCode := u.ReadFileN(actionFile) + if !strings.Contains(actionCode, "function main(") || !strings.Contains(actionCode, ".register(") { + panic(vm.NewGoError(errors.New("actionFile must be a js file with main function and call service.register"))) + } + poolsLock.Lock() + poolExists[actionFile] = true + poolsLock.Unlock() + p := gojs.NewPoolByCode(actionCode, actionFile, gojs.PoolConfig{ + Min: mi, + Max: ma, + Idle: idle, + Debug: debug, + Args: mainArgs, + }, args.Logger) + //p := gojs.NewLBByCode(actionCode, actionFile, gojs.LBConfig{ + // Num: num, + // Debug: debug, + // Args: mainArgs, + //}, args.Logger) + poolsLock.Lock() + pools[actionFile] = p + poolsLock.Unlock() + return nil + }, + "newCaller": NewCaller, + } + + gojs.Register("apigo.cc/gojs/service", gojs.Module{ + Object: obj, + TsCode: serviceTS, + Example: serviceMD, + WaitForStop: func() { + if waitChan != nil { + <-waitChan + } + }, + }) +} + +func verifyRegexp(regexpStr string) func(any, *goja.Runtime) bool { + if rx, err := regexp.Compile(regexpStr); err != nil { + return func(value any, vm *goja.Runtime) bool { + return rx.MatchString(u.String(value)) + } + } + return nil +} + +func verifyLen(checkLength int) func(any, *goja.Runtime) bool { + return func(value any, vm *goja.Runtime) bool { + v := u.FinalType(reflect.ValueOf(value)) + if v.Kind() == reflect.Slice || v.Kind() == reflect.Map { + return v.Len() >= checkLength + } + return len(u.String(value)) >= checkLength + } +} + +func verifyRequire() func(any, *goja.Runtime) bool { + return func(value any, vm *goja.Runtime) bool { + value = u.FixPtr(value) + switch realValue := value.(type) { + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: + return u.Int64(realValue) != 0 + case bool: + return realValue + default: + v := u.FinalType(reflect.ValueOf(value)) + if v.Kind() == reflect.Slice || v.Kind() == reflect.Map { + return v.Len() >= 0 + } + return len(u.String(value)) >= 0 + } + } +} + +func verifyIn(list []string) func(any, *goja.Runtime) bool { + return func(value any, vm *goja.Runtime) bool { + return u.StringIn(list, u.String(value)) + } +} + +func verifyFunc(callback goja.Callable, thisObj goja.Value) func(any, *goja.Runtime) bool { + return func(value any, vm *goja.Runtime) bool { + if r, err := callback(thisObj, vm.ToValue(value)); err == nil { + return u.Bool(r.Export()) + } else { + log.DefaultLogger.Error(err.Error()) + } + return false + } +} + +func makeRequestParams(args map[string]any, headers map[string]string, request *s.Request, response *s.Response, client *websocket.Conn, caller *discover.Caller, session *Session, logger *log.Logger) (gojs.Map, *Response) { + resp := &Response{ + resp: response, + endCh: make(chan bool, 1), + } + params := gojs.Map{ + "args": args, + "logger": gojs.MakeLogger(logger), + "request": MakeRequest(request, args, headers), + "response": gojs.MakeMap(resp), + "client": MakeWSClient(client), + } + if headers != nil { + params["headers"] = headers + } + if session != nil { + params["session"] = gojs.MakeMap(session) + } + if caller != nil { + params["caller"] = gojs.MakeMap(Caller{client: caller}) + } + return params, resp +} + +func runAction(action goja.Callable, vm *goja.Runtime, thisArg goja.Value, args map[string]any, headers map[string]string, request *s.Request, response *s.Response, client *websocket.Conn, caller *discover.Caller, session *Session, logger *log.Logger) (any, error) { + vm.CallbackLocker.Lock() + defer vm.CallbackLocker.Unlock() + + // 验证请求参数的有效性 + if verifies, ok := vm.GoData["VERIFY_"+u.String(request.Get("registerTag"))].(map[string]func(any, *goja.Runtime) bool); ok { + failedFields := make([]string, 0) + for k, v := range args { + if verifier, ok1 := verifies[k]; ok1 { + if !verifier(v, vm) { + failedFields = append(failedFields, k) + } + } + } + // 数据有效性验证失败 + if len(failedFields) > 0 { + response.WriteHeader(400) + msg := serviceConfig.VerifyFieldMessage + if strings.Contains(msg, "{{") { + msg = strings.ReplaceAll(msg, "{{FAILED_FIELDS}}", strings.Join(failedFields, ", ")) + } + var obj any + if json.Unmarshal([]byte(msg), &obj) == nil { + return obj, nil + } else { + return msg, nil + } + } + } + + requestParams, resp := makeRequestParams(args, headers, request, response, client, caller, session, logger) + var r any + + r1, err := action(thisArg, vm.ToValue(requestParams)) + if err == nil && r1 != nil { + r = r1.Export() + } + if err != nil { + logger.Error(err.Error()) + } + if response != nil && r == nil && err == nil { + <-resp.endCh + r = resp.result + } + return r, err +} + +func makeInnerAction(action goja.Callable, vm *goja.Runtime, thisArg goja.Value) any { + return func(args map[string]any, headers map[string]string, request *s.Request, response *s.Response, caller *discover.Caller, session *Session, logger *log.Logger) any { + r, _ := runAction(action, vm, thisArg, args, headers, request, response, nil, caller, session, logger) + return r + } +} + +func getPool(startFile string) *gojs.Pool { + var pool *gojs.Pool + for i := 0; i < 10; i++ { + poolsLock.RLock() + pool = pools[startFile] + poolsLock.RUnlock() + if pool != nil { + return pool + } + time.Sleep(time.Millisecond * 100) + } + return nil +} + +func makeOuterAction(startFile string, actionKey string) any { + return func(args map[string]any, headers map[string]string, request *s.Request, response *s.Response, caller *discover.Caller, session *Session, logger *log.Logger) any { + if pool := getPool(startFile); pool != nil { + rt := pool.Get() + defer pool.Put(rt) + if action, ok := rt.GetGoData(actionKey).(goja.Callable); ok { + var thisArg goja.Value + if thisArgV, ok := rt.GetGoData(actionKey + "This").(goja.Value); ok { + thisArg = thisArgV + } + r, _ := rt.RunVM(func(vm *goja.Runtime) (any, error) { + r2, err2 := runAction(action, vm, thisArg, args, headers, request, response, nil, caller, session, logger) + return r2, err2 + + }) + return r + } + } + return nil + } +} + +type PoolStatus struct { + Total uint + MaxTotal uint + MaxWaiting uint + CreateTimes uint +} + +func GetPoolStatus() map[string]PoolStatus { + out := map[string]PoolStatus{} + for k, v := range pools { + total, maxTotal, maxWaiting, createTimes := v.Count() + out[k] = PoolStatus{ + Total: total, + MaxTotal: maxTotal, + MaxWaiting: maxWaiting, + CreateTimes: createTimes, + } + } + return out +} diff --git a/service.ts b/service.ts new file mode 100644 index 0000000..8b5f3ca --- /dev/null +++ b/service.ts @@ -0,0 +1,253 @@ +// just for develop + +export default { + config, + start, + stop, + register, + load +} + +function config(config?: Config): void { +} + +function start(): string { + return '' +} + +function stop(): void { +} + +function register(option: RegisterOption, callback: (params: RequestParams) => void): any { + return null +} + +function load(serviceFile: string, poolConfig?: PoolConfig): void { +} + +interface Config { + // github.com/ssgo/s 的配置参数 + listen: string // 监听端口(|隔开多个监听)(,隔开多个选项)(如果不指定IP则监听在0.0.0.0,如果不指定端口则使用h2c协议监听在随机端口,80端口默认使用http协议,443端口默认使用https协议),例如 80,http|443|443:h2|127.0.0.1:8080,h2c + ssl: Map // SSL证书配置,key为域名,value为cert和key的文件路径 + noLogGets: boolean // 不记录GET请求的日志 + noLogHeaders: string // 不记录请求头中包含的这些字段,多个字段用逗号分隔,默认不记录:Accept,Accept-Encoding,Cache-Control,Pragma,Connection,Upgrade-Insecure-Requests + logInputArrayNum: number // 请求字段中容器类型(数组、Map)在日志打印个数限制 默认为10个,多余的数据将不再日志中记录 + logInputFieldSize: number // 请求字段中单个字段在日志打印长度限制 默认为500个字符,多余的数据将不再日志中记录 + noLogOutputFields: string // 不记录响应字段中包含的这些字段(key名),多个字段用逗号分隔 + logOutputArrayNum: number // 响应字段中容器类型(数组、Map)在日志打印个数限制 默认为3个,多余的数据将不再日志中记录 + logOutputFieldSize: number // 响应字段中单个字段在日志打印长度限制 默认为100个字符,多余的数据将不再日志中记录 + logWebsocketAction: boolean // 记录Websocket中每个Action的请求日志,默认不记录 + compress: boolean // 是否启用压缩,默认不启用 + compressMinSize: number // 小于设定值的应答内容将不进行压缩,默认值:1024 + compressMaxSize: number // 大于设定值的应答内容将不进行压缩,默认值:4096000 + checkDomain: string // 心跳检测时使用域名,,默认使用IP地址,心跳检测使用 HEAD /__CHECK__ 请求,应答 299 表示正常,593 表示异常 + redirectTimeout: number // proxy和discover发起请求时的超时时间,单位ms,默认值:10000 + acceptXRealIpWithoutRequestId: boolean // 是否允许头部没有携带请求ID的X-Real-IP信息,默认不允许(防止伪造客户端IP) + statisticTime: boolean // 是否开启请求时间统计,默认不开启 + statisticTimeInterval: number // 统计时间间隔,单位ms,默认值:10000 + fast: boolean // 是否启用快速模式(为了追求性能牺牲一部分特性),默认不启用 + maxUploadSize: number // 最大上传文件大小(multipart/form-data请求的总空间),单位字节,默认值:104857600 + cpu: number // CPU占用的核数,默认为0,即不做限制 + memory: number // 内存(单位M),默认为0,即不做限制 + cpuMonitor: boolean // 在日志中记录CPU使用情况,默认不开启 + memoryMonitor: boolean // 在日志中记录内存使用情况,默认不开启 + cpuLimitValue: number // CPU超过最高占用值(10-100)超过次数将自动重启(如果CpuMonitor开启的话),默认100 + memoryLimitValue: number // 内存超过最高占用值(10-100)超过次数将自动重启(如果MemoryMonitor开启的话),默认95 + cpuLimitTimes: number // CPU超过最高占用值超过次数(1-100)将报警(如果CpuMonitor开启的话),默认6(即30秒内连续6次) + memoryLimitTimes: number // 内存超过最高占用值超过次数(1-100)将报警(如果MemoryMonitor开启的话),默认6(即30秒内连续6次) + cookieScope: string // 启用Session时Cookie的有效范围,host|domain|topDomain,默认值为host + idServer: string // 用s.UniqueId、s.Id来生成唯一ID(雪花算法)时所需的redis服务器连接,如果不指定将不能实现跨服务的全局唯一 + keepKeyCase: boolean // 是否保持Key的首字母大小写?默认一律使用小写 + indexFiles: string[] // 访问静态文件时的索引文件,默认为 index.html + indexDir: boolean // 访问目录时显示文件列表 + readTimeout: number // 读取请求的超时时间,单位ms + readHeaderTimeout: number // 读取请求头的超时时间,单位ms + writeTimeout: number // 响应写入的超时时间,单位ms + idleTimeout: number // 连接空闲超时时间,单位ms + maxHeaderBytes: number // 请求头的最大字节数 + maxHandlers: number // 每个连接的最大处理程序数量 + maxConcurrentStreams: number // 每个连接的最大并发流数量 + maxDecoderHeaderTableSize: number // 解码器头表的最大大小 + maxEncoderHeaderTableSize: number // 编码器头表的最大大小 + maxReadFrameSize: number // 单个帧的最大读取大小 + maxUploadBufferPerConnection: number // 每个连接的最大上传缓冲区大小 + maxUploadBufferPerStream: number // 每个流的最大上传缓冲区大小 + + // 其他配置 + sessionKey: string // HTTP头和Cookie中SessionID的Key,客户端没有传递时服务端自动生成,Header的优先级高于Cookie,默认为 Session, 设置为空时表示不使用 + deviceKey: string // 标识设备ID的Key,客户端没有传递时服务端自动生成,Header的优先级高于Cookie,默认为 Device, 设置为空时表示不使用 + clientKey: string // 标识客户端的Key,默认为 Client,对应的Header头为 ClientName 和 ClientVersion + userIdKey: string // session中userID的Key,用于在日志中记录userId信息,默认为 userId + sessionProvider: string // 指定一个redis连接(例如:redis://:sskey加密的密码@127.0.0.1:6379/15),默认使用内存存储 + sessionTimeout: number // session过期时间,单位 秒,默认为 3600秒 + authFieldMessage: string | Object // 身份验证失败时的消息,默认为 auth failed,可以设置对象来返回JSON,可以使用模版 {{TARGET_AUTHLEVEL}}、{{USER_AUTHLEVEL}} + verifyFieldMessage: string | Object // 参数验证失败时的消息,默认为 verify failed,可以设置对象来返回JSON,可以使用模版 {{FAILED_FIELDS}} + limitedMessage: string | Object // 访问受限时的消息,默认为 too many requests,可以设置对象来返回JSON,可以使用模版 {{LIMITED_FROM}}、{{LIMITED_VALUE}} + limiterRedis: string // 限流器使用的Redis连接,默认使用内存存储 + limiters: Map // 限流器配置,from 为数据来源,例如:ip、user、device、header.User-Agent、in.phone 等(in表示从请求参数中获取),time为时间间隔,单位ms,times为 时间间隔内允许访问的次数 + + // gateway 的配置参数 + proxy: Map // 代理配置,key为[host][path],value为代理的目标应用或URL + rewrite: Map // 重写请求路径,key为[host][path],value为重写后的路径 + static: Map // 静态文件目录,key为[host][path],value为目录路径 + + // github.com/ssgo/discover 的配置参数 + registry: string // 服务注册中心,请配置一个有效的RedisURL,默认值 redis://:@127.0.0.1:6379/15 + app: string // 设置该值将会自动将服务注册到 discover + weight: number // 节点的权重,默认值 100 + accessTokens: Map // 请求接口时使用指定的Access-Token进行验证,值为Token对应的authLevel(允许访问大于等于指定对应authLevel的接口) + calls: Map // 配置将会调用的服务,格式:[1|2]:[http|https|h2c]:[AccessToken],默认协议为:h2c,如使用h2c协议可只提供 AccessToken + callRetryTimes: number // 调用其他服务时最大重试次数,默认值 10 + ipPrefix: string // discover服务发现时指定使用的IP网段,默认排除 172.17.(Docker) +} + +interface CertSet { + certFile: string + keyFile: string +} + +interface PoolConfig { + min: number + max: string + idle: number +} + +interface LimiterConfig { + from: string + time: number + times: number +} + +interface RegisterOption { + authLevel: number + host: string + method: string + path: string + memo: string + noBody: boolean + noLog200: boolean + limiters: string[] + onMessage: (params: OnMessageParams) => void + onClose: (params: RequestParams) => void +} + +interface RequestParams { + args: Object + headers: Object + request: Request + client: WSClient + caller: Caller + session: Session + response: Response + logger: Logger +} + +interface OnMessageParams { + type: string + data: string | Object + client: WSClient + session: Session + logger: Logger +} + +interface WSClient { + read: () => WSMessage + write: (data: any) => void + writeMessage: (type: string, data: any) => void + ping: () => void + close: () => void +} + +interface WSMessage { + type: string + data: string | Object +} + +interface Session { + set: (key: string | Object, value?: any) => void + get: (...keys: string[]) => any | Object + remove: (...keys: string[]) => void + setAuthLevel: (authLevel: number) => void + save: () => void +} + +interface Caller { + get(url: string, headers?: Object): Result + head(url: string, headers?: Object): Result + post(url: string, data: any, headers?: Object): Result + put(url: string, data: any, headers?: Object): Result + delete(url: string, data: any, headers?: Object): Result + do(method: string, url: string, data: any, callback?: (data: string) => void, headers?: Object): Result +} + +interface Result { + status: string + statusCode: number + headers: Object + bytes(): Uint8Array + string(): string + object(): Object +} + +interface CookieOption { + path: string + domain: string + expires: any + maxAge: number + secure: boolean + httpOnly: boolean +} + +interface Request { + proto: string + scheme: string + host: string + method: string + path: string + remoteAddr: string + realIP: string + referer: string + userAgent: string + url: string + contentLength: number + cookies: Object + headers: Object + args: Object + files: Map + multiFiles: Map + makeURL: (path: string) => string + readAll: () => any + read: (size: number) => any + close: () => void + get: (key: string) => any + set: (key: string, value: any) => void + getHeader: (key: string) => string + setHeader: (key: string, value: string) => void + setUserID: (id: string) => void +} + +interface Response { + setStatus: (code: number) => void + setCookie: (name: string, value: string, option?: CookieOption) => void + setHeader: (name: string, value: string) => void + addHeader: (name: string, value: string) => void + getHeader: (name: string) => string + write: (data: any) => number + flush: () => void + sendFile: (contentType: string, filename: string) => void + downloadFile: (contentType: string, filename: string, data: any) => void + location: (url: string) => void +} + +interface Logger { + debug: (message: string, info?: Object) => void + info: (message: string, info?: Object) => void + warn: (message: string, info?: Object) => void + error: (message: string, info?: Object) => void +} + +interface UploadFile { + name: string + size: number + data: any +} diff --git a/session.go b/session.go new file mode 100644 index 0000000..38814b9 --- /dev/null +++ b/session.go @@ -0,0 +1,137 @@ +package service + +import ( + "reflect" + "sync" + "time" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/ssgo/log" + "github.com/ssgo/redis" +) + +type Session struct { + id string + conn *redis.Redis + data map[string]any +} + +var sessionRedis *redis.Redis +var sessionTimeout = int64(3600) +var memorySessionData = map[string]map[string]any{} +var memorySessionDataLock = sync.RWMutex{} +var lastSessionClearTime int64 + +func NewSession(id string, logger *log.Logger) *Session { + data := map[string]any{} + conn := sessionRedis + if sessionRedis == nil { + memorySessionDataLock.RLock() + if data1, ok1 := memorySessionData[id]; ok1 && data1 != nil { + data = data1 + } + memorySessionDataLock.RUnlock() + } else { + conn = sessionRedis.CopyByLogger(logger) + err := conn.GET("SESS_" + id).To(&data) + if err == nil { + conn.EXPIRE("SESS_"+id, int(sessionTimeout)) + } + } + return &Session{ + id: id, + conn: conn, + data: data, + } +} + +func (session *Session) Set(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + if args.Arguments[0].ExportType().Kind() == reflect.Map { + for key, value := range args.Map(0) { + session.data[key] = value + } + } else { + args.Check(2) + session.data[args.Str(0)] = args.Any(1) + } + return nil +} + +func (session *Session) Get(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + if len(args.Arguments) == 1 { + if args.Arguments[0].ExportType().Kind() == reflect.Slice { + out := make(map[string]any) + for _, key := range args.Arr(0).StrArray(0) { + out[key] = session.data[key] + } + return vm.ToValue(out) + } else { + return vm.ToValue(session.data[args.Str(0)]) + } + } else { + out := make(map[string]any) + for _, key := range args.StrArray(0) { + out[key] = session.data[key] + } + return vm.ToValue(out) + } +} + +func (session *Session) Remove(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + if args.Arguments[0].ExportType().Kind() == reflect.Slice { + out := make(map[string]any) + for _, key := range args.Arr(0).StrArray(0) { + delete(session.data, key) + } + return vm.ToValue(out) + } else { + for _, key := range args.StrArray(0) { + delete(session.data, key) + } + } + return nil +} + +func (session *Session) SetAuthLevel(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + session.data["_authLevel"] = args.Int(0) + return nil +} + +func (session *Session) Save(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + if session.conn == nil { + now := time.Now().Unix() + session.data["_time"] = now + memorySessionDataLock.Lock() + memorySessionData[session.id] = session.data + clearTimeDiff := now - lastSessionClearTime + if clearTimeDiff > 60 { + lastSessionClearTime = now + } + memorySessionDataLock.Unlock() + + if clearTimeDiff > 60 { + go clearMemorySession() + } + } else { + session.conn.SETEX("SESS_"+session.id, int(sessionTimeout), session.data) + } + return nil +} + +func clearMemorySession() { + memorySessionDataLock.Lock() + now := time.Now().Unix() + for id, data := range memorySessionData { + if data["_time"] != nil { + if now-data["_time"].(int64) > sessionTimeout { + delete(memorySessionData, id) + } + } + } + memorySessionDataLock.Unlock() +} diff --git a/task.go b/task.go new file mode 100644 index 0000000..75f6128 --- /dev/null +++ b/task.go @@ -0,0 +1,54 @@ +package service + +import ( + "sync" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" +) + +var taskData = map[string]map[string]any{} +var taskDataLock = sync.RWMutex{} + +func SetTaskData(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(3) + scope := args.Str(0) + key := args.Str(1) + value := args.Any(2) + taskDataLock.Lock() + defer taskDataLock.Unlock() + if taskData[scope] == nil { + taskData[scope] = map[string]any{} + } + taskData[scope][key] = value + return nil +} + +func GetTaskData(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + scope := args.Str(0) + key := args.Str(1) + taskDataLock.RLock() + defer taskDataLock.RUnlock() + if taskData[scope] != nil { + return vm.ToValue(taskData[scope][key]) + } + return nil +} + +func GetTaskDataKeys(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + scope := args.Str(0) + taskDataLock.RLock() + defer taskDataLock.RUnlock() + if taskData[scope] != nil { + keys := make([]string, len(taskData[scope])) + i := 0 + for key := range taskData[scope] { + keys[i] = key + i++ + } + return vm.ToValue(keys) + } + return nil +} diff --git a/tests/api/echo.js b/tests/api/echo.js new file mode 100644 index 0000000..edb87ee --- /dev/null +++ b/tests/api/echo.js @@ -0,0 +1,13 @@ +import service from "apigo.cc/gojs/service" +import co from "apigo.cc/gojs/console" +import u from "apigo.cc/gojs/util" + +function main() { + service.register({ path: '/echo2', noLog200: true }, ({ args, response }) => { + // setTimeout(() => { + // response.end(args.name) + // }, 10) + u.sleep(10) + return args.name + }) +} diff --git a/tests/api/user.js b/tests/api/user.js new file mode 100644 index 0000000..882b73f --- /dev/null +++ b/tests/api/user.js @@ -0,0 +1,20 @@ +import service from "apigo.cc/gojs/service" +import co from "apigo.cc/gojs/console" + +let verifies = { + id: v => { return /^\d+$/.test(v) }, + name: /^[a-zA-Z0-9_-\u4e00-\u9fa5\u3400-\u4db5\u3000-\u303F\u3040-\u309F\u30A0-\u30FF\u1100-\u11FF\u3130-\u318F\uAC00-\uD7AF\uD82F\uD835\uD83C\uD83D\uD83E\uD83F\uD840-\uD868\uD86A-\uD86C\uD86F-\uD872]+$/u, +} + +function main() { + service.register({ method: 'POST', path: '/login', verifies }, ({ args, session }) => { + session.set('id', args.id) + session.set('name', args.name) + session.setAuthLevel(1) + session.save() + return { code: 1 } + }) + service.register({ method: 'GET', path: '/userInfo', authLevel: 1, limiters: ['ip1s'] }, ({ session }) => { + return { code: 1, data: session.get('id', 'name') } + }) +} diff --git a/tests/api/ws.js b/tests/api/ws.js new file mode 100644 index 0000000..22c182b --- /dev/null +++ b/tests/api/ws.js @@ -0,0 +1,16 @@ +import service from "apigo.cc/gojs/service" +import co from "apigo.cc/gojs/console" + +function main() { + service.register({ + method: 'WS', path: '/ws', + onMessage: ({ client, type, data }) => { + client.writeMessage(type, data) + }, + onClose: () => { + co.info('ws closed') + } + }, ({ client }) => { + client.write('Hello, World!') + }) +} diff --git a/tests/service_pool_test.go b/tests/service_pool_test.go new file mode 100644 index 0000000..312916d --- /dev/null +++ b/tests/service_pool_test.go @@ -0,0 +1,222 @@ +package service_test + +import ( + "fmt" + "sync" + "time" + + "apigo.cc/gojs" + _ "apigo.cc/gojs/console" + _ "apigo.cc/gojs/http" + "apigo.cc/gojs/service" + _ "apigo.cc/gojs/service" + _ "apigo.cc/gojs/util" + "github.com/ssgo/httpclient" + "github.com/ssgo/u" + + //"go.uber.org/goleak" + //_ "net/http/pprof" + "runtime" + "testing" +) + +var rt2 *gojs.Runtime +var addrByPool = "" + +const runTimes2 = 100 + +//func TestMain(m *testing.M) { +// // Verify no goroutines are leaked after tests +// goleak.VerifyTestMain(m) +//} + +func TestStartByPool(t *testing.T) { + //go func() { + // fmt.Println(u.BYellow(http.ListenAndServe("localhost:6060", nil))) + //}() + + gojs.ExportForDev() + rt2 = gojs.New() + err := rt2.StartFromFile("start.js") + if err != nil { + t.Fatal("start failed", err) + } + r, err := rt2.RunMain() + if err != nil { + t.Fatal("start failed", err) + } + addrByPool = u.String(r) +} + +// TODO Caller +// TODO WS + +// func TestWS(t *testing.T) { +// r, err := rt2.RunCode("testWS()") +// if err != nil { +// t.Fatal("test ws failed, got error", err) +// } else if r != true { +// t.Fatal("test ws failed", r, err) +// } else { +// fmt.Println(u.BGreen("test ws success")) +// } +// } + +func TestSession(t *testing.T) { + r, err := rt2.RunCode("testUser()") + if err != nil { + t.Fatal("test user failed, got error", err, r) + } else if r != true { + t.Fatal("test user failed", r) + } +} + +func TestJsEchoByPool(t *testing.T) { + for i := 0; i < runTimes2; i++ { + name := u.UniqueId() + r, err := rt2.RunCode("test2('" + name + "')") + if err != nil { + t.Fatal("test js get failed, got error", err) + } else if r != name { + t.Fatal("test js get failed, name not match", r, name) + } + } +} + +func TestGoEchoByPool(t *testing.T) { + hc := httpclient.GetClientH2C(0) + for i := 0; i < runTimes2; i++ { + name := u.UniqueId() + r := hc.Get("http://" + addrByPool + "/echo2?name=" + name) + if r.Error != nil { + t.Fatal("test go get failed, got error", r.Error) + } else if r.String() != name { + t.Fatal("test go get failed, name not match", r, name) + } + } +} + +func TestJsAsyncEchoByPool(t *testing.T) { + ch := make(chan bool, runTimes2) + t1 := time.Now().UnixMilli() + for i := 0; i < runTimes2; i++ { + go func() { + name := u.UniqueId() + r, err := rt2.RunCode("test2('" + name + "')") + ch <- true + if err != nil { + t.Fatal("test js async get failed, got error", err) + } else if r != name { + t.Fatal("test js async get failed, name not match", r, name) + } + }() + } + for i := 0; i < runTimes2; i++ { + <-ch + } + t2 := time.Now().UnixMilli() - t1 + fmt.Println(u.BGreen("js async test time:"), t2, "ms") +} + +func TestGoAsyncEchoByPool(t *testing.T) { + //defer goleak.VerifyNone(t) + hc := httpclient.GetClientH2C(0) + const taskNum = 100 + const taskRunTimes = 100 + + for j := 0; j < 10; j++ { + ch := make(chan bool, taskNum*taskRunTimes) + t1 := time.Now().UnixMilli() + //for j := 0; j < 10; j++ { + + ms1 := runtime.MemStats{} + runtime.ReadMemStats(&ms1) + + okNum := 0 + failedNum := 0 + startNum := 0 + endNum := 0 + lastName := fmt.Sprint("Task_", taskNum-1, "_", taskRunTimes-1) + lastResult := "" + lastOK := false + okMap := map[string]int{} + okMapLock := sync.Mutex{} + for i1 := 0; i1 < taskNum; i1++ { + name1 := fmt.Sprint("Task_", i1, "_") + go func() { + for i2 := 0; i2 < taskRunTimes; i2++ { + name := fmt.Sprint(name1, i2) + okMapLock.Lock() + okMap[name] = 1 + startNum++ + okMapLock.Unlock() + r := hc.Get("http://" + addrByPool + "/echo2?name=" + name) + okMapLock.Lock() + if name == lastName { + lastResult = r.String() + lastOK = lastName == lastResult + } + if r.Error != nil { + failedNum++ + t.Fatal("test go async get failed, got error", r.Error) + } else if r.String() != name { + failedNum++ + t.Fatal("test go async get failed, name not match", r, name) + } else { + okMap[name] = 2 + okNum++ + } + endNum++ + okMapLock.Unlock() + ch <- true + } + }() + } + for i := 0; i < taskNum*taskRunTimes; i++ { + <-ch + } + for i1 := 0; i1 < taskNum; i1++ { + for i2 := 0; i2 < taskRunTimes; i2++ { + name := fmt.Sprint("Task_", i1, "_", i2) + if okMap[name] != 2 { + fmt.Println(i1, i2, okMap[name]) + } + } + } + if okNum != taskNum*taskRunTimes { + t.Fatal("test go async get failed, ok num is error", okNum, taskNum*taskRunTimes, failedNum, "|", startNum, ">", endNum, "||", lastName, lastResult, lastOK) + } + t2 := time.Now().UnixMilli() - t1 + fmt.Println(u.Green("go async test time:"), u.BGreen(t2), "ms") + for k, status := range service.GetPoolStatus() { + fmt.Println(u.Magenta(k), u.Magenta("total"), u.BMagenta(status.Total), u.Magenta("maxTotal"), u.BMagenta(status.MaxTotal), u.Magenta("createTimes"), u.BMagenta(status.CreateTimes), u.Magenta("maxWaiting"), u.BMagenta(status.MaxWaiting)) + } + //fmt.Println(u.BGreen("last name:"), lastName, lastResult, lastOK) + if !lastOK { + t.Fatal("test go async get failed, last name not match", lastName, lastResult, lastOK) + } + + ms2 := runtime.MemStats{} + runtime.ReadMemStats(&ms2) + + runtime.GC() + ms3 := runtime.MemStats{} + runtime.ReadMemStats(&ms3) + fmt.Println(u.Cyan("MEMORY >>"), u.BCyan(fmt.Sprintln(ms1.HeapInuse/1000000, ms2.HeapInuse/1000000, ms3.HeapInuse/1000000))) + } + //hc.Destroy() + //time.Sleep(1000 * time.Second) +} + +func TestStopByPool(t *testing.T) { + _, err := rt2.RunCode("s.stop()") + if err != nil { + t.Fatal("stop failed", err) + } + gojs.WaitAll() + + runtime.GC() + ms3 := runtime.MemStats{} + runtime.ReadMemStats(&ms3) + fmt.Println(u.Cyan("END MEMORY >>"), u.BCyan(ms3.HeapInuse/1000000)) +} diff --git a/tests/service_test.go b/tests/service_test.go new file mode 100644 index 0000000..5a39e60 --- /dev/null +++ b/tests/service_test.go @@ -0,0 +1,121 @@ +package service_test + +import ( + "fmt" + "testing" + "time" + + "apigo.cc/gojs" + _ "apigo.cc/gojs/console" + _ "apigo.cc/gojs/http" + _ "apigo.cc/gojs/service" + _ "apigo.cc/gojs/util" + "github.com/ssgo/httpclient" + "github.com/ssgo/u" +) + +var rt *gojs.Runtime +var addr = "" + +const runTimes = 100 + +func TestStart(t *testing.T) { + gojs.ExportForDev() + rt = gojs.New() + err := rt.StartFromFile("start.js") + if err != nil { + t.Fatal("start failed", err) + } + r, err := rt.RunMain() + if err != nil { + t.Fatal("start failed", err) + } + addr = u.String(r) +} + +func TestJsEcho(t *testing.T) { + for i := 0; i < runTimes; i++ { + name := u.UniqueId() + r, err := rt.RunCode("test('" + name + "')") + if err != nil { + t.Fatal("test js get failed, got error", err) + } else if r != name { + t.Fatal("test js get failed, name not match", r, name) + } + } +} + +func TestGoEcho(t *testing.T) { + hc := httpclient.GetClientH2C(0) + for i := 0; i < runTimes; i++ { + name := u.UniqueId() + r := hc.Get("http://" + addr + "/echo?name=" + name) + if r.Error != nil { + t.Fatal("test go get failed, got error", r.Error) + } else if r.String() != name { + t.Fatal("test go get failed, name not match", r, name) + } + } +} + +func TestJsAsyncEcho(t *testing.T) { + ch := make(chan bool, runTimes) + t1 := time.Now().UnixMilli() + for i := 0; i < runTimes; i++ { + go func() { + name := u.UniqueId() + r, err := rt.RunCode("test('" + name + "')") + ch <- true + if err != nil { + t.Fatal("test js async get failed, got error", err) + } else if r != name { + t.Fatal("test js async get failed, name not match", r, name) + } + }() + } + for i := 0; i < runTimes; i++ { + <-ch + } + t2 := time.Now().UnixMilli() - t1 + fmt.Println(u.BGreen("js async test time:"), t2, "ms") +} + +func TestGoAsyncEcho(t *testing.T) { + hc := httpclient.GetClientH2C(0) + ch := make(chan bool, runTimes*10) + t1 := time.Now().UnixMilli() + lastName := "" + lastResult := "" + for i := 0; i < runTimes*10; i++ { + name := fmt.Sprint("N", i) + lastName = name + go func() { + r := hc.Get("http://" + addr + "/echo?name=" + name) + lastResult = r.String() + ch <- true + if r.Error != nil { + t.Fatal("test go async get failed, got error", r.Error) + } else if r.String() != name { + t.Fatal("test go async get failed, name not match", r, name) + } + }() + } + for i := 0; i < runTimes*10; i++ { + <-ch + } + t2 := time.Now().UnixMilli() - t1 + fmt.Println(u.BGreen("go async test time:"), t2, "ms") + fmt.Println(u.BGreen("last name:"), lastName, lastResult) +} + +func TestStop(t *testing.T) { + go func() { + time.Sleep(100 * time.Millisecond) + _, _ = rt.RunCode("s.stop()") + }() + gojs.WaitAll() + time.Sleep(100 * time.Millisecond) + // if err != nil { + // t.Fatal("stop failed", err) + // } +} diff --git a/tests/service_ws_test.go b/tests/service_ws_test.go new file mode 100644 index 0000000..7784d26 --- /dev/null +++ b/tests/service_ws_test.go @@ -0,0 +1,56 @@ +package service_test + +import ( + "fmt" + + "apigo.cc/gojs" + _ "apigo.cc/gojs/console" + _ "apigo.cc/gojs/http" + _ "apigo.cc/gojs/service" + _ "apigo.cc/gojs/util" + "github.com/ssgo/u" + + //"go.uber.org/goleak" + //_ "net/http/pprof" + "runtime" + "testing" +) + +var rt3 *gojs.Runtime + +func TestStartWSByPool(t *testing.T) { + gojs.ExportForDev() + rt3 = gojs.New() + err := rt3.StartFromFile("start_ws.js") + if err != nil { + t.Fatal("start failed", err) + } + _, err = rt3.RunMain() + if err != nil { + t.Fatal("start failed", err) + } +} + +func TestWS(t *testing.T) { + r, err := rt3.RunCode("testWS()") + if err != nil { + t.Fatal("test ws failed, got error", err) + } else if r != true { + t.Fatal("test ws failed", r, err) + } else { + fmt.Println(u.BGreen("test ws success")) + } +} + +func TestStopWSByPool(t *testing.T) { + _, err := rt3.RunCode("s.stop()") + if err != nil { + t.Fatal("stop failed", err) + } + gojs.WaitAll() + + runtime.GC() + ms3 := runtime.MemStats{} + runtime.ReadMemStats(&ms3) + fmt.Println(u.Cyan("END MEMORY >>"), u.BCyan(ms3.HeapInuse/1000000)) +} diff --git a/tests/start.js b/tests/start.js new file mode 100644 index 0000000..7235dab --- /dev/null +++ b/tests/start.js @@ -0,0 +1,84 @@ +import s from "apigo.cc/gojs/service" +import http from "apigo.cc/gojs/http" +import u from "apigo.cc/gojs/util" +import co from "apigo.cc/gojs/console" + +let h2c = http +let urlPrefix +function main() { + s.config({ + cpuMonitor: true, + memoryMonitor: true, + sessionKey: 'SessionID', + userIdKey: 'id', + limitedMessage: { code: 429, message: "访问过于频繁" }, + authFieldMessage: { code: 403, message: "身份验证失败 [{{USER_AUTHLEVEL}}/{{TARGET_AUTHLEVEL}}]" }, + verifyFieldMessage: { code: 400, message: "参数 [{{FAILED_FIELDS}} 验证失败" }, + limiters: { + ip1s: { + from: 'ip', + time: 100, + times: 10 + } + }, + }) + s.register({ path: '/echo', noLog200: true }, ({ args, response }) => { + // setTimeout(() => { + // response.end(args.name) + // }, 1) + u.sleep(1) + return args.name + }) + s.load('api/echo.js', { min: 20, max: 1000, idle: 100 }) + s.load('api/user.js') + let host = s.start() + h2c = http.newH2C({ + baseURL: 'http://' + host + }) + return host +} + +function test(name) { + let r = h2c.get('/echo?name=' + name) + return r.string() +} + +function test2(name) { + let r = h2c.get('/echo2?name=' + name) + return r.string() +} + +function testUser() { + let r = h2c.get('/userInfo') + if (r.statusCode !== 403 || r.object().code !== 403) return r + r = h2c.post('/login', { id: 'a1', name: 'Tom' }) + if (r.statusCode !== 400 || r.object().code !== 400) return r + r = h2c.post('/login', { id: 123, name: 'Tom\t' }) + if (r.statusCode !== 400 || r.object().code !== 400) return r + + r = h2c.post('/login', { id: 123, name: 'Tom' }).object() + if (r.code === 1) { + // 测试限流器允许的 10 次请求 + for (let i = 0; i < 10; i++) { + r = h2c.get('/userInfo').object() + if (r.data.id !== 123 | r.data.name !== 'Tom') { + return r.data + } + } + // 测试限流器拒绝的 1 次请求 + r = h2c.get('/userInfo') + if (r.statusCode != 429) { + return r + } + u.sleep(100) + + // 测试限流器过期后允许的 1 次请求 + r = h2c.get('/userInfo').object() + if (r.data.id !== 123 | r.data.name !== 'Tom') { + return r.data + } + + return true + } + return r +} diff --git a/tests/start_ws.js b/tests/start_ws.js new file mode 100644 index 0000000..6fee651 --- /dev/null +++ b/tests/start_ws.js @@ -0,0 +1,56 @@ +import s from "apigo.cc/gojs/service" +import http from "apigo.cc/gojs/http" +import u from "apigo.cc/gojs/util" +import co from "apigo.cc/gojs/console" + +let hc = http +let urlPrefix +function main() { + s.load('api/ws.js') + let host = s.start() + hc = http.new({ baseURL: 'http://' + host }) + return host +} + +function testWS() { + let ws = hc.connect('/ws', { pingInterval: 10 }) + let r = ws.read() + if (r.data !== 'Hello, World!') { + ws.close() + return r + } + co.info('test ws hello ok') + + ws.write('abc') + r = ws.read() + if (r.data !== 'abc') { + ws.close() + return r + } + co.info('test ws abc ok') + + // ws.ping() + u.sleep(100) + + let pc = ws.pingCount() + co.info('test ws ping ok', pc.pingTimes, pc.pongTimes) + + ws.write(new Uint8Array([1, 2, 3])) + r = ws.read() + if (r.data[0] !== 1 || r.data[1] !== 2 || r.data[2] !== 3) { + ws.close() + return r + } + co.info('test ws bytes ok') + + ws.write({ name: 'Tom' }) + r = ws.read() + if (r.data.name !== 'Tom') { + ws.close() + return r + } + co.info('test ws json ok') + + ws.close() + return true +} diff --git a/tests/test.js b/tests/test.js new file mode 100644 index 0000000..7fa29b2 --- /dev/null +++ b/tests/test.js @@ -0,0 +1,7 @@ +import http from "apigo.cc/gojs/http" + +let h2c = http.newH2C() +function main(args) { + let r = h2c.get('http://' + args[0] + '/echo?name=' + args[1]) + return r.string() +} \ No newline at end of file diff --git a/ws.go b/ws.go new file mode 100644 index 0000000..5c4e044 --- /dev/null +++ b/ws.go @@ -0,0 +1,192 @@ +package service + +import ( + "encoding/json" + + "apigo.cc/gojs" + "apigo.cc/gojs/goja" + "github.com/gorilla/websocket" + "github.com/ssgo/discover" + "github.com/ssgo/log" + "github.com/ssgo/s" + "github.com/ssgo/u" +) + +func MakeWSClient(client *websocket.Conn) gojs.Map { + return gojs.Map{ + "read": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + typ, data, err := readWSMessage(client) + if err != nil { + panic(vm.NewGoError(err)) + } + return vm.ToValue(gojs.Map{ + "type": typ, + "data": data, + }) + }, + "write": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(1) + var err error + switch args.Any(0).(type) { + case []byte: + err = client.WriteMessage(websocket.BinaryMessage, args.Bytes(0)) + case string: + err = client.WriteMessage(websocket.TextMessage, args.Bytes(0)) + default: + err = client.WriteMessage(websocket.TextMessage, u.JsonBytes(args.Any(0))) + } + if err != nil { + panic(vm.NewGoError(err)) + } + return nil + }, + "ping": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + err := client.WriteMessage(websocket.PingMessage, nil) + if err != nil { + panic(vm.NewGoError(err)) + } + return nil + }, + "writeMessage": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + args := gojs.MakeArgs(&argsIn, vm).Check(2) + var err error + switch args.Str(0) { + case "text": + err = client.WriteMessage(websocket.TextMessage, args.Bytes(1)) + case "binary": + err = client.WriteMessage(websocket.BinaryMessage, args.Bytes(1)) + case "ping": + err = client.WriteMessage(websocket.PingMessage, args.Bytes(1)) + case "pong": + err = client.WriteMessage(websocket.PongMessage, args.Bytes(1)) + case "close": + err = client.WriteMessage(websocket.CloseMessage, args.Bytes(1)) + case "json": + err = client.WriteMessage(websocket.TextMessage, u.JsonBytes(args.Any(1))) + default: + err = client.WriteMessage(websocket.TextMessage, args.Bytes(1)) + } + if err != nil { + panic(vm.NewGoError(err)) + } + return nil + }, + "close": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value { + err := client.Close() + if err != nil { + panic(vm.NewGoError(err)) + } + return nil + }, + } +} + +func readWSMessage(client *websocket.Conn) (string, any, error) { + msgType := "close" + var msgData any + typ, data, err := client.ReadMessage() + if err == nil && typ != websocket.CloseMessage { + switch typ { + case websocket.TextMessage: + if err2 := json.Unmarshal(data, &msgData); err2 != nil { + msgData = string(data) + msgType = "text" + } else { + msgType = "json" + } + case websocket.BinaryMessage: + if err2 := json.Unmarshal(data, &msgData); err2 != nil { + msgData = data + msgType = "binary" + } else { + msgType = "json" + } + case websocket.PingMessage: + msgData = string(data) + msgType = "ping" + case websocket.PongMessage: + msgData = string(data) + msgType = "pong" + } + } + return msgType, msgData, err +} + +func makeWSAction(startFile string, actionKey string) any { + // onOpen + return func(args map[string]any, headers map[string]string, request *s.Request, client *websocket.Conn, caller *discover.Caller, session *Session, logger *log.Logger) any { + if pool := getPool(startFile); pool != nil { + // 调用 onOpen + hasOnMessage := false + hasOnClose := false + func() { + rt := pool.Get() + defer pool.Put(rt) + if action, ok := rt.GetGoData(actionKey).(goja.Callable); ok { + var thisArg goja.Value + if thisArgV, ok := rt.GetGoData(actionKey + "This").(goja.Value); ok { + thisArg = thisArgV + } + _, _ = rt.RunVM(func(vm *goja.Runtime) (any, error) { + r2, err2 := runAction(action, vm, thisArg, args, headers, request, nil, client, caller, session, logger) + return r2, err2 + + }) + _, hasOnMessage = rt.GetGoData(actionKey + "onMessage").(goja.Callable) + _, hasOnClose = rt.GetGoData(actionKey + "onClose").(goja.Callable) + } + }() + // 循环读取消息,回调js的onMessage + if hasOnMessage { + for { + typ, data, err := readWSMessage(client) + if err != nil || typ == "close" { + break + } + func() { + rt := pool.Get() + defer pool.Put(rt) + if action, ok := rt.GetGoData(actionKey + "onMessage").(goja.Callable); ok { + var thisArg goja.Value + if thisArgV, ok := rt.GetGoData(actionKey + "This").(goja.Value); ok { + thisArg = thisArgV + } + _, _ = rt.RunVM(func(vm *goja.Runtime) (any, error) { + requestParams, _ := makeRequestParams(args, headers, request, nil, client, caller, session, logger) + requestParams["type"] = typ + requestParams["data"] = data + _, err := action(thisArg, vm.ToValue(requestParams)) + if err != nil { + logger.Error(err.Error()) + } + return nil, nil + }) + } + }() + } + } + + if hasOnClose { + func() { + rt := pool.Get() + defer pool.Put(rt) + if action, ok := rt.GetGoData(actionKey + "onClose").(goja.Callable); ok { + var thisArg goja.Value + if thisArgV, ok := rt.GetGoData(actionKey + "This").(goja.Value); ok { + thisArg = thisArgV + } + _, _ = rt.RunVM(func(vm *goja.Runtime) (any, error) { + requestParams, _ := makeRequestParams(args, headers, request, nil, client, caller, session, logger) + _, err := action(thisArg, vm.ToValue(requestParams)) + if err != nil { + logger.Error(err.Error()) + } + return nil, nil + }) + } + }() + } + } + return nil + } +}