task/scheduler.go

184 lines
2.9 KiB
Go
Raw Normal View History

2026-05-10 21:19:30 +08:00
package task
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/robfig/cron/v3"
)
// Scheduler couples a cron engine with a registry of named tasks.
type Scheduler struct {
	// cron is the underlying engine that fires the registered entries.
	cron *cron.Cron

	// mu guards tasks; runTask also holds it while reading or changing a
	// task's running flag and Status.
	mu sync.RWMutex
	// tasks maps a task name to its current definition.
	tasks map[string]*Task
}
// DefaultScheduler is a package-level Scheduler constructed with NewScheduler
// during package initialization.
var DefaultScheduler = NewScheduler()
// NewScheduler returns an empty Scheduler that has not been started.
//
// The cron engine is created with cron.WithSeconds, so schedule specs are
// parsed with a leading seconds field.
func NewScheduler() *Scheduler {
	sched := new(Scheduler)
	sched.cron = cron.New(cron.WithSeconds())
	sched.tasks = map[string]*Task{}
	return sched
}
// Add registers cmd under name so that it runs on the cron schedule spec and
// returns the resulting Task. If a task with the same name already exists it
// is replaced.
//
// cmd must be a func() or a func() error. Each opt is applied to the new Task
// before it is scheduled; when the resulting policy is PolicyQueue a backlog
// channel with room for 100 pending runs is allocated.
//
// Add panics if cmd has an unsupported type or if spec cannot be parsed by the
// cron engine. In both cases the scheduler is left unchanged, so a task
// previously registered under name keeps its schedule.
func (s *Scheduler) Add(name string, spec string, cmd any, opts ...Option) *Task {
	s.mu.Lock()
	defer s.mu.Unlock()

	t := &Task{
		Name:   name,
		Spec:   spec,
		Status: StatusIdle,
		s:      s,
	}
	switch fn := cmd.(type) {
	case func():
		t.Func = fn
	case func() error:
		t.FuncErr = fn
	default:
		panic("invalid job function: must be func() or func() error")
	}
	for _, opt := range opts {
		opt(t)
	}
	if t.Policy == PolicyQueue {
		t.queue = make(chan struct{}, 100)
	}

	// Schedule the new entry before touching the registry: an invalid spec
	// must not silently drop a working task or leave behind one that never
	// fires. The callback looks the task up by name, and runTask needs s.mu,
	// so it cannot observe the registry until Add returns.
	id, err := s.cron.AddFunc(spec, func() {
		s.runTask(name)
	})
	if err != nil {
		panic(fmt.Sprintf("invalid cron spec %q for task %q: %v", spec, name, err))
	}
	t.entryID = id

	// Only now retire the entry being replaced, if any.
	if old, exists := s.tasks[name]; exists {
		s.cron.Remove(old.entryID)
	}
	s.tasks[name] = t
	return t
}
// Start starts the underlying cron engine so that registered tasks begin to
// fire on their schedules.
func (s *Scheduler) Start() {
	s.cron.Start()
}
// Stop stops the underlying cron engine.
//
// The value returned by the engine's Stop is discarded, so this method gives
// callers no way to wait for runs that are already in progress.
func (s *Scheduler) Stop() {
	_ = s.cron.Stop()
}
// Remove unschedules the task registered under name and drops it from the
// registry. It does nothing when no such task exists.
func (s *Scheduler) Remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	task, found := s.tasks[name]
	if !found {
		return
	}
	s.cron.Remove(task.entryID)
	delete(s.tasks, name)
}
// List returns every registered task ordered by ascending Name. The returned
// slice is freshly allocated, but its elements are the registry's own *Task
// pointers.
func (s *Scheduler) List() []*Task {
	s.mu.RLock()
	defer s.mu.RUnlock()

	tasks := make([]*Task, len(s.tasks))
	next := 0
	for _, task := range s.tasks {
		tasks[next] = task
		next++
	}

	byName := func(a, b int) bool {
		return tasks[a].Name < tasks[b].Name
	}
	sort.Slice(tasks, byName)
	return tasks
}
// Get returns the task registered under name, or nil when there is none.
func (s *Scheduler) Get(name string) *Task {
	s.mu.RLock()
	task := s.tasks[name]
	s.mu.RUnlock()
	return task
}
// runTask performs one run of the task registered under name.
//
// The run is skipped when the task is unknown or disabled. If an earlier run
// is still in progress, PolicySkip drops this trigger and PolicyQueue records
// it in the task's backlog to be replayed when the current run finishes; any
// other policy lets the runs overlap.
//
// When the task has a positive Timeout the job runs in its own goroutine and
// runTask stops waiting once the timeout elapses. The job itself is not
// interrupted and may still be executing after the task has been marked as
// no longer running.
//
// onError is invoked with a non-nil result (including the timeout error),
// otherwise onSuccess is invoked with the elapsed time. Both callbacks run
// before the task is marked as no longer running.
func (s *Scheduler) runTask(name string) {
	s.mu.Lock()
	t, ok := s.tasks[name]
	if !ok || t.Status == StatusDisabled {
		s.mu.Unlock()
		return
	}
	if t.running {
		switch t.Policy {
		case PolicySkip:
			s.mu.Unlock()
			return
		case PolicyQueue:
			// Enqueue while still holding the lock. Otherwise the running
			// instance could finish and inspect an empty backlog between our
			// unlock and our send, stranding this run until a later trigger.
			select {
			case t.queue <- struct{}{}:
				s.mu.Unlock()
			default:
				// Backlog is full: wait for a slot without holding the lock.
				s.mu.Unlock()
				t.queue <- struct{}{}
			}
			return
		}
	}
	t.running = true
	t.Status = StatusRunning
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		t.running = false
		// Only fall back to idle if nobody changed the status meanwhile;
		// a task disabled during its run must stay disabled.
		if t.Status == StatusRunning {
			t.Status = StatusIdle
		}
		replay := false
		if t.Policy == PolicyQueue && t.Status != StatusDisabled {
			select {
			case <-t.queue:
				replay = true
			default:
			}
		}
		s.mu.Unlock()
		if replay {
			go s.runTask(name)
		}
	}()

	// call invokes whichever job function the task carries.
	call := func() error {
		if t.FuncErr != nil {
			return t.FuncErr()
		}
		t.Func()
		return nil
	}

	start := time.Now()
	var err error
	if t.Timeout > 0 {
		ctx, cancel := context.WithTimeout(context.Background(), t.Timeout)
		defer cancel()
		// Buffered so the job goroutine can always deliver its result and
		// exit, even when nobody is waiting any more after a timeout.
		done := make(chan error, 1)
		go func() {
			done <- call()
		}()
		select {
		case <-ctx.Done():
			err = fmt.Errorf("task %s timed out after %v", t.Name, t.Timeout)
		case err = <-done:
		}
	} else {
		err = call()
	}
	duration := time.Since(start)

	if err != nil {
		if t.onError != nil {
			t.onError(err)
		}
		return
	}
	if t.onSuccess != nil {
		t.onSuccess(duration)
	}
}