task/task.go
2025-07-18 15:27:22 +08:00

177 lines
3.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package plugin
import (
"fmt"
"sync"
"time"
"apigo.cc/gojs"
"github.com/robfig/cron/v3"
"github.com/ssgo/log"
"github.com/ssgo/u"
)
// Task is one scheduled script: the file it was loaded from, the runtime that
// holds the loaded code, and the bookkeeping needed to reload and unschedule it.
type Task struct {
// file is the script path; it is passed to RunFile and to u.GetFileInfo.
file string
// vm is the runtime in which RunFunc executes the script's functions.
vm *gojs.Runtime
// lock is taken for writing when vm/mtime are swapped on reload and for
// reading when RunFunc fetches vm.
lock sync.RWMutex
// mtime is the file's modification time recorded at the last load attempt;
// the reload watcher compares it against the current one.
mtime time.Time
// entryId is the ID returned by the cron scheduler's AddFunc; it is used to
// remove the entry again.
entryId cron.EntryID
// policy is "skip" or "delay" and selects which scheduler owns the entry.
policy string
}
// AsyncStart creates and starts the cron schedulers without blocking. It is a
// no-op when the plugin is already started.
//
// Two schedulers are created, both with a seconds field in their specs:
// skipCron wraps jobs with cron.SkipIfStillRunning, delayCron uses the default
// job chain. A watcher job is also registered on skipCron that checks every
// five seconds whether any task file has changed.
func (obj *PluginObject) AsyncStart() {
	obj.lock.Lock()
	defer obj.lock.Unlock()
	if obj.isStarted {
		return
	}

	obj.skipCron = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)))
	obj.skipCron.Start()
	obj.delayCron = cron.New(cron.WithSeconds())
	obj.delayCron.Start()
	// obj.asyncCron = cron.New(cron.WithSeconds())
	// obj.asyncCron.Start()
	obj.isStarted = true
	obj.stopChan = make(chan struct{})

	// The watcher only runs later on the scheduler's goroutine, so registering
	// it while obj.lock is held does not conflict with the RLock it takes.
	if _, err := obj.skipCron.AddFunc("@every 5s", obj.reloadChangedTasks); err != nil {
		// Without the watcher tasks still run, they just never hot-reload.
		log.DefaultLogger.Error("failed to schedule task reload watcher", "err", err.Error())
	}
}

