更新动态加载策略,增加动态加载测试用例

This commit is contained in:
Star 2025-07-23 21:03:38 +08:00
parent 47dd76db71
commit cceea9466c
5 changed files with 84 additions and 29 deletions

4
go.mod
View File

@ -3,14 +3,13 @@ module apigo.cc/gojs/task
go 1.23.0 go 1.23.0
require ( require (
apigo.cc/gojs v0.0.21 apigo.cc/gojs v0.0.23
apigo.cc/gojs/console v0.0.2 apigo.cc/gojs/console v0.0.2
apigo.cc/gojs/file v0.0.4 apigo.cc/gojs/file v0.0.4
apigo.cc/gojs/runtime v0.0.3 apigo.cc/gojs/runtime v0.0.3
apigo.cc/gojs/util v0.0.12 apigo.cc/gojs/util v0.0.12
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/ssgo/config v1.7.9 github.com/ssgo/config v1.7.9
github.com/ssgo/log v1.7.7
github.com/ssgo/u v1.7.21 github.com/ssgo/u v1.7.21
) )
@ -23,6 +22,7 @@ require (
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect
github.com/ssgo/log v1.7.7 // indirect
github.com/ssgo/standard v1.7.7 // indirect github.com/ssgo/standard v1.7.7 // indirect
github.com/ssgo/tool v0.4.29 // indirect github.com/ssgo/tool v0.4.29 // indirect
golang.org/x/crypto v0.40.0 // indirect golang.org/x/crypto v0.40.0 // indirect

View File

@ -8,20 +8,17 @@ import (
"apigo.cc/gojs/goja" "apigo.cc/gojs/goja"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/ssgo/config" "github.com/ssgo/config"
"github.com/ssgo/log"
"github.com/ssgo/u" "github.com/ssgo/u"
) )
const pluginName = "task" const pluginName = "task"
var logger = log.DefaultLogger
type Config struct { type Config struct {
HotLoad *bool HotLoad *int
} }
var conf = Config{ var conf = Config{
HotLoad: u.BoolPtr(false), HotLoad: u.IntPtr(0),
} }
var waitChan chan struct{} var waitChan chan struct{}
@ -57,7 +54,7 @@ func init() {
return mappedObj return mappedObj
}, },
OnKill: func() { OnKill: func() {
_pluginObject.Stop() _pluginObject.Stop(nil)
}, },
WaitForStop: func() { WaitForStop: func() {
if waitChan != nil { if waitChan != nil {

View File

@ -7,8 +7,14 @@ import u from 'apigo.cc/gojs/util'
co.info('plugin test start') co.info('plugin test start')
try { try {
file.mkdir('testTasks/tmp') file.mkdir('testTasks/tmp')
task.config({
hotLoad: 1
})
task.start() task.start()
let jsFileC = 'testTasks/tmp/c.js'
file.copy('testTasks/c.js', jsFileC)
task.addTask('@every 1s', jsFileC)
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
let jsFileA = 'testTasks/tmp/a' + i + '.js' let jsFileA = 'testTasks/tmp/a' + i + '.js'
let jsFileB = 'testTasks/tmp/b' + i + '.js' let jsFileB = 'testTasks/tmp/b' + i + '.js'
@ -18,7 +24,9 @@ try {
task.addTask('@every 1s', jsFileB) task.addTask('@every 1s', jsFileB)
} }
co.info('任务全部启动完成,等待任务运行') co.info('任务全部启动完成,等待任务运行')
rt.sleep(3000) rt.sleep(1000)
file.write(jsFileC, file.read('testTasks/c.js').replace('let runValue = 1', 'let runValue = 2'))
rt.sleep(2000)
co.info('任务运行结束,正在停止任务进程') co.info('任务运行结束,正在停止任务进程')
task.stop() task.stop()
@ -42,6 +50,19 @@ try {
if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30' if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30'
co.info('aRunTimes 校验通过', aRunTimes) co.info('aRunTimes 校验通过', aRunTimes)
// c 重新加载过前2次1后1次2
let cStarts = task.get('cStarts')
if (cStarts !== 3) return 'cStarts(' + cStarts + ') not 3'
co.info('cStarts 校验通过', cStarts)
let cStops = task.get('cStops')
if (cStops !== 3) return 'cStops(' + cStops + ') not 3'
co.info('cStops 校验通过', cStops)
let cTag = task.get('cTag')
if (cTag !== 4) return 'cTag(' + cTag + ') not 4'
co.info('cTag 校验通过', cTag)
co.info('全部测试通过') co.info('全部测试通过')
return true return true
} catch (ex) { } catch (ex) {

48
task.go
View File

@ -6,8 +6,8 @@ import (
"time" "time"
"apigo.cc/gojs" "apigo.cc/gojs"
"apigo.cc/gojs/goja"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/ssgo/log"
"github.com/ssgo/u" "github.com/ssgo/u"
) )
@ -21,7 +21,8 @@ type Task struct {
policy string policy string
} }
func (obj *PluginObject) Start() { func (obj *PluginObject) Start(vm *goja.Runtime) {
logger := gojs.GetLogger(vm)
obj.lock.Lock() obj.lock.Lock()
defer obj.lock.Unlock() defer obj.lock.Unlock()
if obj.isStarted { if obj.isStarted {
@ -29,15 +30,14 @@ func (obj *PluginObject) Start() {
} }
waitChan = make(chan struct{}) waitChan = make(chan struct{})
logger = log.New(u.ShortUniqueId())
logger.Info("task service is starting") logger.Info("task service is starting")
obj.skipCron.Start() obj.skipCron.Start()
obj.delayCron.Start() obj.delayCron.Start()
// obj.asyncCron = cron.New(cron.WithSeconds()) // obj.asyncCron = cron.New(cron.WithSeconds())
// obj.asyncCron.Start() // obj.asyncCron.Start()
if *conf.HotLoad { if *conf.HotLoad > 0 {
// 每隔5秒检查一次任务文件是否有更新如果更新就进行热加载 // 每隔5秒检查一次任务文件是否有更新如果更新就进行热加载
obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() { obj.monitorTaskId, _ = obj.skipCron.AddFunc(fmt.Sprintf("@every %ds", *conf.HotLoad), func() {
tasks := []*Task{} tasks := []*Task{}
obj.lock.RLock() obj.lock.RLock()
for _, task := range obj.tasks { for _, task := range obj.tasks {
@ -51,13 +51,16 @@ func (obj *PluginObject) Start() {
// 文件变化,重新加载 // 文件变化,重新加载
rt := gojs.New() rt := gojs.New()
if _, err := rt.RunFile(task.file); err == nil { if _, err := rt.RunFile(task.file); err == nil {
task.RunFunc("onStop")
task.lock.Lock() task.lock.Lock()
task.vm = rt task.vm = rt
task.mtime = fi.ModTime task.mtime = fi.ModTime
task.lock.Unlock() task.lock.Unlock()
logger.Info("reload task code success", "file", task.file)
task.RunFunc("onStart")
} else { } else {
// 发生错误时仅更新 mtime防止不断重新加载 // 发生错误时仅更新 mtime防止不断重新加载
log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", task.file) logger.Error("failed to reload task code", "err", err.Error(), "file", task.file)
task.lock.Lock() task.lock.Lock()
task.mtime = fi.ModTime task.mtime = fi.ModTime
task.lock.Unlock() task.lock.Unlock()
@ -71,7 +74,8 @@ func (obj *PluginObject) Start() {
logger.Info("task service is started") logger.Info("task service is started")
} }
func (obj *PluginObject) Stop() { func (obj *PluginObject) Stop(vm *goja.Runtime) {
logger := gojs.GetLogger(vm)
obj.lock.Lock() obj.lock.Lock()
defer obj.lock.Unlock() defer obj.lock.Unlock()
if !obj.isStarted { if !obj.isStarted {
@ -80,7 +84,7 @@ func (obj *PluginObject) Stop() {
defer close(waitChan) defer close(waitChan)
logger.Info("task service is stopping") logger.Info("task service is stopping")
if *conf.HotLoad && obj.monitorTaskId != 0 { if *conf.HotLoad > 0 {
obj.skipCron.Remove(obj.monitorTaskId) obj.skipCron.Remove(obj.monitorTaskId)
} }
@ -92,7 +96,7 @@ func (obj *PluginObject) Stop() {
// <-ctx3.Done() // <-ctx3.Done()
for _, task := range obj.tasks { for _, task := range obj.tasks {
obj.removeTask(task) obj.removeTask(task, vm)
} }
obj.isStarted = false obj.isStarted = false
@ -100,7 +104,8 @@ func (obj *PluginObject) Stop() {
logger.Info("task service is stopped") logger.Info("task service is stopped")
} }
func (obj *PluginObject) AddTask(spec string, file string, policy *string) error { func (obj *PluginObject) AddTask(spec string, file string, policy *string, vm *goja.Runtime) error {
logger := gojs.GetLogger(vm)
fi := u.GetFileInfo(file) fi := u.GetFileInfo(file)
if fi == nil { if fi == nil {
logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy) logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy)
@ -110,7 +115,7 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error
rt := gojs.New() rt := gojs.New()
if _, err := rt.RunFile(file); err != nil { if _, err := rt.RunFile(file); err != nil {
logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error()) logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error())
return gojs.Err(err.Error()) return err
} }
task := &Task{ task := &Task{
@ -122,7 +127,7 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error
} }
if err := task.RunFunc("onStart"); err != nil { if err := task.RunFunc("onStart"); err != nil {
logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error()) logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
return gojs.Err(err.Error()) return err
} }
var err error var err error
@ -156,14 +161,14 @@ func (obj *PluginObject) AddTask(spec string, file string, policy *string) error
return nil return nil
} }
func (obj *PluginObject) RemoveTask(spec string, file string) error { func (obj *PluginObject) RemoveTask(spec string, file string, vm *goja.Runtime) error {
obj.lock.RLock() obj.lock.RLock()
task := obj.tasks[spec+file] task := obj.tasks[spec+file]
obj.lock.RUnlock() obj.lock.RUnlock()
if task == nil { if task == nil {
return nil return nil
} }
err := obj.removeTask(task) err := obj.removeTask(task, vm)
if err == nil { if err == nil {
obj.lock.Lock() obj.lock.Lock()
defer obj.lock.Unlock() defer obj.lock.Unlock()
@ -172,7 +177,8 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error {
return err return err
} }
func (obj *PluginObject) removeTask(task *Task) error { func (obj *PluginObject) removeTask(task *Task, vm *goja.Runtime) error {
logger := gojs.GetLogger(vm)
if err := task.RunFunc("onStop"); err != nil { if err := task.RunFunc("onStop"); err != nil {
logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error()) logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error())
} }
@ -189,24 +195,26 @@ func (obj *PluginObject) removeTask(task *Task) error {
return nil return nil
} }
func (obj *PluginObject) RunOnce(file string) error { func (obj *PluginObject) RunOnce(file string, vm *goja.Runtime) error {
logger := gojs.GetLogger(vm)
rt := gojs.New() rt := gojs.New()
if _, err := rt.RunFile(file); err == nil { if _, err := rt.RunFile(file); err == nil {
if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil { if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil {
logger.Error("failed to run onStart for task", "file", file, "err", err.Error()) logger.Error("failed to run onStart for task", "file", file, "err", err.Error())
return gojs.Err(err.Error()) return err
} }
if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil { if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil {
logger.Error("failed to run onRun for task", "file", file, "err", err.Error()) logger.Error("failed to run onRun for task", "file", file, "err", err.Error())
return gojs.Err(err.Error()) return err
} }
if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil { if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil {
logger.Error("failed to run onStop for task", "file", file, "err", err.Error()) logger.Error("failed to run onStop for task", "file", file, "err", err.Error())
return gojs.Err(err.Error()) return err
} }
return nil return nil
} else { } else {
return gojs.Err(err.Error()) return err
} }
} }

29
testTasks/c.js Normal file
View File

@ -0,0 +1,29 @@
import rt from 'apigo.cc/gojs/runtime'
import co from 'apigo.cc/gojs/console'
import task from 'apigo.cc/gojs/task'
import u from 'apigo.cc/gojs/util'
const taskName = '「c」'
let runValue = 1
function onStart() {
// co.info(taskName, 'cStart', runValue)
task.getSet('cStarts', old => {
return u.int(old) + runValue
})
}
let i = 0
function onRun() {
// co.info(taskName, 'cRun', runValue)
task.getSet('cTag', old => {
return u.int(old) + runValue
})
}
function onStop() {
// co.info(taskName, 'cStop', runValue)
task.getSet('cStops', old => {
return u.int(old) + runValue
})
}