2026-05-10 21:19:30 +08:00
|
|
|
package task
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2026-05-14 00:40:49 +08:00
|
|
|
"errors"
|
|
|
|
|
"runtime/debug"
|
2026-05-10 21:19:30 +08:00
|
|
|
"sort"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
"apigo.cc/go/log"
|
2026-05-10 21:19:30 +08:00
|
|
|
"github.com/robfig/cron/v3"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type Scheduler struct {
|
2026-05-14 00:40:49 +08:00
|
|
|
cron *cron.Cron
|
|
|
|
|
tasks map[string]*Task
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
running bool
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var DefaultScheduler = NewScheduler()
|
|
|
|
|
|
|
|
|
|
func NewScheduler() *Scheduler {
|
|
|
|
|
return &Scheduler{
|
|
|
|
|
cron: cron.New(cron.WithSeconds()),
|
|
|
|
|
tasks: make(map[string]*Task),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
func (s *Scheduler) Add(name string, spec string, fn func(context.Context) error, cfg Config) *Task {
|
2026-05-10 21:19:30 +08:00
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
// Remove old task if exists
|
|
|
|
|
if old, exists := s.tasks[name]; exists {
|
|
|
|
|
s.cron.Remove(old.entryID)
|
|
|
|
|
if old.cancel != nil {
|
|
|
|
|
old.cancel()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-10 21:19:30 +08:00
|
|
|
t := &Task{
|
|
|
|
|
Name: name,
|
|
|
|
|
Spec: spec,
|
2026-05-14 00:40:49 +08:00
|
|
|
Config: cfg,
|
2026-05-10 21:19:30 +08:00
|
|
|
Status: StatusIdle,
|
2026-05-14 00:40:49 +08:00
|
|
|
Func: fn,
|
2026-05-10 21:19:30 +08:00
|
|
|
s: s,
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
id, err := s.cron.AddFunc(spec, func() {
|
|
|
|
|
s.runTask(name)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.DefaultLogger.Error("failed to add task", "name", name, "spec", spec, "err", err)
|
|
|
|
|
return nil
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
t.entryID = id
|
|
|
|
|
s.tasks[name] = t
|
|
|
|
|
return t
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) Start(ctx context.Context, logger *log.Logger) error {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
if s.running {
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return nil
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
2026-05-14 00:40:49 +08:00
|
|
|
s.running = true
|
|
|
|
|
s.mu.Unlock()
|
2026-05-10 21:19:30 +08:00
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
s.cron.Start()
|
|
|
|
|
if logger != nil {
|
|
|
|
|
logger.Info("scheduler started")
|
|
|
|
|
} else {
|
|
|
|
|
log.DefaultLogger.Info("scheduler started")
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
2026-05-14 00:40:49 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
2026-05-10 21:19:30 +08:00
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
func (s *Scheduler) Stop(ctx context.Context) error {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
if !s.running {
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return nil
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
2026-05-14 00:40:49 +08:00
|
|
|
s.running = false
|
|
|
|
|
s.mu.Unlock()
|
2026-05-10 21:19:30 +08:00
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
s.cron.Stop()
|
2026-05-10 21:19:30 +08:00
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
s.mu.Lock()
|
|
|
|
|
for _, t := range s.tasks {
|
|
|
|
|
if t.cancel != nil {
|
|
|
|
|
t.cancel()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// Wait for running tasks with context
|
|
|
|
|
done := make(chan struct{})
|
|
|
|
|
go func() {
|
|
|
|
|
s.wg.Wait()
|
|
|
|
|
close(done)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-done:
|
|
|
|
|
log.DefaultLogger.Info("scheduler stopped gracefully")
|
|
|
|
|
return nil
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
log.DefaultLogger.Warning("scheduler stop timed out or cancelled", "err", ctx.Err())
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
func (s *Scheduler) Health() error {
|
|
|
|
|
s.mu.RLock()
|
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
|
if !s.running {
|
|
|
|
|
return errors.New("scheduler is not running")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
func (s *Scheduler) Run(name string) {
|
|
|
|
|
s.mu.RLock()
|
|
|
|
|
_, ok := s.tasks[name]
|
|
|
|
|
s.mu.RUnlock()
|
|
|
|
|
if ok {
|
|
|
|
|
go s.runTask(name)
|
|
|
|
|
}
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) Remove(name string) {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
if t, ok := s.tasks[name]; ok {
|
|
|
|
|
s.cron.Remove(t.entryID)
|
2026-05-14 00:40:49 +08:00
|
|
|
if t.cancel != nil {
|
|
|
|
|
t.cancel()
|
|
|
|
|
}
|
2026-05-10 21:19:30 +08:00
|
|
|
delete(s.tasks, name)
|
2026-05-14 00:40:49 +08:00
|
|
|
log.DefaultLogger.Info("task removed", "name", name)
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) List() []*Task {
|
|
|
|
|
s.mu.RLock()
|
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
|
list := make([]*Task, 0, len(s.tasks))
|
|
|
|
|
for _, t := range s.tasks {
|
|
|
|
|
list = append(list, t)
|
|
|
|
|
}
|
|
|
|
|
sort.Slice(list, func(i, j int) bool {
|
|
|
|
|
return list[i].Name < list[j].Name
|
|
|
|
|
})
|
|
|
|
|
return list
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) Get(name string) *Task {
|
|
|
|
|
s.mu.RLock()
|
|
|
|
|
defer s.mu.RUnlock()
|
|
|
|
|
return s.tasks[name]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) runTask(name string) {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
t, ok := s.tasks[name]
|
|
|
|
|
if !ok || t.Status == StatusDisabled {
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
if t.Config.Policy == PolicySkip && t.running {
|
|
|
|
|
log.DefaultLogger.Debug("task skipped", "name", name)
|
2026-05-10 21:19:30 +08:00
|
|
|
s.mu.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.running = true
|
|
|
|
|
t.Status = StatusRunning
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
s.wg.Add(1)
|
|
|
|
|
defer s.wg.Done()
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
if t.Config.Timeout > 0 {
|
|
|
|
|
ctx, cancel = context.WithTimeout(ctx, t.Config.Timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
t.cancel = cancel
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
2026-05-10 21:19:30 +08:00
|
|
|
defer func() {
|
2026-05-14 00:40:49 +08:00
|
|
|
cancel()
|
2026-05-10 21:19:30 +08:00
|
|
|
s.mu.Lock()
|
|
|
|
|
t.running = false
|
|
|
|
|
t.Status = StatusIdle
|
2026-05-14 00:40:49 +08:00
|
|
|
t.cancel = nil
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
}()
|
2026-05-10 21:19:30 +08:00
|
|
|
|
2026-05-14 00:40:49 +08:00
|
|
|
// Recover
|
|
|
|
|
defer func() {
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
log.DefaultLogger.Error("task panic recovered", "name", name, "err", r, "stack", string(debug.Stack()))
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
2026-05-14 00:40:49 +08:00
|
|
|
err := t.Func(ctx)
|
2026-05-10 21:19:30 +08:00
|
|
|
duration := time.Since(start)
|
2026-05-14 00:40:49 +08:00
|
|
|
|
2026-05-10 21:19:30 +08:00
|
|
|
if err != nil {
|
2026-05-14 00:40:49 +08:00
|
|
|
log.DefaultLogger.Error("task failed", "name", name, "err", err, "duration", duration.String())
|
2026-05-10 21:19:30 +08:00
|
|
|
} else {
|
2026-05-14 00:40:49 +08:00
|
|
|
log.DefaultLogger.Info("task success", "name", name, "duration", duration.String())
|
2026-05-10 21:19:30 +08:00
|
|
|
}
|
|
|
|
|
}
|