http/http.go
2024-10-16 18:08:06 +08:00

587 lines
15 KiB
Go

package http
import (
_ "embed"
"encoding/json"
"net/http"
"reflect"
"strings"
"sync"
"time"
"apigo.cc/gojs"
"apigo.cc/gojs/goja"
"github.com/gorilla/websocket"
"github.com/ssgo/httpclient"
"github.com/ssgo/log"
"github.com/ssgo/u"
)
//go:embed http.ts
var httpTS string
type Http struct {
client *httpclient.ClientPool
baseURL string
globalHeaders map[string]string
}
var defaultHttp = &Http{
client: httpclient.GetClient(60000 * time.Millisecond),
globalHeaders: make(map[string]string),
}
// TODO ws
func init() {
obj := map[string]any{
"new": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
return newClient("HTTP", argsIn, vm)
},
"newH2C": func(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
return newClient("H2C", argsIn, vm)
},
"get": defaultHttp.Get,
"head": defaultHttp.Head,
"post": defaultHttp.Post,
"put": defaultHttp.Put,
"delete": defaultHttp.Delete,
"do": defaultHttp.Do,
"upload": defaultHttp.Upload,
"download": defaultHttp.Download,
"connect": defaultHttp.Connect,
}
gojs.Register("apigo.cc/gojs/http", gojs.Module{
Object: obj,
TsCode: httpTS,
Desc: "",
Example: "",
})
}
func newClient(portal string, argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(0)
opt := args.Obj(0)
timeout := 60000 * time.Millisecond
if opt != nil {
timeout = time.Duration(opt.Int64("timeout")) * time.Millisecond
}
var client *httpclient.ClientPool
if portal == "H2C" {
client = httpclient.GetClientH2C(timeout)
} else {
client = httpclient.GetClient(timeout)
}
cli := &Http{
client: client,
globalHeaders: make(map[string]string),
}
setConfig(cli, opt)
return vm.ToValue(gojs.MakeMap(cli))
}
func setConfig(cli *Http, opt *gojs.Obj) {
if opt != nil {
if globalHeaders := opt.Map("globalHeaders"); globalHeaders != nil {
for k, v := range globalHeaders {
cli.globalHeaders[k] = u.String(v)
}
}
if baseURL := opt.Str("baseURL"); baseURL != "" {
cli.baseURL = baseURL
}
if downloadPartSize := opt.Int64("downloadPartSize"); downloadPartSize != 0 {
cli.client.DownloadPartSize = downloadPartSize
}
if redirect := opt.Bool("redirect"); redirect {
cli.client.EnableRedirect()
}
}
}
func makeResult(r *httpclient.Result, vm *goja.Runtime) goja.Value {
if r.Error != nil {
panic(vm.NewGoError(r.Error))
}
headers := map[string]string{}
for k, v := range r.Response.Header {
headers[k] = v[0]
}
return vm.ToValue(map[string]any{
"status": r.Response.Status,
"statusCode": r.Response.StatusCode,
"headers": headers,
"_data": r.Bytes(),
"bytes": toBytes,
"string": toString,
"object": toObject,
})
}
func toBytes(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
dataValue := argsIn.This.ToObject(vm).Get("_data")
if _, ok := dataValue.Export().([]byte); ok {
return dataValue
}
return nil
}
func toString(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
dataValue := argsIn.This.ToObject(vm).Get("_data")
if data, ok := dataValue.Export().([]byte); ok {
return vm.ToValue(string(data))
}
return nil
}
func toObject(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
dataValue := argsIn.This.ToObject(vm).Get("_data")
if data, ok := dataValue.Export().([]byte); ok {
obj := u.UnJsonBytes(data, nil)
v := u.FinalValue(reflect.ValueOf(obj))
if v.IsValid() {
return vm.ToValue(v.Interface())
}
}
return nil
}
func (hc *Http) makeURL(url string) string {
if !strings.Contains(url, "://") && hc.baseURL != "" {
if strings.HasSuffix(hc.baseURL, "/") && strings.HasPrefix(url, "/") {
return hc.baseURL + url[1:]
} else if !strings.HasSuffix(hc.baseURL, "/") && !strings.HasPrefix(url, "/") {
return hc.baseURL + "/" + url
}
return hc.baseURL + url
}
return url
}
func (hc *Http) makeHeaderArray(in map[string]any) []string {
out := make([]string, 0)
if hc.globalHeaders != nil {
for k, v := range hc.globalHeaders {
out = append(out, k, v)
}
}
if in != nil {
for k, v := range in {
out = append(out, k, u.String(v))
}
}
return out
}
func (hc *Http) Get(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(1)
return makeResult(hc.client.Get(hc.makeURL(args.Str(0)), hc.makeHeaderArray(args.Map(1))...), vm)
}
func (hc *Http) Head(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(1)
return makeResult(hc.client.Head(hc.makeURL(args.Str(0)), hc.makeHeaderArray(args.Map(1))...), vm)
}
func (hc *Http) Post(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
return makeResult(hc.client.Post(hc.makeURL(args.Str(0)), args.Any(1), hc.makeHeaderArray(args.Map(2))...), vm)
}
func (hc *Http) Put(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
return makeResult(hc.client.Put(hc.makeURL(args.Str(0)), args.Any(1), hc.makeHeaderArray(args.Map(2))...), vm)
}
func (hc *Http) Delete(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
return makeResult(hc.client.Delete(hc.makeURL(args.Str(0)), args.Any(1), hc.makeHeaderArray(args.Map(2))...), vm)
}
func (hc *Http) Do(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(3)
if len(argsIn.Arguments) == 3 {
argsIn.Arguments = append(argsIn.Arguments, vm.ToValue(nil))
}
var r *httpclient.Result
if cb, ok := goja.AssertFunction(argsIn.Arguments[3]); ok {
r = hc.client.ManualDo(hc.makeURL(args.Str(0)), args.Str(1), args.Any(2), hc.makeHeaderArray(args.Map(4))...)
buf := make([]byte, 1024)
for {
n, err := r.Response.Body.Read(buf)
if err != nil {
break
}
_, _ = cb(argsIn.This, vm.ToValue(u.String(buf[0:n])))
}
} else {
r = hc.client.Do(hc.makeURL(args.Str(0)), args.Str(1), args.Any(2), hc.makeHeaderArray(args.Map(4))...)
}
return makeResult(r, vm)
}
func (hc *Http) Upload(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
postData := map[string]string{}
postFiles := map[string]any{}
u.Convert(args.Any(1), &postData)
if len(argsIn.Arguments) > 2 {
u.Convert(args.Any(2), &postFiles)
}
r, _ := hc.client.MPost(hc.makeURL(args.Str(0)), postData, postFiles, hc.makeHeaderArray(args.Map(3))...)
return makeResult(r, vm)
}
func (hc *Http) Download(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
var r *httpclient.Result
var callback goja.Callable
if len(argsIn.Arguments) > 2 {
if cb, ok := goja.AssertFunction(argsIn.Arguments[2]); ok {
callback = cb
}
}
if callback != nil {
r, _ = hc.client.Download(hc.makeURL(args.Str(0)), args.Str(1), func(start, end int64, ok bool, finished, total int64) {
_, _ = callback(argsIn.This, vm.ToValue(finished), vm.ToValue(total))
}, hc.makeHeaderArray(args.Map(3))...)
} else {
r, _ = hc.client.Download(hc.makeURL(args.Str(0)), args.Str(1), nil, hc.makeHeaderArray(args.Map(3))...)
}
return makeResult(r, vm)
}
func (hc *Http) Connect(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(1)
url := hc.makeURL(args.Str(0))
if strings.HasPrefix(url, "http") {
url = strings.Replace(url, "http", "ws", 1)
}
ws := &WS{url: url, this: args.This, running: false, pingStopChan: make(chan bool, 1), closeChan: make(chan bool, 1), logger: args.Logger, headers: make(map[string]string)}
for k, v := range hc.globalHeaders {
ws.headers[k] = v
}
if opt := args.Obj(1); opt != nil {
if headers := opt.Map("headers"); headers != nil {
for k, v := range headers {
ws.headers[k] = u.String(v)
}
}
ws.compress = opt.Bool("compress")
ws.onOpen = opt.Func("onOpen")
ws.onClose = opt.Func("onClose")
ws.onError = opt.Func("onError")
// ws.onPing = opt.Func("onPing")
// ws.onPong = opt.Func("onPong")
ws.onMessage = opt.Func("onMessage")
ws.pingInterval = opt.Int64("pingInterval")
ws.reconnectInterval = opt.Int64("reconnectInterval")
if ws.reconnectInterval == 0 && ws.onMessage != nil {
ws.reconnectInterval = 1000
}
}
// fmt.Println(u.BMagenta("WS"), "start")
if err := ws.connect(vm); err == nil {
if ws.pingInterval > 0 {
// fmt.Println(u.BMagenta("WS"), "start ping")
go func() {
// ws.sleep(time.Duration(ws.pingInterval) * time.Millisecond)
for {
if ws.conn != nil {
ws.writeMessage(websocket.PingMessage, nil)
}
if !ws.running {
break
}
ws.sleep(time.Duration(ws.pingInterval) * time.Millisecond)
if !ws.running {
break
}
}
// fmt.Println(u.BMagenta("WS"), "stop ping")
ws.pingStopChan <- true
}()
} else {
ws.pingStopChan <- true
}
if ws.onMessage != nil {
// fmt.Println(u.BMagenta("WS"), "start onMessage")
go func() {
for {
for {
if ws.conn != nil {
typ, data, err := readWSMessage(ws.conn)
if err != nil {
break
}
_, _ = ws.onMessage(ws.this, vm.ToValue(typ), vm.ToValue(data))
}
}
// fmt.Println(u.BMagenta("WS"), "stop onMessage")
// 未结束的连接自动重连
if !ws.running {
break
}
ws.sleep(time.Duration(ws.reconnectInterval) * time.Millisecond)
if !ws.running {
break
}
// fmt.Println(u.BMagenta("WS"), "reconnect")
ws.connect(vm)
}
// fmt.Println(u.BMagenta("WS"), "stop onMessage2")
ws.closeChan <- true
}()
} else {
ws.closeChan <- true
}
return vm.ToValue(gojs.MakeMap(ws))
} else {
panic(vm.NewGoError(err))
}
}
type WS struct {
conn *websocket.Conn
running bool
closed bool
closeChan chan bool
pingStopChan chan bool
logger *log.Logger
this goja.Value
writeLock sync.Mutex
url string
headers map[string]string
compress bool
onOpen goja.Callable
onClose goja.Callable
onError goja.Callable
// onPing goja.Callable
// onPong goja.Callable
pingTimes int64
pongTimes int64
lastPingTime int64
lastPongTime int64
onMessage goja.Callable
pingInterval int64
reconnectInterval int64
}
func (ws *WS) sleep(interval time.Duration) {
if interval < time.Second {
time.Sleep(interval)
} else {
for {
time.Sleep(time.Second)
interval -= time.Second
if !ws.running || interval <= 0 {
break
}
}
}
}
func (ws *WS) error(err goja.Value) {
if ws.onError != nil {
ws.onError(ws.this, err)
}
}
func (ws *WS) connect(vm *goja.Runtime) error {
reqHeader := http.Header{}
for k, v := range ws.headers {
reqHeader.Set(k, v)
}
conn, _, err := websocket.DefaultDialer.Dial(ws.url, reqHeader)
if err != nil {
return err
}
ws.conn = conn
if ws.reconnectInterval > 0 || ws.pingInterval > 0 {
ws.running = true
}
ws.closed = false
if ws.compress {
conn.EnableWriteCompression(true)
}
// if ws.onPing != nil {
// conn.SetPingHandler(func(appData string) error {
// fmt.Println(u.BMagenta("WS"), "onPing")
// _, err := ws.onPing(ws.this, vm.ToValue(appData))
// return err
// })
// return err
// }
// if ws.onPong != nil {
// conn.SetPongHandler(func(appData string) error {
// fmt.Println(u.BMagenta("WS"), "onPong")
// _, err := ws.onPong(ws.this, vm.ToValue(appData))
// return err
// })
// return err
// }
ws.pingTimes = 0
ws.pongTimes = 0
ws.lastPingTime = 0
ws.lastPongTime = 0
conn.SetPongHandler(func(appData string) error {
ws.pongTimes++
ws.lastPongTime = time.Now().UnixMilli()
return nil
})
conn.SetCloseHandler(func(code int, text string) error {
// fmt.Println(u.BMagenta("WS"), "onClose")
if ws.onClose != nil {
_, err := ws.onClose(ws.this, vm.ToValue(code), vm.ToValue(text))
return err
}
// 关闭旧连接和接收器
if !ws.closed {
ws.conn.Close()
ws.closed = true
}
// 未结束的连接自动重连
if ws.running {
time.Sleep(time.Duration(ws.reconnectInterval) * time.Millisecond)
// fmt.Println(u.BMagenta("WS"), "reconnect")
return ws.connect(vm)
}
return nil
})
return nil
}
func (ws *WS) PingCount(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
return vm.ToValue(gojs.Map{
"pingTimes": ws.pingTimes,
"pongTimes": ws.pongTimes,
"lastPingTime": ws.lastPingTime,
"lastPongTime": ws.lastPongTime,
})
}
func (ws *WS) Read(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
typ, data, err := readWSMessage(ws.conn)
if err != nil {
panic(vm.NewGoError(err))
}
return vm.ToValue(gojs.Map{
"type": typ,
"data": data,
})
}
func readWSMessage(client *websocket.Conn) (string, any, error) {
msgType := "close"
var msgData any
typ, data, err := client.ReadMessage()
if err == nil && typ != websocket.CloseMessage {
switch typ {
case websocket.TextMessage:
if err2 := json.Unmarshal(data, &msgData); err2 != nil {
msgData = string(data)
msgType = "text"
} else {
msgType = "json"
}
case websocket.BinaryMessage:
if err2 := json.Unmarshal(data, &msgData); err2 != nil {
msgData = data
msgType = "binary"
} else {
msgType = "json"
}
case websocket.PingMessage:
msgData = string(data)
msgType = "ping"
case websocket.PongMessage:
msgData = string(data)
msgType = "pong"
}
}
return msgType, msgData, err
}
func (ws *WS) writeMessage(typ int, msg []byte) error {
ws.writeLock.Lock()
defer ws.writeLock.Unlock()
if typ == websocket.PingMessage {
ws.pingTimes++
ws.lastPingTime = time.Now().UnixMilli()
}
return ws.conn.WriteMessage(typ, msg)
}
func (ws *WS) Write(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(1)
var err error
switch args.Any(0).(type) {
case []byte:
err = ws.writeMessage(websocket.BinaryMessage, args.Bytes(0))
case string:
err = ws.writeMessage(websocket.TextMessage, args.Bytes(0))
default:
err = ws.writeMessage(websocket.TextMessage, u.JsonBytes(args.Any(0)))
}
if err != nil {
panic(vm.NewGoError(err))
}
return nil
}
func (ws *WS) Ping(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
err := ws.writeMessage(websocket.PingMessage, nil)
if err != nil {
panic(vm.NewGoError(err))
}
return nil
}
func (ws *WS) WriteMessage(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
args := gojs.MakeArgs(&argsIn, vm).Check(2)
var err error
switch args.Str(0) {
case "text":
err = ws.writeMessage(websocket.TextMessage, args.Bytes(1))
case "binary":
err = ws.writeMessage(websocket.BinaryMessage, args.Bytes(1))
case "ping":
err = ws.writeMessage(websocket.PingMessage, args.Bytes(1))
case "pong":
err = ws.writeMessage(websocket.PongMessage, args.Bytes(1))
case "close":
err = ws.writeMessage(websocket.CloseMessage, args.Bytes(1))
case "json":
err = ws.writeMessage(websocket.TextMessage, u.JsonBytes(args.Any(1)))
default:
err = ws.writeMessage(websocket.TextMessage, args.Bytes(1))
}
if err != nil {
panic(vm.NewGoError(err))
}
return nil
}
func (ws *WS) Close(argsIn goja.FunctionCall, vm *goja.Runtime) goja.Value {
// fmt.Println(u.BMagenta("WS"), "stop")
ws.running = false
if !ws.closed {
ws.closed = true
err := ws.conn.Close()
if err != nil {
panic(vm.NewGoError(err))
}
<-ws.pingStopChan
<-ws.closeChan
}
return nil
}