Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
47dd76db71 |
8
go.mod
8
go.mod
@ -3,14 +3,15 @@ module apigo.cc/gojs/task
|
|||||||
go 1.23.0
|
go 1.23.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
apigo.cc/gojs v0.0.17
|
apigo.cc/gojs v0.0.21
|
||||||
apigo.cc/gojs/console v0.0.2
|
apigo.cc/gojs/console v0.0.2
|
||||||
apigo.cc/gojs/file v0.0.4
|
apigo.cc/gojs/file v0.0.4
|
||||||
apigo.cc/gojs/runtime v0.0.3
|
apigo.cc/gojs/runtime v0.0.3
|
||||||
apigo.cc/gojs/util v0.0.11
|
apigo.cc/gojs/util v0.0.12
|
||||||
github.com/robfig/cron/v3 v3.0.1
|
github.com/robfig/cron/v3 v3.0.1
|
||||||
|
github.com/ssgo/config v1.7.9
|
||||||
github.com/ssgo/log v1.7.7
|
github.com/ssgo/log v1.7.7
|
||||||
github.com/ssgo/u v1.7.20
|
github.com/ssgo/u v1.7.21
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@ -22,7 +23,6 @@ require (
|
|||||||
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
|
github.com/google/pprof v0.0.0-20250630185457-6e76a2b096b5 // indirect
|
||||||
github.com/kr/text v0.2.0 // indirect
|
github.com/kr/text v0.2.0 // indirect
|
||||||
github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect
|
github.com/obscuren/ecies v0.0.0-20150213224233-7c0f4a9b18d9 // indirect
|
||||||
github.com/ssgo/config v1.7.9 // indirect
|
|
||||||
github.com/ssgo/standard v1.7.7 // indirect
|
github.com/ssgo/standard v1.7.7 // indirect
|
||||||
github.com/ssgo/tool v0.4.29 // indirect
|
github.com/ssgo/tool v0.4.29 // indirect
|
||||||
golang.org/x/crypto v0.40.0 // indirect
|
golang.org/x/crypto v0.40.0 // indirect
|
||||||
|
30
plugin.go
30
plugin.go
@ -7,16 +7,30 @@ import (
|
|||||||
"apigo.cc/gojs"
|
"apigo.cc/gojs"
|
||||||
"apigo.cc/gojs/goja"
|
"apigo.cc/gojs/goja"
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
|
"github.com/ssgo/config"
|
||||||
|
"github.com/ssgo/log"
|
||||||
|
"github.com/ssgo/u"
|
||||||
)
|
)
|
||||||
|
|
||||||
const pluginName = "task"
|
const pluginName = "task"
|
||||||
|
|
||||||
|
var logger = log.DefaultLogger
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
HotLoad *bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var conf = Config{
|
||||||
|
HotLoad: u.BoolPtr(false),
|
||||||
|
}
|
||||||
|
var waitChan chan struct{}
|
||||||
|
|
||||||
type PluginObject struct {
|
type PluginObject struct {
|
||||||
skipCron *cron.Cron
|
skipCron *cron.Cron
|
||||||
delayCron *cron.Cron
|
delayCron *cron.Cron
|
||||||
// asyncCron *cron.Cron
|
// asyncCron *cron.Cron
|
||||||
stopChan chan struct{}
|
|
||||||
isStarted bool
|
isStarted bool
|
||||||
|
monitorTaskId cron.EntryID
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
tasks map[string]*Task
|
tasks map[string]*Task
|
||||||
taskData map[string]any
|
taskData map[string]any
|
||||||
@ -27,12 +41,15 @@ type PluginObject struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var _pluginObject = &PluginObject{
|
var _pluginObject = &PluginObject{
|
||||||
|
skipCron: cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))),
|
||||||
|
delayCron: cron.New(cron.WithSeconds()),
|
||||||
tasks: map[string]*Task{},
|
tasks: map[string]*Task{},
|
||||||
taskData: map[string]any{},
|
taskData: map[string]any{},
|
||||||
taskList: map[string]*list.List{},
|
taskList: map[string]*list.List{},
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
config.LoadConfig(pluginName, &conf)
|
||||||
tsCode := gojs.MakeTSCode(_pluginObject)
|
tsCode := gojs.MakeTSCode(_pluginObject)
|
||||||
mappedObj := gojs.ToMap(_pluginObject)
|
mappedObj := gojs.ToMap(_pluginObject)
|
||||||
gojs.Register("apigo.cc/gojs/"+pluginName, gojs.Module{
|
gojs.Register("apigo.cc/gojs/"+pluginName, gojs.Module{
|
||||||
@ -42,7 +59,18 @@ func init() {
|
|||||||
OnKill: func() {
|
OnKill: func() {
|
||||||
_pluginObject.Stop()
|
_pluginObject.Stop()
|
||||||
},
|
},
|
||||||
|
WaitForStop: func() {
|
||||||
|
if waitChan != nil {
|
||||||
|
<-waitChan
|
||||||
|
}
|
||||||
|
},
|
||||||
TsCode: tsCode,
|
TsCode: tsCode,
|
||||||
Desc: "task api",
|
Desc: "task api",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (obj *PluginObject) Config(cfg Config) {
|
||||||
|
if cfg.HotLoad != nil {
|
||||||
|
conf.HotLoad = cfg.HotLoad
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -7,33 +7,42 @@ import u from 'apigo.cc/gojs/util'
|
|||||||
co.info('plugin test start')
|
co.info('plugin test start')
|
||||||
try {
|
try {
|
||||||
file.mkdir('testTasks/tmp')
|
file.mkdir('testTasks/tmp')
|
||||||
task.asyncStart()
|
task.start()
|
||||||
|
|
||||||
for (let i = 0; i < 10; i++) {
|
for (let i = 0; i < 10; i++) {
|
||||||
let jsFileA = 'testTasks/tmp/a' + i + '.js'
|
let jsFileA = 'testTasks/tmp/a' + i + '.js'
|
||||||
let jsFileB = 'testTasks/tmp/b' + i + '.js'
|
let jsFileB = 'testTasks/tmp/b' + i + '.js'
|
||||||
file.copy('testTasks/a.js', jsFileA)
|
file.copy('testTasks/a.js', jsFileA)
|
||||||
file.copy('testTasks/b.js', jsFileB)
|
file.copy('testTasks/b.js', jsFileB)
|
||||||
task.newTask('@every 1s', jsFileA)
|
task.addTask('@every 1s', jsFileA)
|
||||||
task.newTask('@every 1s', jsFileB)
|
task.addTask('@every 1s', jsFileB)
|
||||||
}
|
}
|
||||||
|
co.info('任务全部启动完成,等待任务运行')
|
||||||
rt.sleep(3000)
|
rt.sleep(3000)
|
||||||
|
co.info('任务运行结束,正在停止任务进程')
|
||||||
task.stop()
|
task.stop()
|
||||||
|
|
||||||
let aStarts = task.get('aStarts')
|
let aStarts = task.get('aStarts')
|
||||||
if (aStarts !== 10) return 'aStarts(' + aStarts + ') not 10'
|
if (aStarts !== 10) return 'aStarts(' + aStarts + ') not 10'
|
||||||
|
co.info('aStarts 校验通过', aStarts)
|
||||||
|
|
||||||
let aStops = task.get('aStops')
|
let aStops = task.get('aStops')
|
||||||
if (aStops !== 10) return 'aStops(' + aStops + ') not 10'
|
if (aStops !== 10) return 'aStops(' + aStops + ') not 10'
|
||||||
|
co.info('aStops 校验通过', aStops)
|
||||||
|
|
||||||
let bStarts = task.get('bStarts')
|
let bStarts = task.get('bStarts')
|
||||||
if (bStarts !== 10) return 'bStarts(' + bStarts + ') not 10'
|
if (bStarts !== 10) return 'bStarts(' + bStarts + ') not 10'
|
||||||
|
co.info('bStarts 校验通过', bStarts)
|
||||||
|
|
||||||
let bStops = task.get('bStops')
|
let bStops = task.get('bStops')
|
||||||
if (bStops !== 10) return 'bStops(' + bStops + ') not 10'
|
if (bStops !== 10) return 'bStops(' + bStops + ') not 10'
|
||||||
|
co.info('bStops 校验通过', bStops)
|
||||||
|
|
||||||
let aRunTimes = task.get('aRunTimes')
|
let aRunTimes = task.get('aRunTimes')
|
||||||
if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30'
|
if (aRunTimes !== 30) return 'aRunTimes(' + aRunTimes + ') not 30'
|
||||||
|
co.info('aRunTimes 校验通过', aRunTimes)
|
||||||
|
|
||||||
co.info()
|
co.info('全部测试通过')
|
||||||
|
|
||||||
return true
|
return true
|
||||||
} catch (ex) {
|
} catch (ex) {
|
||||||
co.error(ex)
|
co.error(ex)
|
||||||
|
103
task.go
103
task.go
@ -12,6 +12,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
|
spec string
|
||||||
file string
|
file string
|
||||||
vm *gojs.Runtime
|
vm *gojs.Runtime
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
@ -20,19 +21,23 @@ type Task struct {
|
|||||||
policy string
|
policy string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (obj *PluginObject) AsyncStart() {
|
func (obj *PluginObject) Start() {
|
||||||
obj.lock.Lock()
|
obj.lock.Lock()
|
||||||
defer obj.lock.Unlock()
|
defer obj.lock.Unlock()
|
||||||
if !obj.isStarted {
|
if obj.isStarted {
|
||||||
obj.skipCron = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)))
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
waitChan = make(chan struct{})
|
||||||
|
logger = log.New(u.ShortUniqueId())
|
||||||
|
logger.Info("task service is starting")
|
||||||
obj.skipCron.Start()
|
obj.skipCron.Start()
|
||||||
obj.delayCron = cron.New(cron.WithSeconds())
|
|
||||||
obj.delayCron.Start()
|
obj.delayCron.Start()
|
||||||
// obj.asyncCron = cron.New(cron.WithSeconds())
|
// obj.asyncCron = cron.New(cron.WithSeconds())
|
||||||
// obj.asyncCron.Start()
|
// obj.asyncCron.Start()
|
||||||
obj.isStarted = true
|
if *conf.HotLoad {
|
||||||
obj.stopChan = make(chan struct{})
|
// 每隔5秒检查一次任务文件是否有更新,如果更新就进行热加载
|
||||||
obj.skipCron.AddFunc("@every 5s", func() {
|
obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() {
|
||||||
tasks := []*Task{}
|
tasks := []*Task{}
|
||||||
obj.lock.RLock()
|
obj.lock.RLock()
|
||||||
for _, task := range obj.tasks {
|
for _, task := range obj.tasks {
|
||||||
@ -62,21 +67,23 @@ func (obj *PluginObject) AsyncStart() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
obj.isStarted = true
|
||||||
|
logger.Info("task service is started")
|
||||||
func (obj *PluginObject) Start() {
|
|
||||||
obj.AsyncStart()
|
|
||||||
<-obj.stopChan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (obj *PluginObject) Stop() {
|
func (obj *PluginObject) Stop() {
|
||||||
obj.lock.RLock()
|
obj.lock.Lock()
|
||||||
isStarted := obj.isStarted
|
defer obj.lock.Unlock()
|
||||||
obj.lock.RUnlock()
|
if !obj.isStarted {
|
||||||
if !isStarted {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer close(obj.stopChan)
|
|
||||||
|
defer close(waitChan)
|
||||||
|
logger.Info("task service is stopping")
|
||||||
|
if *conf.HotLoad && obj.monitorTaskId != 0 {
|
||||||
|
obj.skipCron.Remove(obj.monitorTaskId)
|
||||||
|
}
|
||||||
|
|
||||||
ctx1 := obj.skipCron.Stop()
|
ctx1 := obj.skipCron.Stop()
|
||||||
ctx2 := obj.delayCron.Stop()
|
ctx2 := obj.delayCron.Stop()
|
||||||
// ctx3 := obj.asyncCron.Stop()
|
// ctx3 := obj.asyncCron.Stop()
|
||||||
@ -84,35 +91,38 @@ func (obj *PluginObject) Stop() {
|
|||||||
<-ctx2.Done()
|
<-ctx2.Done()
|
||||||
// <-ctx3.Done()
|
// <-ctx3.Done()
|
||||||
|
|
||||||
obj.lock.Lock()
|
|
||||||
defer obj.lock.Unlock()
|
|
||||||
for _, task := range obj.tasks {
|
for _, task := range obj.tasks {
|
||||||
task.RunFunc("onStop")
|
obj.removeTask(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.isStarted = false
|
obj.isStarted = false
|
||||||
obj.tasks = map[string]*Task{}
|
obj.tasks = map[string]*Task{}
|
||||||
|
logger.Info("task service is stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (obj *PluginObject) NewTask(spec string, file string, policy *string) error {
|
func (obj *PluginObject) AddTask(spec string, file string, policy *string) error {
|
||||||
fi := u.GetFileInfo(file)
|
fi := u.GetFileInfo(file)
|
||||||
if fi == nil {
|
if fi == nil {
|
||||||
return u.Error("task file not exists")
|
logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy)
|
||||||
|
return gojs.Err("task file not exists")
|
||||||
}
|
}
|
||||||
|
|
||||||
rt := gojs.New()
|
rt := gojs.New()
|
||||||
if _, err := rt.RunFile(file); err != nil {
|
if _, err := rt.RunFile(file); err != nil {
|
||||||
return u.Error(err.Error())
|
logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
task := &Task{
|
task := &Task{
|
||||||
|
spec: spec,
|
||||||
file: file,
|
file: file,
|
||||||
vm: rt,
|
vm: rt,
|
||||||
mtime: fi.ModTime,
|
mtime: fi.ModTime,
|
||||||
policy: u.String(policy),
|
policy: u.String(policy),
|
||||||
}
|
}
|
||||||
if err := task.RunFunc("onStart"); err != nil {
|
if err := task.RunFunc("onStart"); err != nil {
|
||||||
return u.Error(err.Error())
|
logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -135,9 +145,11 @@ func (obj *PluginObject) NewTask(spec string, file string, policy *string) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return u.Error(err.Error())
|
logger.Error("failed to add task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Info("add task", "spec", spec, "file", file, "policy", task.policy)
|
||||||
obj.lock.Lock()
|
obj.lock.Lock()
|
||||||
defer obj.lock.Unlock()
|
defer obj.lock.Unlock()
|
||||||
obj.tasks[spec+file] = task
|
obj.tasks[spec+file] = task
|
||||||
@ -151,7 +163,19 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error {
|
|||||||
if task == nil {
|
if task == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
task.RunFunc("onStop")
|
err := obj.removeTask(task)
|
||||||
|
if err == nil {
|
||||||
|
obj.lock.Lock()
|
||||||
|
defer obj.lock.Unlock()
|
||||||
|
delete(obj.tasks, task.spec+task.file)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (obj *PluginObject) removeTask(task *Task) error {
|
||||||
|
if err := task.RunFunc("onStop"); err != nil {
|
||||||
|
logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error())
|
||||||
|
}
|
||||||
switch task.policy {
|
switch task.policy {
|
||||||
case "skip":
|
case "skip":
|
||||||
obj.skipCron.Remove(task.entryId)
|
obj.skipCron.Remove(task.entryId)
|
||||||
@ -161,16 +185,35 @@ func (obj *PluginObject) RemoveTask(spec string, file string) error {
|
|||||||
obj.delayCron.Remove(task.entryId)
|
obj.delayCron.Remove(task.entryId)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.lock.Lock()
|
logger.Info("remove task", "spec", task.spec, "file", task.file, "policy", task.policy)
|
||||||
defer obj.lock.Unlock()
|
|
||||||
delete(obj.tasks, spec+file)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (obj *PluginObject) RunOnce(file string) error {
|
||||||
|
rt := gojs.New()
|
||||||
|
if _, err := rt.RunFile(file); err == nil {
|
||||||
|
if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil {
|
||||||
|
logger.Error("failed to run onStart for task", "file", file, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
|
}
|
||||||
|
if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil {
|
||||||
|
logger.Error("failed to run onRun for task", "file", file, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
|
}
|
||||||
|
if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil {
|
||||||
|
logger.Error("failed to run onStop for task", "file", file, "err", err.Error())
|
||||||
|
return gojs.Err(err.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return gojs.Err(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (task *Task) RunFunc(fnName string) error {
|
func (task *Task) RunFunc(fnName string) error {
|
||||||
task.lock.RLock()
|
task.lock.RLock()
|
||||||
rt := task.vm
|
rt := task.vm
|
||||||
task.lock.RUnlock()
|
task.lock.RUnlock()
|
||||||
_, err := rt.RunCode(fmt.Sprintf("if(%s)%s()", fnName, fnName))
|
_, err := rt.RunCode(fmt.Sprintf("if(typeof %s === 'function')%s()", fnName, fnName))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user