// reloadChangedTasks reloads every registered task whose file modification
// time differs from the one recorded on the task.
func (obj *PluginObject) reloadChangedTasks() {
	// Snapshot the tasks so scripts are loaded without holding obj.lock.
	obj.lock.RLock()
	tasks := make([]*Task, 0, len(obj.tasks))
	for _, task := range obj.tasks {
		tasks = append(tasks, task)
	}
	obj.lock.RUnlock()

	for _, task := range tasks {
		fi := u.GetFileInfo(task.file)
		if fi == nil || fi.ModTime.Equal(task.mtime) {
			continue
		}

		// The file changed: load it into a fresh runtime.
		rt := gojs.New()
		_, err := rt.RunFile(task.file)
		if err != nil {
			log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", task.file)
		}

		task.lock.Lock()
		if err == nil {
			task.vm = rt
		}
		// Record the new mtime even on failure so a broken file is not
		// reloaded again on every tick; the previous runtime stays in use.
		task.mtime = fi.ModTime
		task.lock.Unlock()
	}
}
// Start is the blocking counterpart of AsyncStart: it starts the schedulers
// and then waits until the stop channel is closed.
func (obj *PluginObject) Start() {
	obj.AsyncStart()
	done := obj.stopChan
	<-done
}
// Stop shuts both schedulers down, waits for running jobs to finish, calls
// every task's onStop function, clears the task table and finally closes the
// stop channel so that a blocked Start returns.
//
// It is a no-op when the plugin is not started and is safe to call from
// several goroutines: only one caller performs the teardown and closes the
// channel.
func (obj *PluginObject) Stop() {
	obj.lock.RLock()
	isStarted := obj.isStarted
	skipCron, delayCron := obj.skipCron, obj.delayCron
	obj.lock.RUnlock()
	if !isStarted {
		return
	}

	// obj.lock must not be held while waiting: the reload watcher job takes
	// obj.lock.RLock, so waiting under the write lock could deadlock.
	ctx1 := skipCron.Stop()
	ctx2 := delayCron.Stop()
	// ctx3 := obj.asyncCron.Stop()
	<-ctx1.Done()
	<-ctx2.Done()
	// <-ctx3.Done()

	obj.lock.Lock()
	defer obj.lock.Unlock()
	// Re-check under the write lock. A concurrent Stop may already have
	// finished the teardown (and the plugin may even have been restarted with
	// new schedulers); closing the channel a second time would panic.
	if !obj.isStarted || obj.skipCron != skipCron {
		return
	}
	for _, task := range obj.tasks {
		// Best effort: one failing onStop must not keep the others from running.
		_ = task.RunFunc("onStop")
	}
	obj.isStarted = false
	obj.tasks = map[string]*Task{}
	close(obj.stopChan)
}
// NewTask loads the script in file, runs its onStart function and schedules
// its onRun function according to spec.
//
// spec is a cron expression for the schedulers created by AsyncStart (they are
// built with cron.WithSeconds). policy selects the scheduler: "skip" uses the
// one that skips a run while the previous one is still executing; any other
// value (including a nil pointer, presumably converted to "" by u.String) is
// normalised to "delay".
//
// Tasks are keyed by spec+file. Registering a key that already exists replaces
// the old task: it is removed via RemoveTask before the new onStart runs, so
// its cron entry does not stay orphaned. Note that the old task is therefore
// gone even if the new one subsequently fails to start.
//
// An error is returned when the file does not exist, the schedulers have not
// been created yet, the script fails to load, onStart fails, or spec is
// rejected by the scheduler.
func (obj *PluginObject) NewTask(spec string, file string, policy *string) error {
	fi := u.GetFileInfo(file)
	if fi == nil {
		return u.Error("task file not exists")
	}

	// Calling AddFunc on a nil scheduler would panic, so reject registration
	// before AsyncStart has created them.
	obj.lock.RLock()
	skipCron, delayCron := obj.skipCron, obj.delayCron
	obj.lock.RUnlock()
	if skipCron == nil || delayCron == nil {
		return u.Error("task scheduler not started")
	}

	rt := gojs.New()
	if _, err := rt.RunFile(file); err != nil {
		return u.Error(err.Error())
	}
	task := &Task{
		file:   file,
		vm:     rt,
		mtime:  fi.ModTime,
		policy: u.String(policy),
	}

	// Retire a task already registered under the same key. The old task is
	// removed regardless of the result; a failure of its onStop must not block
	// the replacement.
	_ = obj.RemoveTask(spec, file)

	if err := task.RunFunc("onStart"); err != nil {
		return u.Error(err.Error())
	}

	run := func() {
		task.RunFunc("onRun")
	}
	var err error
	switch task.policy {
	case "skip":
		task.entryId, err = skipCron.AddFunc(spec, run)
	// case "async":
	// 	task.entryId, err = obj.asyncCron.AddFunc(spec, func() {
	// 		go func() {
	// 			task.RunFunc("onRun")
	// 		}()
	// 	})
	default:
		task.policy = "delay"
		task.entryId, err = delayCron.AddFunc(spec, run)
	}
	if err != nil {
		// onStart already ran; give the script the chance to undo it since the
		// task will never be scheduled.
		_ = task.RunFunc("onStop")
		return u.Error(err.Error())
	}

	obj.lock.Lock()
	defer obj.lock.Unlock()
	if obj.tasks == nil {
		obj.tasks = map[string]*Task{}
	}
	obj.tasks[spec+file] = task
	return nil
}
// RemoveTask unregisters the task stored under spec+file, removes its entry
// from the scheduler that owns it and runs the script's onStop function.
//
// It returns nil when no such task is registered. Otherwise it returns the
// error produced by onStop, if any; the task has been removed in either case.
func (obj *PluginObject) RemoveTask(spec string, file string) error {
	key := spec + file

	// Look up and delete in one critical section so that concurrent calls for
	// the same key cannot both tear the task down.
	obj.lock.Lock()
	task := obj.tasks[key]
	delete(obj.tasks, key)
	skipCron, delayCron := obj.skipCron, obj.delayCron
	obj.lock.Unlock()
	if task == nil {
		return nil
	}

	// Unschedule before calling onStop so that no new onRun is started after
	// the script has been told to stop. Remove only prevents future runs; an
	// invocation that is already in progress is not waited for.
	switch task.policy {
	case "skip":
		skipCron.Remove(task.entryId)
	// case "async":
	// 	obj.asyncCron.Remove(task.entryId)
	default:
		delayCron.Remove(task.entryId)
	}

	return task.RunFunc("onStop")
}
// RunFunc calls the script function named fnName in the task's current runtime
// and returns the error reported by the runtime, if any.
//
// The call is skipped when the script does not define fnName as a function.
// The guard uses typeof because in JavaScript a bare reference to an undeclared
// identifier throws a ReferenceError, whereas typeof evaluates to "undefined".
//
// fnName is interpolated into the executed code, so it must be a trusted
// identifier; the callers in this file pass only fixed hook names.
func (task *Task) RunFunc(fnName string) error {
	// Fetch the runtime under the read lock: the reload watcher may swap it.
	task.lock.RLock()
	rt := task.vm
	task.lock.RUnlock()

	_, err := rt.RunCode(fmt.Sprintf("if(typeof %[1]s==='function')%[1]s()", fnName))
	return err
}