task/task.go

177 lines
3.6 KiB
Go
Raw Permalink Normal View History

2025-07-18 15:27:22 +08:00
package plugin
import (
"fmt"
"sync"
"time"
"apigo.cc/gojs"
"github.com/robfig/cron/v3"
"github.com/ssgo/log"
"github.com/ssgo/u"
)
// Task is one scheduled script: the file it was loaded from, the runtime that
// executed it, and the bookkeeping needed to reload and unschedule it.
type Task struct {
// file is the path of the script; it is passed to RunFile on load and reload.
file string
// vm is the runtime that has executed file. It is replaced when the file is
// reloaded successfully.
vm *gojs.Runtime
// lock guards vm and mtime.
lock sync.RWMutex
// mtime is the modification time of file as of the last load attempt.
mtime time.Time
// entryId is the ID returned by the cron instance when the task was scheduled;
// it is used to remove the entry again.
entryId cron.EntryID
// policy is "skip" or "delay" and selects which cron instance runs the task.
policy string
}
// AsyncStart starts the schedulers without blocking. Calling it while the
// plugin is already started is a no-op.
//
// Two cron instances with second-level resolution are created: skipCron, which
// skips an invocation while the previous one of the same job is still running,
// and delayCron, which uses the cron library's default job handling. A
// housekeeping job is added to skipCron that checks every five seconds
// whether any task file has changed and reloads it.
// (A third "async" cron instance is currently disabled.)
func (obj *PluginObject) AsyncStart() {
	obj.lock.Lock()
	defer obj.lock.Unlock()
	if obj.isStarted {
		return
	}

	skipChain := cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))
	obj.skipCron = cron.New(cron.WithSeconds(), skipChain)
	obj.skipCron.Start()
	obj.delayCron = cron.New(cron.WithSeconds())
	obj.delayCron.Start()
	obj.isStarted = true
	obj.stopChan = make(chan struct{})

	obj.skipCron.AddFunc("@every 5s", func() {
		// Snapshot the registered tasks so the plugin lock is not held while
		// files are inspected and scripts are executed.
		obj.lock.RLock()
		snapshot := make([]*Task, 0, len(obj.tasks))
		for _, t := range obj.tasks {
			snapshot = append(snapshot, t)
		}
		obj.lock.RUnlock()

		for _, t := range snapshot {
			info := u.GetFileInfo(t.file)
			if info == nil || info.ModTime.Equal(t.mtime) {
				continue
			}

			// The file changed: load it into a fresh runtime.
			fresh := gojs.New()
			_, err := fresh.RunFile(t.file)
			if err != nil {
				log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", t.file)
			}

			t.lock.Lock()
			if err == nil {
				t.vm = fresh
			}
			// mtime is recorded even when loading failed, so a broken file is
			// not reloaded over and over until it changes again.
			t.mtime = info.ModTime
			t.lock.Unlock()
		}
	})
}
// Start starts the schedulers via AsyncStart and then blocks until the stop
// channel is closed, which Stop does once shutdown has finished.
func (obj *PluginObject) Start() {
	obj.AsyncStart()
	stopped := obj.stopChan
	<-stopped
}
// Stop shuts the plugin down: both cron instances are stopped, in-flight jobs
// are awaited, every registered task gets its "onStop" hook called, and finally
// the stop channel is closed so that a blocked Start returns. Calling Stop on a
// plugin that is not started is a no-op.
func (obj *PluginObject) Stop() {
	// Claim the shutdown atomically. Checking isStarted under a read lock and
	// acting later would let two concurrent Stop calls both proceed and close
	// the stop channel twice, which panics.
	obj.lock.Lock()
	if !obj.isStarted {
		obj.lock.Unlock()
		return
	}
	obj.isStarted = false
	skipCron, delayCron := obj.skipCron, obj.delayCron
	stopChan := obj.stopChan
	tasks := obj.tasks
	obj.tasks = map[string]*Task{}
	obj.lock.Unlock()

	// Release anyone blocked in Start only after shutdown has fully completed.
	defer close(stopChan)

	// The plugin lock must not be held while waiting here: the periodic reload
	// job takes the read lock, so holding the write lock could deadlock.
	skipDone := skipCron.Stop()
	delayDone := delayCron.Stop()
	// (A third "async" cron instance is currently disabled.)
	<-skipDone.Done()
	<-delayDone.Done()

	for _, task := range tasks {
		// Best effort: a failing hook must not prevent the remaining tasks
		// from being stopped.
		_ = task.RunFunc("onStop")
	}
}
// NewTask loads the script at file, calls its "onStart" hook and schedules its
// "onRun" hook according to the cron expression spec.
//
// policy selects the scheduler: "skip" uses the cron instance that skips an
// invocation while the previous one is still running; any other value
// (including none) is normalised to "delay" and uses the delay cron instance.
// (An "async" policy is currently disabled.)
//
// Tasks are keyed by spec+file. Registering the same key again replaces the
// earlier task: its cron entry is removed and its "onStop" hook is called.
//
// An error is returned when the file does not exist, the script fails to load,
// "onStart" fails, or spec cannot be scheduled.
func (obj *PluginObject) NewTask(spec string, file string, policy *string) error {
	fi := u.GetFileInfo(file)
	if fi == nil {
		return u.Error("task file not exists")
	}
	rt := gojs.New()
	if _, err := rt.RunFile(file); err != nil {
		return u.Error(err.Error())
	}
	task := &Task{
		file:   file,
		vm:     rt,
		mtime:  fi.ModTime,
		policy: u.String(policy),
	}
	if err := task.RunFunc("onStart"); err != nil {
		return u.Error(err.Error())
	}

	run := func() {
		task.RunFunc("onRun")
	}
	var err error
	switch task.policy {
	case "skip":
		task.entryId, err = obj.skipCron.AddFunc(spec, run)
	default:
		task.policy = "delay"
		task.entryId, err = obj.delayCron.AddFunc(spec, run)
	}
	if err != nil {
		return u.Error(err.Error())
	}

	key := spec + file
	obj.lock.Lock()
	replaced := obj.tasks[key]
	obj.tasks[key] = task
	obj.lock.Unlock()

	// Without this, the earlier task's cron entry would stay scheduled forever:
	// it is no longer reachable through the map, so neither RemoveTask nor Stop
	// could ever unschedule or stop it.
	if replaced != nil {
		switch replaced.policy {
		case "skip":
			obj.skipCron.Remove(replaced.entryId)
		default:
			obj.delayCron.Remove(replaced.entryId)
		}
		// Best effort: the new task is already registered at this point.
		_ = replaced.RunFunc("onStop")
	}
	return nil
}
// RemoveTask unregisters the task keyed by spec+file, removes it from its cron
// instance and then calls its "onStop" hook. Removing an unknown task is a
// no-op. The returned error is always nil.
func (obj *PluginObject) RemoveTask(spec string, file string) error {
	key := spec + file

	// Look up and delete in one critical section so that concurrent removals
	// of the same key cannot both obtain the task and run "onStop" twice.
	obj.lock.Lock()
	task := obj.tasks[key]
	delete(obj.tasks, key)
	obj.lock.Unlock()
	if task == nil {
		return nil
	}

	// Unschedule before stopping so that no new "onRun" is triggered after
	// "onStop" has been called.
	// NOTE(review): an invocation already in flight presumably is not awaited
	// by Remove — confirm against the cron library.
	switch task.policy {
	case "skip":
		obj.skipCron.Remove(task.entryId)
	default:
		// (An "async" policy is currently disabled.)
		obj.delayCron.Remove(task.entryId)
	}

	// Best effort: the task is already unregistered and unscheduled.
	_ = task.RunFunc("onStop")
	return nil
}
// RunFunc calls the script-level function named fnName in the task's current
// runtime, provided the script declares it and its value is truthy; otherwise
// nothing is executed. It returns whatever error the runtime reports for the
// call.
//
// fnName is interpolated into script source verbatim, so it must be a plain
// identifier.
func (task *Task) RunFunc(fnName string) error {
	// Take the runtime pointer under the read lock; a reload may swap it.
	task.lock.RLock()
	rt := task.vm
	task.lock.RUnlock()

	// The typeof guard makes hooks genuinely optional: a bare `if(name)` raises
	// a ReferenceError when the script never declared name at all.
	code := fmt.Sprintf("if(typeof %[1]s!=='undefined'&&%[1]s)%[1]s()", fnName)
	_, err := rt.RunCode(code)
	return err
}