Compare commits

...

1 Commits
v0.0.1 ... main

Author SHA1 Message Date
Star
47dd76db71 fix many 2025-07-21 13:20:25 +08:00
4 changed files with 133 additions and 53 deletions

8
go.mod
View File

@ -3,14 +3,15 @@ module apigo.cc/gojs/task
go 1.23.0 go 1.23.0
require ( require (
apigo.cc/gojs v0.0.17 apigo.cc/gojs v0.0.21
apigo.cc/gojs/console v0.0.2 apigo.cc/gojs/console v0.0.2
apigo.cc/gojs/file v0.0.4 apigo.cc/gojs/file v0.0.4
apigo.cc/gojs/runtime v0.0.3 apigo.cc/gojs/runtime v0.0.3
apigo.cc/gojs/util v0.0.11 apigo.cc/gojs/util v0.0.12
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/ssgo/config v1.7.9
github.com/ssgo/log v1.7.7 github.com/ssgo/log v1.7.7
github.com/ssgo/u v1.7.20 github.com/ssgo/u v1.7.21
) )
require ( require (
@ -22,7 +23,6 @@ require (
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect
github.com/ssgo/config v1.7.9 // indirect
github.com/ssgo/standard v1.7.7 // indirect github.com/ssgo/standard v1.7.7 // indirect
github.com/ssgo/tool v0.4.29 // indirect github.com/ssgo/tool v0.4.29 // indirect
golang.org/x/crypto v0.40.0 // indirect golang.org/x/crypto v0.40.0 // indirect

View File

@ -7,32 +7,49 @@ import (
"apigo.cc/gojs" "apigo.cc/gojs"
"apigo.cc/gojs/goja" "apigo.cc/gojs/goja"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/ssgo/config"
"github.com/ssgo/log"
"github.com/ssgo/u"
) )
const pluginName = "task" const pluginName = "task"
var logger = log.DefaultLogger
type Config struct {
HotLoad *bool
}
var conf = Config{
HotLoad: u.BoolPtr(false),
}
var waitChan chan struct{}
type PluginObject struct { type PluginObject struct {
skipCron *cron.Cron skipCron *cron.Cron
delayCron *cron.Cron delayCron *cron.Cron
// asyncCron *cron.Cron // asyncCron *cron.Cron
stopChan chan struct{} isStarted bool
isStarted bool monitorTaskId cron.EntryID
lock sync.RWMutex lock sync.RWMutex
tasks map[string]*Task tasks map[string]*Task
taskData map[string]any taskData map[string]any
taskDataLock sync.RWMutex taskDataLock sync.RWMutex
taskList map[string]*list.List taskList map[string]*list.List
taskListLock sync.RWMutex taskListLock sync.RWMutex
} }
var _pluginObject = &PluginObject{ var _pluginObject = &PluginObject{
tasks: map[string]*Task{}, skipCron: cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))),
taskData: map[string]any{}, delayCron: cron.New(cron.WithSeconds()),
taskList: map[string]*list.List{}, tasks: map[string]*Task{},
taskData: map[string]any{},
taskList: map[string]*list.List{},
} }
func init() { func init() {
config.LoadConfig(pluginName, &conf)
tsCode := gojs.MakeTSCode(_pluginObject) tsCode := gojs.MakeTSCode(_pluginObject)
mappedObj := gojs.ToMap(_pluginObject) mappedObj := gojs.ToMap(_pluginObject)
gojs.Register("apigo.cc/gojs/"+pluginName, gojs.Module{ gojs.Register("apigo.cc/gojs/"+pluginName, gojs.Module{
@ -42,7 +59,18 @@ func init() {
OnKill: func() { OnKill: func() {
_pluginObject.Stop() _pluginObject.Stop()
}, },
WaitForStop: func() {
if waitChan != nil {
<-waitChan
}
},
TsCode: tsCode, TsCode: tsCode,
Desc: "task api", Desc: "task api",
}) })
} }
func (obj *PluginObject) Config(cfg Config) {
if cfg.HotLoad != nil {
conf.HotLoad = cfg.HotLoad
}
}

View File

@ -7,33 +7,42 @@ import u from 'apigo.cc/gojs/util'
co.info('plugin test start') co.info('plugin test start')
try { try {
file.mkdir('testTasks/tmp') file.mkdir('testTasks/tmp')
task.asyncStart() task.start()
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
let jsFileA = 'testTasks/tmp/a' + i + '.js' let jsFileA = 'testTasks/tmp/a' + i + '.js'
let jsFileB = 'testTasks/tmp/b' + i + '.js' let jsFileB = 'testTasks/tmp/b' + i + '.js'
file.copy('testTasks/a.js', jsFileA) file.copy('testTasks/a.js', jsFileA)
file.copy('testTasks/b.js', jsFileB) file.copy('testTasks/b.js', jsFileB)
task.newTask('@every 1s', jsFileA) task.addTask('@every 1s', jsFileA)
task.newTask('@every 1s', jsFileB) task.addTask('@every 1s', jsFileB)
} }
co.info('任务全部启动完成,等待任务运行')
rt.sleep(3000) rt.sleep(3000)
co.info('任务运行结束,正在停止任务进程')
task.stop() task.stop()
let aStarts = task.get('aStarts') let aStarts = task.get('aStarts')
if (aStarts !== 10) return 'aStarts(' + aStarts + ') not 10' if (aStarts !== 10) return 'aStarts(' + aStarts + ') not 10'
co.info('aStarts 校验通过', aStarts)
let aStops = task.get('aStops') let aStops = task.get('aStops')
if (aStops !== 10) return 'aStops(' + aStops + ') not 10' if (aStops !== 10) return 'aStops(' + aStops + ') not 10'
co.info('aStops 校验通过', aStops)
let bStarts = task.get('bStarts') let bStarts = task.get('bStarts')
if (bStarts !== 10) return 'bStarts(' + bStarts + ') not 10' if (bStarts !== 10) return 'bStarts(' + bStarts + ') not 10'
co.info('bStarts 校验通过', bStarts)
let bStops = task.get('bStops') let bStops = task.get('bStops')
if (bStops !== 10) return 'bStops(' + bStops + ') not 10' if (bStops !== 10) return 'bStops(' + bStops + ') not 10'
co.info('bStops 校验通过', bStops)
let aRunTimes = task.get('aRunTimes') let aRunTimes = task.get('aRunTimes')
if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30' if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30'
co.info('aRunTimes 校验通过', aRunTimes)
co.info() co.info('全部测试通过')
return true return true
} catch (ex) { } catch (ex) {
co.error(ex) co.error(ex)

111
task.go
View File

@ -12,6 +12,7 @@ import (
) )
type Task struct { type Task struct {
spec string
file string file string
vm *gojs.Runtime vm *gojs.Runtime
lock sync.RWMutex lock sync.RWMutex
@ -20,19 +21,23 @@ type Task struct {
policy string policy string
} }
func (obj *PluginObject) AsyncStart() { func (obj *PluginObject) Start() {
obj.lock.Lock() obj.lock.Lock()
defer obj.lock.Unlock() defer obj.lock.Unlock()
if !obj.isStarted { if obj.isStarted {
obj.skipCron = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))) return
obj.skipCron.Start() }
obj.delayCron = cron.New(cron.WithSeconds())
obj.delayCron.Start() waitChan = make(chan struct{})
// obj.asyncCron = cron.New(cron.WithSeconds()) logger = log.New(u.ShortUniqueId())
// obj.asyncCron.Start() logger.Info("task service is starting")
obj.isStarted = true obj.skipCron.Start()
obj.stopChan = make(chan struct{}) obj.delayCron.Start()
obj.skipCron.AddFunc("@every 5s", func() { // obj.asyncCron = cron.New(cron.WithSeconds())
// obj.asyncCron.Start()
if *conf.HotLoad {
// 每隔5秒检查一次任务文件是否有更新如果更新就进行热加载
obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() {
tasks := []*Task{} tasks := []*Task{}
obj.lock.RLock() obj.lock.RLock()
for _, task := range obj.tasks { for _, task := range obj.tasks {
@ -62,21 +67,23 @@ func (obj *PluginObject) AsyncStart() {
} }
}) })
} }
} obj.isStarted = true
logger.Info("task service is started")
func (obj *PluginObject) Start() {
obj.AsyncStart()
<-obj.stopChan
} }
func (obj *PluginObject) Stop() { func (obj *PluginObject) Stop() {
obj.lock.RLock() obj.lock.Lock()
isStarted := obj.isStarted defer obj.lock.Unlock()
obj.lock.RUnlock() if !obj.isStarted {
if !isStarted {
return return
} }
defer close(obj.stopChan)
defer close(waitChan)
logger.Info("task service is stopping")
if *conf.HotLoad && obj.monitorTaskId != 0 {
obj.skipCron.Remove(obj.monitorTaskId)
}
ctx1 := obj.skipCron.Stop() ctx1 := obj.skipCron.Stop()
ctx2 := obj.delayCron.Stop() ctx2 := obj.delayCron.Stop()
// ctx3 := obj.asyncCron.Stop() // ctx3 := obj.asyncCron.Stop()
@ -84,35 +91,38 @@ func (obj *PluginObject) Stop() {
<-ctx2.Done() <-ctx2.Done()
// <-ctx3.Done() // <-ctx3.Done()
obj.lock.Lock()
defer obj.lock.Unlock()
for _, task := range obj.tasks { for _, task := range obj.tasks {
task.RunFunc("onStop") obj.removeTask(task)
} }
obj.isStarted = false obj.isStarted = false
obj.tasks = map[string]*Task{} obj.tasks = map[string]*Task{}
logger.Info("task service is stopped")
} }
func (obj *PluginObject) NewTask(spec string, file string, policy *string) error { func (obj *PluginObject) AddTask(spec string, file string, policy *string) error {
fi := u.GetFileInfo(file) fi := u.GetFileInfo(file)
if fi == nil { if fi == nil {
return u.Error("task file not exists") logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy)
return gojs.Err("task file not exists")
} }
rt := gojs.New() rt := gojs.New()
if _, err := rt.RunFile(file); err != nil { if _, err := rt.RunFile(file); err != nil {
return u.Error(err.Error()) logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error())
return gojs.Err(err.Error())
} }
task := &Task{ task := &Task{
spec: spec,
file: file, file: file,
vm: rt, vm: rt,
mtime: fi.ModTime, mtime: fi.ModTime,
policy: u.String(policy), policy: u.String(policy),
} }
if err := task.RunFunc("onStart"); err != nil { if err := task.RunFunc("onStart"); err != nil {
return u.Error(err.Error()) logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
return gojs.Err(err.Error())
} }
var err error var err error
@ -135,9 +145,11 @@ func (obj *PluginObject) NewTask(spec string, file string, policy *string) error
} }
if err != nil { if err != nil {
return u.Error(err.Error()) logger.Error("failed to add task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
return gojs.Err(err.Error())
} }
logger.Info("add task", "spec", spec, "file", file, "policy", task.policy)
obj.lock.Lock() obj.lock.Lock()
defer obj.lock.Unlock() defer obj.lock.Unlock()
obj.tasks[spec+file] = task obj.tasks[spec+file] = task
@ -151,7 +163,19 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error {
if task == nil { if task == nil {
return nil return nil
} }
task.RunFunc("onStop") err := obj.removeTask(task)
if err == nil {
obj.lock.Lock()
defer obj.lock.Unlock()
delete(obj.tasks, task.spec+task.file)
}
return err
}
func (obj *PluginObject) removeTask(task *Task) error {
if err := task.RunFunc("onStop"); err != nil {
logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error())
}
switch task.policy { switch task.policy {
case "skip": case "skip":
obj.skipCron.Remove(task.entryId) obj.skipCron.Remove(task.entryId)
@ -161,16 +185,35 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error {
obj.delayCron.Remove(task.entryId) obj.delayCron.Remove(task.entryId)
} }
obj.lock.Lock() logger.Info("remove task", "spec", task.spec, "file", task.file, "policy", task.policy)
defer obj.lock.Unlock()
delete(obj.tasks, spec+file)
return nil return nil
} }
func (obj *PluginObject) RunOnce(file string) error {
rt := gojs.New()
if _, err := rt.RunFile(file); err == nil {
if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil {
logger.Error("failed to run onStart for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil {
logger.Error("failed to run onRun for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil {
logger.Error("failed to run onStop for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
return nil
} else {
return gojs.Err(err.Error())
}
}
func (task *Task) RunFunc(fnName string) error { func (task *Task) RunFunc(fnName string) error {
task.lock.RLock() task.lock.RLock()
rt := task.vm rt := task.vm
task.lock.RUnlock() task.lock.RUnlock()
_, err := rt.RunCode(fmt.Sprintf("if(%s)%s()", fnName, fnName)) _, err := rt.RunCode(fmt.Sprintf("if(typeof %s === 'function')%s()", fnName, fnName))
return err return err
} }