task/task.go
2025-07-21 13:20:25 +08:00

220 lines
5.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package plugin
import (
"fmt"
"sync"
"time"
"apigo.cc/gojs"
"github.com/robfig/cron/v3"
"github.com/ssgo/log"
"github.com/ssgo/u"
)
type Task struct {
spec string
file string
vm *gojs.Runtime
lock sync.RWMutex
mtime time.Time
entryId cron.EntryID
policy string
}
func (obj *PluginObject) Start() {
obj.lock.Lock()
defer obj.lock.Unlock()
if obj.isStarted {
return
}
waitChan = make(chan struct{})
logger = log.New(u.ShortUniqueId())
logger.Info("task service is starting")
obj.skipCron.Start()
obj.delayCron.Start()
// obj.asyncCron = cron.New(cron.WithSeconds())
// obj.asyncCron.Start()
if *conf.HotLoad {
// 每隔5秒检查一次任务文件是否有更新如果更新就进行热加载
obj.monitorTaskId, _ = obj.skipCron.AddFunc("@every 5s", func() {
tasks := []*Task{}
obj.lock.RLock()
for _, task := range obj.tasks {
tasks = append(tasks, task)
}
obj.lock.RUnlock()
for _, task := range tasks {
if fi := u.GetFileInfo(task.file); fi != nil {
if !fi.ModTime.Equal(task.mtime) {
// 文件变化,重新加载
rt := gojs.New()
if _, err := rt.RunFile(task.file); err == nil {
task.lock.Lock()
task.vm = rt
task.mtime = fi.ModTime
task.lock.Unlock()
} else {
// 发生错误时仅更新 mtime防止不断重新加载
log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", task.file)
task.lock.Lock()
task.mtime = fi.ModTime
task.lock.Unlock()
}
}
}
}
})
}
obj.isStarted = true
logger.Info("task service is started")
}
func (obj *PluginObject) Stop() {
obj.lock.Lock()
defer obj.lock.Unlock()
if !obj.isStarted {
return
}
defer close(waitChan)
logger.Info("task service is stopping")
if *conf.HotLoad && obj.monitorTaskId != 0 {
obj.skipCron.Remove(obj.monitorTaskId)
}
ctx1 := obj.skipCron.Stop()
ctx2 := obj.delayCron.Stop()
// ctx3 := obj.asyncCron.Stop()
<-ctx1.Done()
<-ctx2.Done()
// <-ctx3.Done()
for _, task := range obj.tasks {
obj.removeTask(task)
}
obj.isStarted = false
obj.tasks = map[string]*Task{}
logger.Info("task service is stopped")
}
func (obj *PluginObject) AddTask(spec string, file string, policy *string) error {
fi := u.GetFileInfo(file)
if fi == nil {
logger.Error("task file not exists", "spec", spec, "file", file, "policy", policy)
return gojs.Err("task file not exists")
}
rt := gojs.New()
if _, err := rt.RunFile(file); err != nil {
logger.Error("failed to run task file", "spec", spec, "file", file, "policy", policy, "err", err.Error())
return gojs.Err(err.Error())
}
task := &Task{
spec: spec,
file: file,
vm: rt,
mtime: fi.ModTime,
policy: u.String(policy),
}
if err := task.RunFunc("onStart"); err != nil {
logger.Error("failed to run onStart for task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
return gojs.Err(err.Error())
}
var err error
switch task.policy {
case "skip":
task.entryId, err = obj.skipCron.AddFunc(spec, func() {
task.RunFunc("onRun")
})
// case "async":
// task.entryId, err = obj.asyncCron.AddFunc(spec, func() {
// go func() {
// task.RunFunc("onRun")
// }()
// })
default:
task.policy = "delay"
task.entryId, err = obj.delayCron.AddFunc(spec, func() {
task.RunFunc("onRun")
})
}
if err != nil {
logger.Error("failed to add task", "spec", spec, "file", file, "policy", task.policy, "err", err.Error())
return gojs.Err(err.Error())
}
logger.Info("add task", "spec", spec, "file", file, "policy", task.policy)
obj.lock.Lock()
defer obj.lock.Unlock()
obj.tasks[spec+file] = task
return nil
}
func (obj *PluginObject) RemoveTask(spec string, file string) error {
obj.lock.RLock()
task := obj.tasks[spec+file]
obj.lock.RUnlock()
if task == nil {
return nil
}
err := obj.removeTask(task)
if err == nil {
obj.lock.Lock()
defer obj.lock.Unlock()
delete(obj.tasks, task.spec+task.file)
}
return err
}
func (obj *PluginObject) removeTask(task *Task) error {
if err := task.RunFunc("onStop"); err != nil {
logger.Error("failed to run onStop for task", "spec", task.spec, "file", task.file, "policy", task.policy, "err", err.Error())
}
switch task.policy {
case "skip":
obj.skipCron.Remove(task.entryId)
// case "async":
// obj.asyncCron.Remove(task.entryId)
default:
obj.delayCron.Remove(task.entryId)
}
logger.Info("remove task", "spec", task.spec, "file", task.file, "policy", task.policy)
return nil
}
func (obj *PluginObject) RunOnce(file string) error {
rt := gojs.New()
if _, err := rt.RunFile(file); err == nil {
if _, err := rt.RunCode("if(typeof onStart === 'function')onStart()"); err != nil {
logger.Error("failed to run onStart for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
if _, err := rt.RunCode("if(typeof onRun === 'function')onRun()"); err != nil {
logger.Error("failed to run onRun for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
if _, err := rt.RunCode("if(typeof onStop === 'function')onStop()"); err != nil {
logger.Error("failed to run onStop for task", "file", file, "err", err.Error())
return gojs.Err(err.Error())
}
return nil
} else {
return gojs.Err(err.Error())
}
}
func (task *Task) RunFunc(fnName string) error {
task.lock.RLock()
rt := task.vm
task.lock.RUnlock()
_, err := rt.RunCode(fmt.Sprintf("if(typeof %s === 'function')%s()", fnName, fnName))
return err
}