diff --git a/go.mod b/go.mod index 103e408..5242d57 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,15 @@ module apigo.cc/gojs/task go 1.23.0 require ( - apigo.cc/gojs v0.0.17 + apigo.cc/gojs v0.0.21 apigo.cc/gojs/console v0.0.2 apigo.cc/gojs/file v0.0.4 apigo.cc/gojs/runtime v0.0.3 - apigo.cc/gojs/util v0.0.11 + apigo.cc/gojs/util v0.0.12 github.com/robfig/cron/v3 v3.0.1 + github.com/ssgo/config v1.7.9 github.com/ssgo/log v1.7.7 - github.com/ssgo/u v1.7.20 + github.com/ssgo/u v1.7.21 ) require ( @@ -22,7 +23,6 @@ require ( github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/kr/text v0.2.0 // indirect github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect - github.com/ssgo/config v1.7.9 // indirect github.com/ssgo/standard v1.7.7 // indirect github.com/ssgo/tool v0.4.29 // indirect golang.org/x/crypto v0.40.0 // indirect diff --git a/plugin.go b/plugin.go index e1d58bd..79e4257 100644 --- a/plugin.go +++ b/plugin.go @@ -7,32 +7,49 @@ import ( "apigo.cc/gojs" "apigo.cc/gojs/goja" "github.com/robfig/cron/v3" + "github.com/ssgo/config" + "github.com/ssgo/log" + "github.com/ssgo/u" ) const pluginName = "task" +var logger = log.DefaultLogger + +type Config struct { + HotLoad *bool +} + +var conf = Config{ + HotLoad: u.BoolPtr(false), +} +var waitChan chan struct{} + type PluginObject struct { skipCron *cron.Cron delayCron *cron.Cron // asyncCron *cron.Cron - stopChan chan struct{} - isStarted bool - lock sync.RWMutex - tasks map[string]*Task - taskData map[string]any - taskDataLock sync.RWMutex + isStarted bool + monitorTaskId cron.EntryID + lock sync.RWMutex + tasks map[string]*Task + taskData map[string]any + taskDataLock sync.RWMutex taskList map[string]*list.List taskListLock sync.RWMutex } var _pluginObject = &PluginObject{ - tasks: map[string]*Task{}, - taskData: map[string]any{}, - taskList: map[string]*list.List{}, + skipCron: cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))), + delayCron: cron.New(cron.WithSeconds()), + tasks: map[string]*Task{}, + taskData: map[string]any{}, + taskList: map[string]*list.List{}, } func init() { + config.LoadConfig(pluginName, &conf) tsCode := gojs.MakeTSCode(_pluginObject) mappedObj := gojs.ToMap(_pluginObject) gojs.Register("apigo.cc/gojs/"+pluginName, gojs.Module{ @@ -42,7 +59,18 @@ func init() { OnKill: func() { _pluginObject.Stop() }, + WaitForStop: func() { + if waitChan != nil { + <-waitChan + } + }, TsCode: tsCode, Desc: "task api", }) } + +func (obj *PluginObject) Config(cfg Config) { + if cfg.HotLoad != nil { + conf.HotLoad = cfg.HotLoad + } +} diff --git a/plugin_test.js b/plugin_test.js index d8e0d5d..3c5245f 100644 --- a/plugin_test.js +++ b/plugin_test.js @@ -7,33 +7,42 @@ import u from 'apigo.cc/gojs/util' co.info('plugin test start') try { file.mkdir('testTasks/tmp') - task.asyncStart() + task.start() for (let i = 0; i < 10; i++) { let jsFileA = 'testTasks/tmp/a' + i + '.js' let jsFileB = 'testTasks/tmp/b' + i + '.js' file.copy('testTasks/a.js', jsFileA) file.copy('testTasks/b.js', jsFileB) - task.newTask('@every 1s', jsFileA) - task.newTask('@every 1s', jsFileB) + task.addTask('@every 1s', jsFileA) + task.addTask('@every 1s', jsFileB) } - + co.info('任务全部启动完成,等待任务运行') rt.sleep(3000) + co.info('任务运行结束,正在停止任务进程') task.stop() let aStarts = task.get('aStarts') if (aStarts !== 10) return 'aStarts(' + aStarts + ') not 10' + co.info('aStarts 校验通过', aStarts) + let aStops = task.get('aStops') if (aStops !== 10) return 'aStops(' + aStops + ') not 10' + co.info('aStops 校验通过', aStops) + let bStarts = task.get('bStarts') if (bStarts !== 10) return 'bStarts(' + bStarts + ') not 10' + co.info('bStarts 校验通过', bStarts) + let bStops = task.get('bStops') if (bStops !== 10) return 'bStops(' + bStops + ') not 10' + co.info('bStops 校验通过', bStops) + let aRunTimes = task.get('aRunTimes') if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30' + co.info('aRunTimes 校验通过', aRunTimes) - co.info() - + co.info('全部测试通过') return true } catch (ex) { co.error(ex) diff --git a/task.go b/task.go index 986ce24..2d04bc5 100644 --- a/task.go +++ b/task.go @@ -12,6 +12,7 @@ import ( ) type Task struct { + spec string file string vm *gojs.Runtime lock sync.RWMutex @@ -20,19 +21,23 @@ type Task struct { policy string } -func (obj *PluginObject) AsyncStart() { +func (obj *PluginObject) Start() { obj.lock.Lock() defer obj.lock.Unlock() - if !obj.isStarted { - obj.skipCron = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))) - obj.skipCron.Start() - obj.delayCron = cron.New(cron.WithSeconds()) - obj.delayCron.Start() - // obj.asyncCron = cron.New(cron.WithSeconds()) - // obj.asyncCron.Start() - obj.isStarted = true - obj.stopChan = make(chan struct{}) - obj.skipCron.AddFunc("@every 5s", func() { + if obj.isStarted { + return + } + + waitChan = make(chan struct{}) + logger = log.New(u.ShortUniqueId()) + logger.Info("task service is starting") + obj.skipCron.Start() + obj.delayCron.Start() + // obj.asyncCron = cron.New(cron.WithSeconds()) + // obj.asyncCron.Start() + if *conf.HotLoad { + // 每隔5秒检查一次任务文件是否有更新,如果更新就进行热加载 + obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() { tasks := []*Task{} obj.lock.RLock() for _, task := range obj.tasks { @@ -62,21 +67,23 @@ func (obj *PluginObject) AsyncStart() { } }) } -} - -func (obj *PluginObject) Start() { - obj.AsyncStart() - <-obj.stopChan + obj.isStarted = true + logger.Info("task service is started") } func (obj *PluginObject) Stop() { - obj.lock.RLock() - isStarted := obj.isStarted - obj.lock.RUnlock() - if !isStarted { + obj.lock.Lock() + defer obj.lock.Unlock() + if !obj.isStarted { return } - defer close(obj.stopChan) + + defer close(waitChan) + logger.Info("task service is stopping") + if *conf.HotLoad && obj.monitorTaskId != 0 { + obj.skipCron.Remove(obj.monitorTaskId) + } + ctx1 := obj.skipCron.Stop() ctx2 := obj.delayCron.Stop() // ctx3 := obj.asyncCron.Stop() @@ -84,35 +91,38 @@ func (obj *PluginObject) Stop() { <-ctx2.Done() // <-ctx3.Done() - obj.lock.Lock() - defer obj.lock.Unlock() for _, task := range obj.tasks { - task.RunFunc("onStop") + obj.removeTask(task) } obj.isStarted = false obj.tasks = map[string]*Task{} + logger.Info("task service is stopped") } -func (obj *PluginObject) NewTask(spec string, file string, policy *string) error { +func (obj *PluginObject) AddTask(spec string, file string, policy *string) error { fi := u.GetFileInfo(file) if fi == nil { - return u.Error("task file not exists") + logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy) + return gojs.Err("task file not exists") } rt := gojs.New() if _, err := rt.RunFile(file); err != nil { - return u.Error(err.Error()) + logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error()) + return gojs.Err(err.Error()) } task := &Task{ + spec: spec, file: file, vm: rt, mtime: fi.ModTime, policy: u.String(policy), } if err := task.RunFunc("onStart"); err != nil { - return u.Error(err.Error()) + logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error()) + return gojs.Err(err.Error()) } var err error @@ -135,9 +145,11 @@ func (obj *PluginObject) NewTask(spec string, file string, policy *string) error } if err != nil { - return u.Error(err.Error()) + logger.Error("failed to add task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error()) + return gojs.Err(err.Error()) } + logger.Info("add task", "spec", spec, "file", file, "policy", task.policy) obj.lock.Lock() defer obj.lock.Unlock() obj.tasks[spec+file] = task @@ -151,7 +163,19 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error { if task == nil { return nil } - task.RunFunc("onStop") + err := obj.removeTask(task) + if err == nil { + obj.lock.Lock() + defer obj.lock.Unlock() + delete(obj.tasks, task.spec+task.file) + } + return err +} + +func (obj *PluginObject) removeTask(task *Task) error { + if err := task.RunFunc("onStop"); err != nil { + logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error()) + } switch task.policy { case "skip": obj.skipCron.Remove(task.entryId) @@ -161,16 +185,35 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error { obj.delayCron.Remove(task.entryId) } - obj.lock.Lock() - defer obj.lock.Unlock() - delete(obj.tasks, spec+file) + logger.Info("remove task", "spec", task.spec, "file", task.file, "policy", task.policy) return nil } +func (obj *PluginObject) RunOnce(file string) error { + rt := gojs.New() + if _, err := rt.RunFile(file); err == nil { + if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil { + logger.Error("failed to run onStart for task", "file", file, "err", err.Error()) + return gojs.Err(err.Error()) + } + if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil { + logger.Error("failed to run onRun for task", "file", file, "err", err.Error()) + return gojs.Err(err.Error()) + } + if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil { + logger.Error("failed to run onStop for task", "file", file, "err", err.Error()) + return gojs.Err(err.Error()) + } + return nil + } else { + return gojs.Err(err.Error()) + } +} + func (task *Task) RunFunc(fnName string) error { task.lock.RLock() rt := task.vm task.lock.RUnlock() - _, err := rt.RunCode(fmt.Sprintf("if(%s)%s()", fnName, fnName)) + _, err := rt.RunCode(fmt.Sprintf("if(typeof %s === 'function')%s()", fnName, fnName)) return err }