diff --git a/go.mod b/go.mod index 5242d57..fc2bfed 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,13 @@ module apigo.cc/gojs/task go 1.23.0 require ( - apigo.cc/gojs v0.0.21 + apigo.cc/gojs v0.0.23 apigo.cc/gojs/console v0.0.2 apigo.cc/gojs/file v0.0.4 apigo.cc/gojs/runtime v0.0.3 apigo.cc/gojs/util v0.0.12 github.com/robfig/cron/v3 v3.0.1 github.com/ssgo/config v1.7.9 - github.com/ssgo/log v1.7.7 github.com/ssgo/u v1.7.21 ) @@ -23,6 +22,7 @@ require ( github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/kr/text v0.2.0 // indirect github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect + github.com/ssgo/log v1.7.7 // indirect github.com/ssgo/standard v1.7.7 // indirect github.com/ssgo/tool v0.4.29 // indirect golang.org/x/crypto v0.40.0 // indirect diff --git a/plugin.go b/plugin.go index 79e4257..a89780d 100644 --- a/plugin.go +++ b/plugin.go @@ -8,20 +8,17 @@ import ( "apigo.cc/gojs/goja" "github.com/robfig/cron/v3" "github.com/ssgo/config" - "github.com/ssgo/log" "github.com/ssgo/u" ) const pluginName = "task" -var logger = log.DefaultLogger - type Config struct { - HotLoad *bool + HotLoad *int } var conf = Config{ - HotLoad: u.BoolPtr(false), + HotLoad: u.IntPtr(0), } var waitChan chan struct{} @@ -57,7 +54,7 @@ func init() { return mappedObj }, OnKill: func() { - _pluginObject.Stop() + _pluginObject.Stop(nil) }, WaitForStop: func() { if waitChan != nil { diff --git a/plugin_test.js b/plugin_test.js index 3c5245f..7d7ee05 100644 --- a/plugin_test.js +++ b/plugin_test.js @@ -7,8 +7,14 @@ import u from 'apigo.cc/gojs/util' co.info('plugin test start') try { file.mkdir('testTasks/tmp') + task.config({ + hotLoad: 1 + }) task.start() + let jsFileC = 'testTasks/tmp/c.js' + file.copy('testTasks/c.js', jsFileC) + task.addTask('@every 1s', jsFileC) for (let i = 0; i < 10; i++) { let jsFileA = 'testTasks/tmp/a' + i + '.js' let jsFileB = 'testTasks/tmp/b' + i + '.js' @@ -18,7 +24,9 @@ try { task.addTask('@every 1s', jsFileB) } co.info('任务全部启动完成,等待任务运行') - rt.sleep(3000) + rt.sleep(1000) + file.write(jsFileC, file.read('testTasks/c.js').replace('let runValue = 1', 'let runValue = 2')) + rt.sleep(2000) co.info('任务运行结束,正在停止任务进程') task.stop() @@ -42,6 +50,19 @@ try { if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30' co.info('aRunTimes 校验通过', aRunTimes) + // c 重新加载过,前2次1,后1次2 + let cStarts = task.get('cStarts') + if (cStarts !== 3) return 'cStarts(' + cStarts + ') not 3' + co.info('cStarts 校验通过', cStarts) + + let cStops = task.get('cStops') + if (cStops !== 3) return 'cStops(' + cStops + ') not 3' + co.info('cStops 校验通过', cStops) + + let cTag = task.get('cTag') + if (cTag !== 4) return 'cTag(' + cTag + ') not 4' + co.info('cTag 校验通过', cTag) + co.info('全部测试通过') return true } catch (ex) { diff --git a/task.go b/task.go index 2d04bc5..ee3fbf2 100644 --- a/task.go +++ b/task.go @@ -6,8 +6,8 @@ import ( "time" "apigo.cc/gojs" + "apigo.cc/gojs/goja" "github.com/robfig/cron/v3" - "github.com/ssgo/log" "github.com/ssgo/u" ) @@ -21,7 +21,8 @@ type Task struct { policy string } -func (obj *PluginObject) Start() { +func (obj *PluginObject) Start(vm *goja.Runtime) { + logger := gojs.GetLogger(vm) obj.lock.Lock() defer obj.lock.Unlock() if obj.isStarted { @@ -29,15 +30,14 @@ func (obj *PluginObject) Start() { } waitChan = make(chan struct{}) - logger = log.New(u.ShortUniqueId()) logger.Info("task service is starting") obj.skipCron.Start() obj.delayCron.Start() // obj.asyncCron = cron.New(cron.WithSeconds()) // obj.asyncCron.Start() - if *conf.HotLoad { + if *conf.HotLoad > 0 { // 每隔5秒检查一次任务文件是否有更新,如果更新就进行热加载 - obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() { + obj.monitorTaskId, _ = obj.skipCron.AddFunc(fmt.Sprintf("@every %ds", *conf.HotLoad), func() { tasks := []*Task{} obj.lock.RLock() for _, task := range obj.tasks { @@ -51,13 +51,16 @@ func (obj *PluginObject) Start() { // 文件变化,重新加载 rt := gojs.New() if _, err := rt.RunFile(task.file); err == nil { + task.RunFunc("onStop") task.lock.Lock() task.vm = rt task.mtime = fi.ModTime task.lock.Unlock() + logger.Info("reload task code success", "file", task.file) + task.RunFunc("onStart") } else { // 发生错误时仅更新 mtime,防止不断重新加载 - log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", task.file) + logger.Error("failed to reload task code", "err", err.Error(), "file", task.file) task.lock.Lock() task.mtime = fi.ModTime task.lock.Unlock() @@ -71,7 +74,8 @@ func (obj *PluginObject) Start() { logger.Info("task service is started") } -func (obj *PluginObject) Stop() { +func (obj *PluginObject) Stop(vm *goja.Runtime) { + logger := gojs.GetLogger(vm) obj.lock.Lock() defer obj.lock.Unlock() if !obj.isStarted { @@ -80,7 +84,7 @@ func (obj *PluginObject) Stop() { defer close(waitChan) logger.Info("task service is stopping") - if *conf.HotLoad && obj.monitorTaskId != 0 { + if *conf.HotLoad > 0 { obj.skipCron.Remove(obj.monitorTaskId) } @@ -92,7 +96,7 @@ func (obj *PluginObject) Stop() { // <-ctx3.Done() for _, task := range obj.tasks { - obj.removeTask(task) + obj.removeTask(task, vm) } obj.isStarted = false @@ -100,7 +104,8 @@ func (obj *PluginObject) Stop() { logger.Info("task service is stopped") } -func (obj *PluginObject) AddTask(spec string, file string, policy *string) error { +func (obj *PluginObject) AddTask(spec string, file string, policy *string, vm *goja.Runtime) error { + logger := gojs.GetLogger(vm) fi := u.GetFileInfo(file) if fi == nil { logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy) @@ -110,7 +115,7 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error rt := gojs.New() if _, err := rt.RunFile(file); err != nil { logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error()) - return gojs.Err(err.Error()) + return err } task := &Task{ @@ -122,7 +127,7 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error } if err := task.RunFunc("onStart"); err != nil { logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error()) - return gojs.Err(err.Error()) + return err } var err error @@ -156,14 +161,14 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error return nil } -func (obj *PluginObject) RemoveTask(spec string, file string) error { +func (obj *PluginObject) RemoveTask(spec string, file string, vm *goja.Runtime) error { obj.lock.RLock() task := obj.tasks[spec+file] obj.lock.RUnlock() if task == nil { return nil } - err := obj.removeTask(task) + err := obj.removeTask(task, vm) if err == nil { obj.lock.Lock() defer obj.lock.Unlock() @@ -172,7 +177,8 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error { return err } -func (obj *PluginObject) removeTask(task *Task) error { +func (obj *PluginObject) removeTask(task *Task, vm *goja.Runtime) error { + logger := gojs.GetLogger(vm) if err := task.RunFunc("onStop"); err != nil { logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error()) } @@ -189,24 +195,26 @@ func (obj *PluginObject) removeTask(task *Task) error { return nil } -func (obj *PluginObject) RunOnce(file string) error { +func (obj *PluginObject) RunOnce(file string, vm *goja.Runtime) error { + logger := gojs.GetLogger(vm) + rt := gojs.New() if _, err := rt.RunFile(file); err == nil { if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil { logger.Error("failed to run onStart for task", "file", file, "err", err.Error()) - return gojs.Err(err.Error()) + return err } if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil { logger.Error("failed to run onRun for task", "file", file, "err", err.Error()) - return gojs.Err(err.Error()) + return err } if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil { logger.Error("failed to run onStop for task", "file", file, "err", err.Error()) - return gojs.Err(err.Error()) + return err } return nil } else { - return gojs.Err(err.Error()) + return err } } diff --git a/testTasks/c.js b/testTasks/c.js new file mode 100644 index 0000000..0d64395 --- /dev/null +++ b/testTasks/c.js @@ -0,0 +1,29 @@ +import rt from 'apigo.cc/gojs/runtime' +import co from 'apigo.cc/gojs/console' +import task from 'apigo.cc/gojs/task' +import u from 'apigo.cc/gojs/util' + +const taskName = '「c」' +let runValue = 1 + +function onStart() { + // co.info(taskName, 'cStart', runValue) + task.getSet('cStarts', old => { + return u.int(old) + runValue + }) +} + +let i = 0 +function onRun() { + // co.info(taskName, 'cRun', runValue) + task.getSet('cTag', old => { + return u.int(old) + runValue + }) +} + +function onStop() { + // co.info(taskName, 'cStop', runValue) + task.getSet('cStops', old => { + return u.int(old) + runValue + }) +}