package plugin import ( "fmt" "sync" "time" "apigo.cc/gojs" "github.com/robfig/cron/v3" "github.com/ssgo/log" "github.com/ssgo/u" ) type Task struct { file string vm *gojs.Runtime lock sync.RWMutex mtime time.Time entryId cron.EntryID policy string } func (obj *PluginObject) AsyncStart() { obj.lock.Lock() defer obj.lock.Unlock() if !obj.isStarted { obj.skipCron = cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger))) obj.skipCron.Start() obj.delayCron = cron.New(cron.WithSeconds()) obj.delayCron.Start() // obj.asyncCron = cron.New(cron.WithSeconds()) // obj.asyncCron.Start() obj.isStarted = true obj.stopChan = make(chan struct{}) obj.skipCron.AddFunc("@every 5s", func() { tasks := []*Task{} obj.lock.RLock() for _, task := range obj.tasks { tasks = append(tasks, task) } obj.lock.RUnlock() for _, task := range tasks { if fi := u.GetFileInfo(task.file); fi != nil { if !fi.ModTime.Equal(task.mtime) { // 文件变化,重新加载 rt := gojs.New() if _, err := rt.RunFile(task.file); err == nil { task.lock.Lock() task.vm = rt task.mtime = fi.ModTime task.lock.Unlock() } else { // 发生错误时仅更新 mtime,防止不断重新加载 log.DefaultLogger.Error("failed to reload task code", "err", err.Error(), "file", task.file) task.lock.Lock() task.mtime = fi.ModTime task.lock.Unlock() } } } } }) } } func (obj *PluginObject) Start() { obj.AsyncStart() <-obj.stopChan } func (obj *PluginObject) Stop() { obj.lock.RLock() isStarted := obj.isStarted obj.lock.RUnlock() if !isStarted { return } defer close(obj.stopChan) ctx1 := obj.skipCron.Stop() ctx2 := obj.delayCron.Stop() // ctx3 := obj.asyncCron.Stop() <-ctx1.Done() <-ctx2.Done() // <-ctx3.Done() obj.lock.Lock() defer obj.lock.Unlock() for _, task := range obj.tasks { task.RunFunc("onStop") } obj.isStarted = false obj.tasks = map[string]*Task{} } func (obj *PluginObject) NewTask(spec string, file string, policy *string) error { fi := u.GetFileInfo(file) if fi == nil { return u.Error("task file not exists") } rt := gojs.New() if _, err := rt.RunFile(file); err != nil { return u.Error(err.Error()) } task := &Task{ file: file, vm: rt, mtime: fi.ModTime, policy: u.String(policy), } if err := task.RunFunc("onStart"); err != nil { return u.Error(err.Error()) } var err error switch task.policy { case "skip": task.entryId, err = obj.skipCron.AddFunc(spec, func() { task.RunFunc("onRun") }) // case "async": // task.entryId, err = obj.asyncCron.AddFunc(spec, func() { // go func() { // task.RunFunc("onRun") // }() // }) default: task.policy = "delay" task.entryId, err = obj.delayCron.AddFunc(spec, func() { task.RunFunc("onRun") }) } if err != nil { return u.Error(err.Error()) } obj.lock.Lock() defer obj.lock.Unlock() obj.tasks[spec+file] = task return nil } func (obj *PluginObject) RemoveTask(spec string, file string) error { obj.lock.RLock() task := obj.tasks[spec+file] obj.lock.RUnlock() if task == nil { return nil } task.RunFunc("onStop") switch task.policy { case "skip": obj.skipCron.Remove(task.entryId) // case "async": // obj.asyncCron.Remove(task.entryId) default: obj.delayCron.Remove(task.entryId) } obj.lock.Lock() defer obj.lock.Unlock() delete(obj.tasks, spec+file) return nil } func (task *Task) RunFunc(fnName string) error { task.lock.RLock() rt := task.vm task.lock.RUnlock() _, err := rt.RunCode(fmt.Sprintf("if(%s)%s()", fnName, fnName)) return err }