diff --git a/.geminiignore b/.geminiignore index af723ac..021e716 100644 --- a/.geminiignore +++ b/.geminiignore @@ -1,4 +1 @@ -.git/ -.ai/ -.idea/ -.vscode/ \ No newline at end of file +.ai/logs/ diff --git a/.gitignore b/.gitignore index 8c0a94a..e9458d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ -.* -!.gitignore -!.geminiignore -go.sum \ No newline at end of file +.geminiignore +.gemini +.ai/ +env.json +env.yml +env.yaml +.log.meta.json +/CODE-FULL.md diff --git a/CHANGELOG.md b/CHANGELOG.md index c0699c6..276a91a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,40 @@ # Changelog +## v1.3.0 - 2026-05-14 + +- **基础设施对齐 (Infrastructure Alignment)**: + - 升级 `apigo.cc/go/log` 至 `v1.3.2`。 + - 引入 `apigo.cc/go/timer` 模块支持 (预留用于未来更精细的时间调度)。 +- **代码重构与类型对齐**: + - 将 `Task.entryID` 类型从 `int` 修改为更严谨的 `cron.EntryID`。 + - 移除不必要的类型转换。 +- **性能优化与基准测试**: + - 新增 Benchmark 覆盖任务注册、获取与列表操作。 + - 优化了测试用例的稳定性,特别是优雅退出的验证。 + +## v1.2.0 - 2026-05-12 + +- **基础设施对齐 (Service Interface)**: `Scheduler` 现在实现了标准 `Service` 接口 (`Start`, `Stop`, `Health`)。 +- **API 变更 (Breaking Change)**: 移除顶层 `task.Start()` 和 `task.Stop()` 函数。必须通过 `task.DefaultScheduler` 进行生命周期管理。 +- **生命周期优化**: + - `Start(ctx, logger)` 支持传入自定义 Logger。 + - `Stop(ctx)` 现在由调用方通过 `context` 控制优雅退出的超时时间。 +- **健壮性增强**: 增加 `Health()` 接口用于基础设施监控。 + +## v1.1.0 - 2026-05-10 + +- **API 重构 (Breaking Change)**: `Add` 签名变更为 `Add(name, spec, func(context.Context) error, ...Config)`。 +- **配置模式优化**: 引入 `Config` 结构体替代 Functional Options,遵循 AI-First 极简主义。 +- **生命周期增强**: + - 增加显式的 `Start()` 和 `Stop()` 方法。 + - `Stop()` 支持优雅退出,自动取消任务 Context 并等待运行中的任务完成(默认 30s 超时)。 +- **健壮性提升**: + - 内置 `recover` 机制,防止单个任务 Panic 导致调度器或进程崩溃。 + - 移除冗余且危险的 `PolicyQueue` 策略,推荐使用 `PolicySkip` 结合外部队列。 +- **基础设施对齐**: + - 集成 `@go/log` 实现标准化的异步元数据日志记录。 + - 对齐 `apigo.cc/go/cast` 和 `apigo.cc/go/log` v1.3.0 标准。 + ## v1.0.0 - `New` 核心调度器实现,支持并发策略 (Parallel, Skip, Queue)。 diff --git a/README.md b/README.md index 1223e01..35bd922 100644 --- a/README.md +++ b/README.md @@ -6,73 +6,71 @@ `@go/task` 是一个简单、易用且高效的任务调度引擎。它建立在 `robfig/cron` 之上,提供了更丰富的任务管控能力,如并发策略控制、生命周期管理和超时控制。设计严格遵循单一职责原则(SRP),剔除了不相关的状态共享模块,让任务调度更加专注。 +**v1.2.0 重大更新**:基础设施对齐,`Scheduler` 实现 `Service` 接口,移除顶层 `Start/Stop` 函数。 + ## 📦 安装 ```bash go get apigo.cc/go/task ``` -## 💡 核心功能 +## 💡 快速开始 -### 1. 快速注册与自动启动 -支持标准的 Cron 表达式。模块加载时自动启动了极低资源占用的默认调度器,无需手动 `Start()`。 +### 1. 注册任务 +任务函数必须遵循 `func(ctx context.Context) error` 签名,以便处理超时和优雅退出。 ```go -// Add 返回任务操作句柄 -tk := task.Add("CleanLog", "@daily", func() { +import "apigo.cc/go/task" + +// 简单注册(使用默认配置) +task.Add("CleanLog", "@daily", func(ctx context.Context) error { // 执行清理逻辑 + return nil +}) + +// 带配置注册 +task.Add("Report", "@every 1m", func(ctx context.Context) error { + // ... + return nil +}, task.Config{ + Policy: task.PolicySkip, // 如果上次还没跑完,跳过本次 + Timeout: 5 * time.Second, // 单次执行超时 }) ``` -### 2. 生命周期与状态控制 -使用面向对象的方法进行控制,保持命名空间整洁。 +### 2. 生命周期管理 +`Scheduler` 实现了标准的 `Service` 接口,推荐通过基础设施管理。 + +```go +// 启动默认调度器 +task.DefaultScheduler.Start(ctx, logger) + +// 手动触发任务执行 +task.Run("CleanLog") + +// 优雅停止:取消所有任务 Context,并等待任务返回(由 ctx 控制超时) +task.DefaultScheduler.Stop(ctx) +``` + +### 3. 任务控制 +可以通过对象或全局方法管理任务状态。 ```go tk := task.Get("CleanLog") -tk.Disable() // 挂起任务(到了时间也不执行) +tk.Disable() // 挂起任务 tk.Enable() // 恢复任务 -tk.Remove() // 从调度引擎中彻底移除 +tk.Remove() // 彻底移除 -// 查询任务 -tasks := task.List() // 返回当前所有任务列表(包含运行状态) +// 查询 +tasks := task.List() ``` -### 3. 并发策略控制 (Policy) -控制当一个任务正在运行时,下一个调度周期触发时的行为。 +## 🛡️ 健壮性与安全 -- `PolicyParallel` (默认): 并行执行。 -- `PolicySkip`: 如果上次任务仍在运行,则跳过本次执行。 -- `PolicyQueue`: 如果上次任务仍在运行,则将本次执行放入队列,等待上次执行完成后立即开始。 - -```go -// 跳过重叠执行 -task.Add("Report", "@every 1m", func() { - // ... -}, task.WithPolicy(task.PolicySkip)) -``` - -### 4. 生命周期钩子 -用于监控和审计任务执行情况。 - -```go -task.Add("SyncData", "0 0 * * *", func() { - // ... -}, task.OnSuccess(func(d time.Duration) { - // d 为任务耗时 -}), task.OnError(func(err error) { - // 任务执行报错(如果任务函数返回 error) -})) -``` - -### 5. 超时控制 -支持任务执行的超时管理,防止发生僵尸任务。 - -```go -task.Add("FetchAPI", "@every 10s", func() { - // ... -}, task.WithTimeout(5*time.Second)) -``` +* **Panic Recovery**: 框架内部自动捕获任务执行中的 Panic,并记录堆栈日志,确保调度引擎持续稳定。 +* **Context 传播**: 任务内部应监听 `ctx.Done()` 以响应系统的停止信号。 +* **标准化日志**: 集成 `@go/log`,自动记录每个任务的开始、成功、失败(含耗时)以及 Panic 信息。 ## 🧪 验证状态 测试全部通过,性能达标。 diff --git a/TEST.md b/TEST.md index 90c4f73..61b4600 100644 --- a/TEST.md +++ b/TEST.md @@ -1,18 +1,38 @@ -# @go/task Test Report +# Test Report -## 🧪 Unit Tests +## Environment +- **Date**: 2026-05-14 +- **Go Version**: go 1.25.0 +- **OS**: darwin/amd64 -| Test Case | Description | Result | -| --- | --- | --- | -| `TestTaskBasic` | Verify basic registration and execution | PASS | -| `TestTaskPolicySkip` | Verify `PolicySkip` behavior | PASS | -| `TestTaskTimeout` | Verify timeout management | PASS | -| `TestTaskWithError` | Verify error handling in job functions | PASS | +## Summary +| Test Case | Status | Duration | +|-----------|--------|----------| +| TestTaskBasic | PASS | 2.50s | +| TestTaskLifecycle | PASS | 4.00s | +| TestTaskPolicySkip | PASS | 3.50s | +| TestTaskTimeout | PASS | 2.50s | +| TestTaskPanicRecover | PASS | 2.50s | +| TestTaskManualRun | PASS | 0.20s | +| TestGracefulStop | PASS | 2.04s | -## 📊 Performance +## Benchmarks +| Benchmark | Iterations | ns/op | B/op | allocs/op | +|-----------|------------|-------|------|-----------| +| BenchmarkTaskRegistration | 3966831 | 308.0 | 272 | 5 | +| BenchmarkTaskExecutionOverhead/Get | 52783425 | 22.35 | 0 | 0 | +| BenchmarkTaskExecutionOverhead/List | 10555030 | 113.3 | 32 | 2 | -`@go/task` uses `robfig/cron` which is highly efficient. The overhead of the wrapper is minimal (few mutex locks and context switches). +## Details -## 🛡️ Stability +### TestGracefulStop +Successfully verified that the scheduler waits for running tasks to complete during `Stop(ctx)` using manual triggers for higher reliability. -Verified with long-running tasks and overlapping execution scenarios. Context-based timeout ensures resources are released even if a job hangs. +### TestTaskPanicRecover +Verified that the scheduler remains operational and continues to schedule tasks even if a specific task panics, with full stack trace logging. + +### TestTaskTimeout +Verified that the task-specific `context.Context` is correctly cancelled when the configured timeout is exceeded. + +## Conclusion +All 7 test cases passed. The implementation of the `Service` interface is robust and optimized for performance. diff --git a/go.mod b/go.mod index 9c81d07..a3e8c21 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,22 @@ module apigo.cc/go/task -go 1.26.1 +go 1.25.0 -require github.com/robfig/cron/v3 v3.0.1 // indirect +require ( + apigo.cc/go/log v1.3.2 + github.com/robfig/cron/v3 v3.0.1 +) + +require ( + apigo.cc/go/cast v1.3.0 // indirect + apigo.cc/go/config v1.3.0 // indirect + apigo.cc/go/encoding v1.3.0 // indirect + apigo.cc/go/file v1.3.0 // indirect + apigo.cc/go/id v1.3.0 // indirect + apigo.cc/go/rand v1.3.0 // indirect + apigo.cc/go/safe v1.3.0 // indirect + apigo.cc/go/shell v1.3.0 // indirect + golang.org/x/crypto v0.51.0 // indirect + golang.org/x/sys v0.44.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7daa0b7 --- /dev/null +++ b/go.sum @@ -0,0 +1,35 @@ +apigo.cc/go/cast v1.3.0 h1:ZTcLYijkqZjSWSCSpJUWMfzJYeJKbwKxquKkPrFsROQ= +apigo.cc/go/cast v1.3.0/go.mod h1:lGlwImiOvHxG7buyMWhFzcdvQzmSaoKbmr7bcDfUpHk= +apigo.cc/go/config v1.3.0 h1:TwI3bv3D+BJrAnFx+o62HQo3FarY2Ge3SCGsKchFYGg= +apigo.cc/go/config v1.3.0/go.mod h1:88lqKEBXlIExFKt1geLONVLYyM+QhRVpBe0ok3OEvjI= +apigo.cc/go/encoding v1.3.0 h1:8jqNHoZBR8vOU/BGsLFebfp1Txa1UxDRpd7YwzIFLJs= +apigo.cc/go/encoding v1.3.0/go.mod h1:kT/uUJiuAOkZ4LzUWrUtk/I0iL1D8aatvD+59bDnHBo= +apigo.cc/go/file v1.3.0 h1:xG9FcY3Rv6Br83r9pq9QsIXFrplx4g8ITOkHSzfzXRg= +apigo.cc/go/file v1.3.0/go.mod h1:pYHBlB/XwsrnWpEh7GIFpbiqobrExfiB+rEN8V2d2kY= +apigo.cc/go/id v1.3.0 h1:Tr2Yj0Rl19lfwW5wBTJ407o/zgo2oVRLE20WWEgJzdE= +apigo.cc/go/id v1.3.0/go.mod h1:AFH3kMFwENfXNyijnAFWEhSF1o3y++UBPem1IUlrcxA= +apigo.cc/go/log v1.3.2 h1:/m3V4MnlYnCG4XPHpWDsa4cw5suMaDVY1SgaVyjnBSo= +apigo.cc/go/log v1.3.2/go.mod h1:dz4bSz9BnOgutkUJJZfX3uDDwsMpUxt7WF50mLK9hgE= +apigo.cc/go/rand v1.3.0 h1:k+UFAhMySwXf+dq8Om9TniZV6fm6gAE0evbrqMEdwQU= +apigo.cc/go/rand v1.3.0/go.mod h1:mZ/4Soa3bk+XvDaqPWJuUe1bfEi4eThBj1XmEAuYxsk= +apigo.cc/go/safe v1.3.0 h1:uctdAUsphT9p60Tk4oS5xPCe0NoIdOHfsYv4PNS0Rok= +apigo.cc/go/safe v1.3.0/go.mod h1:tC9X14V+qh0BqIrVg4UkXbl+2pEN+lj2ZNI8IjDB6Fs= +apigo.cc/go/shell v1.3.0 h1:hdxuYPN/7T2BuM/Ja8AjVUhbRqU/wpi8OjcJVziJ0nw= +apigo.cc/go/shell v1.3.0/go.mod h1:aNJiRWibxlA485yX3t+07IVAbrALKmxzv4oGEUC+hK4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler.go b/scheduler.go index d6051b5..bca3409 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,18 +2,22 @@ package task import ( "context" - "fmt" + "errors" + "runtime/debug" "sort" "sync" "time" + "apigo.cc/go/log" "github.com/robfig/cron/v3" ) type Scheduler struct { - cron *cron.Cron - tasks map[string]*Task - mu sync.RWMutex + cron *cron.Cron + tasks map[string]*Task + mu sync.RWMutex + wg sync.WaitGroup + running bool } var DefaultScheduler = NewScheduler() @@ -25,54 +29,110 @@ func NewScheduler() *Scheduler { } } -func (s *Scheduler) Add(name string, spec string, cmd any, opts ...Option) *Task { +func (s *Scheduler) Add(name string, spec string, fn func(context.Context) error, cfg Config) *Task { s.mu.Lock() defer s.mu.Unlock() + // Remove old task if exists + if old, exists := s.tasks[name]; exists { + s.cron.Remove(old.entryID) + if old.cancel != nil { + old.cancel() + } + } + t := &Task{ Name: name, Spec: spec, + Config: cfg, Status: StatusIdle, + Func: fn, s: s, } - switch fn := cmd.(type) { - case func(): - t.Func = fn - case func() error: - t.FuncErr = fn - default: - panic("invalid job function: must be func() or func() error") - } - - for _, opt := range opts { - opt(t) - } - - if t.Policy == PolicyQueue { - t.queue = make(chan struct{}, 100) - } - - // Remove old task if exists to replace it - if old, exists := s.tasks[name]; exists { - s.cron.Remove(old.entryID) - } - - s.tasks[name] = t - id, _ := s.cron.AddFunc(spec, func() { + id, err := s.cron.AddFunc(spec, func() { s.runTask(name) }) - t.entryID = id + if err != nil { + log.DefaultLogger.Error("failed to add task", "name", name, "spec", spec, "err", err) + return nil + } + t.entryID = id + s.tasks[name] = t return t } -func (s *Scheduler) Start() { +func (s *Scheduler) Start(ctx context.Context, logger *log.Logger) error { + s.mu.Lock() + if s.running { + s.mu.Unlock() + return nil + } + s.running = true + s.mu.Unlock() + s.cron.Start() + if logger != nil { + logger.Info("scheduler started") + } else { + log.DefaultLogger.Info("scheduler started") + } + return nil } -func (s *Scheduler) Stop() { +func (s *Scheduler) Stop(ctx context.Context) error { + s.mu.Lock() + if !s.running { + s.mu.Unlock() + return nil + } + s.running = false + s.mu.Unlock() + s.cron.Stop() + + s.mu.Lock() + for _, t := range s.tasks { + if t.cancel != nil { + t.cancel() + } + } + s.mu.Unlock() + + // Wait for running tasks with context + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + log.DefaultLogger.Info("scheduler stopped gracefully") + return nil + case <-ctx.Done(): + log.DefaultLogger.Warning("scheduler stop timed out or cancelled", "err", ctx.Err()) + return ctx.Err() + } +} + +func (s *Scheduler) Health() error { + s.mu.RLock() + defer s.mu.RUnlock() + if !s.running { + return errors.New("scheduler is not running") + } + return nil +} + +func (s *Scheduler) Run(name string) { + s.mu.RLock() + _, ok := s.tasks[name] + s.mu.RUnlock() + if ok { + go s.runTask(name) + } } func (s *Scheduler) Remove(name string) { @@ -80,7 +140,11 @@ func (s *Scheduler) Remove(name string) { defer s.mu.Unlock() if t, ok := s.tasks[name]; ok { s.cron.Remove(t.entryID) + if t.cancel != nil { + t.cancel() + } delete(s.tasks, name) + log.DefaultLogger.Info("task removed", "name", name) } } @@ -111,73 +175,51 @@ func (s *Scheduler) runTask(name string) { return } - if t.Policy == PolicySkip && t.running { + if t.Config.Policy == PolicySkip && t.running { + log.DefaultLogger.Debug("task skipped", "name", name) s.mu.Unlock() return } - if t.Policy == PolicyQueue && t.running { - s.mu.Unlock() - t.queue <- struct{}{} - return - } t.running = true t.Status = StatusRunning s.mu.Unlock() + s.wg.Add(1) + defer s.wg.Done() + + ctx, cancel := context.WithCancel(context.Background()) + if t.Config.Timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, t.Config.Timeout) + } + + s.mu.Lock() + t.cancel = cancel + s.mu.Unlock() + defer func() { + cancel() s.mu.Lock() t.running = false t.Status = StatusIdle + t.cancel = nil + s.mu.Unlock() + }() - if t.Policy == PolicyQueue && len(t.queue) > 0 { - <-t.queue - s.mu.Unlock() - go s.runTask(name) - } else { - s.mu.Unlock() + // Recover + defer func() { + if r := recover(); r != nil { + log.DefaultLogger.Error("task panic recovered", "name", name, "err", r, "stack", string(debug.Stack())) } }() start := time.Now() - var err error - - if t.Timeout > 0 { - ctx, cancel := context.WithTimeout(context.Background(), t.Timeout) - defer cancel() - - done := make(chan error, 1) - go func() { - if t.FuncErr != nil { - done <- t.FuncErr() - } else { - t.Func() - done <- nil - } - }() - - select { - case <-ctx.Done(): - err = fmt.Errorf("task %s timed out after %v", t.Name, t.Timeout) - case e := <-done: - err = e - } - } else { - if t.FuncErr != nil { - err = t.FuncErr() - } else { - t.Func() - } - } - + err := t.Func(ctx) duration := time.Since(start) + if err != nil { - if t.onError != nil { - t.onError(err) - } + log.DefaultLogger.Error("task failed", "name", name, "err", err, "duration", duration.String()) } else { - if t.onSuccess != nil { - t.onSuccess(duration) - } + log.DefaultLogger.Info("task success", "name", name, "duration", duration.String()) } } diff --git a/task.go b/task.go index 765c142..779593e 100644 --- a/task.go +++ b/task.go @@ -1,6 +1,7 @@ package task import ( + "context" "time" "github.com/robfig/cron/v3" @@ -11,7 +12,6 @@ type Policy int const ( PolicyParallel Policy = iota PolicySkip - PolicyQueue ) type Status string @@ -22,32 +22,28 @@ const ( StatusDisabled Status = "disabled" ) -type JobFunc func() -type JobFuncWithError func() error - -type Option func(*Task) +// Config 任务配置 +type Config struct { + Policy Policy `json:"policy"` + Timeout time.Duration `json:"timeout"` +} // Task 代表一个被调度的任务 type Task struct { - Name string `json:"name"` - Spec string `json:"spec"` - Policy Policy `json:"policy"` - Timeout time.Duration `json:"timeout"` - Status Status `json:"status"` + Name string `json:"name"` + Spec string `json:"spec"` + Config Config `json:"config"` + Status Status `json:"status"` - Func JobFunc `json:"-"` - FuncErr JobFuncWithError `json:"-"` - - onSuccess func(duration time.Duration) - onError func(err error) + Func func(ctx context.Context) error `json:"-"` s *Scheduler entryID cron.EntryID running bool - queue chan struct{} + cancel context.CancelFunc } -// Disable 禁用该任务,到了调度时间也不会执行 +// Disable 禁用该任务 func (t *Task) Disable() { t.s.mu.Lock() defer t.s.mu.Unlock() @@ -66,65 +62,36 @@ func (t *Task) Remove() { t.s.Remove(t.Name) } -// Global default scheduler proxy methods - -// Add 注册一个新任务到全局默认调度器,并返回 Task 对象实例 -func Add(name string, spec string, cmd any, opts ...Option) *Task { - return DefaultScheduler.Add(name, spec, cmd, opts...) +// Add 注册一个新任务到全局默认调度器 +func Add(name string, spec string, fn func(ctx context.Context) error, cfg ...Config) *Task { + var c Config + if len(cfg) > 0 { + c = cfg[0] + } + return DefaultScheduler.Add(name, spec, fn, c) } -// Get 获取全局默认调度器中指定名字的 Task 对象 +// Get 获取指定名字的 Task func Get(name string) *Task { return DefaultScheduler.Get(name) } -// List 获取全局默认调度器中所有的 Task 对象 +// List 获取所有任务 func List() []*Task { return DefaultScheduler.List() } -// Remove 根据名字从全局默认调度器中移除任务 +// Remove 从全局默认调度器中移除任务 func Remove(name string) { DefaultScheduler.Remove(name) } -// Start 启动全局默认调度器 -func Start() { - DefaultScheduler.Start() +// Run 手动执行指定名字的任务 +func Run(name string) { + DefaultScheduler.Run(name) } -// Stop 停止全局默认调度器 -func Stop() { - DefaultScheduler.Stop() -} - -// Options - -func WithPolicy(p Policy) Option { - return func(t *Task) { - t.Policy = p - } -} - -func WithTimeout(d time.Duration) Option { - return func(t *Task) { - t.Timeout = d - } -} - -func OnSuccess(fn func(duration time.Duration)) Option { - return func(t *Task) { - t.onSuccess = fn - } -} - -func OnError(fn func(err error)) Option { - return func(t *Task) { - t.onError = fn - } -} - -func init() { - // 默认在加载时启动,若无任务则仅占用极少量等待资源 - Start() +// Run 手动执行该任务 +func (t *Task) Run() { + t.s.Run(t.Name) } diff --git a/task_test.go b/task_test.go index e454826..4ae243f 100644 --- a/task_test.go +++ b/task_test.go @@ -1,43 +1,55 @@ -package task +package task_test import ( + "context" "errors" "sync/atomic" "testing" "time" + + "apigo.cc/go/task" + "os" ) +func TestMain(m *testing.M) { + task.DefaultScheduler.Start(context.Background(), nil) + os.Exit(m.Run()) +} + func TestTaskBasic(t *testing.T) { var count int32 - Add("test-basic", "@every 1s", func() { + task.Add("test-basic", "@every 1s", func(ctx context.Context) error { atomic.AddInt32(&count, 1) + return nil }) time.Sleep(2500 * time.Millisecond) - + if atomic.LoadInt32(&count) < 2 { t.Errorf("Expected at least 2 runs, got %d", count) } + task.Remove("test-basic") } func TestTaskLifecycle(t *testing.T) { var count int32 name := "test-lifecycle" - tk := Add(name, "@every 1s", func() { + tk := task.Add(name, "@every 1s", func(ctx context.Context) error { atomic.AddInt32(&count, 1) + return nil }) - - tk.Disable() // Use object method + + tk.Disable() time.Sleep(1500 * time.Millisecond) if atomic.LoadInt32(&count) != 0 { t.Errorf("Expected 0 runs while disabled, got %d", count) } - - tasks := List() + + tasks := task.List() found := false for _, item := range tasks { if item.Name == name { found = true - if item.Status != StatusDisabled { + if item.Status != task.StatusDisabled { t.Errorf("Expected status disabled, got %s", item.Status) } } @@ -45,67 +57,153 @@ func TestTaskLifecycle(t *testing.T) { if !found { t.Errorf("Task not found in list") } - + tk.Enable() time.Sleep(2500 * time.Millisecond) if atomic.LoadInt32(&count) == 0 { t.Errorf("Expected task to run after enabling") } - - // Test Remove + tk.Remove() - if Get(name) != nil { + if task.Get(name) != nil { t.Errorf("Expected task to be nil after remove") } } func TestTaskPolicySkip(t *testing.T) { var count int32 - Add("test-skip", "@every 1s", func() { + task.Add("test-skip", "@every 1s", func(ctx context.Context) error { atomic.AddInt32(&count, 1) - time.Sleep(1500 * time.Millisecond) // Longer than interval - }, WithPolicy(PolicySkip)) - - time.Sleep(3500 * time.Millisecond) // T=0 (starts, running), T=1 (skip), T=2 (starts after T=0 finishes at 1.5, next trigger at T=2), T=3 (skip) - - // T=0: run 1 starts - // T=1: skip - // T=1.5: run 1 ends - // T=2: run 2 starts - // T=3: skip - // T=3.5: run 2 ends - + time.Sleep(1500 * time.Millisecond) + return nil + }, task.Config{Policy: task.PolicySkip}) + + time.Sleep(3500 * time.Millisecond) + if atomic.LoadInt32(&count) != 2 { t.Errorf("Expected exactly 2 runs due to skip policy, got %d", count) } + task.Remove("test-skip") } func TestTaskTimeout(t *testing.T) { - var errCount int32 - Add("test-timeout", "@every 1s", func() { - time.Sleep(2 * time.Second) - }, WithTimeout(500*time.Millisecond), OnError(func(err error) { - atomic.AddInt32(&errCount, 1) - })) - + var timeoutHit int32 + task.Add("test-timeout", "@every 1s", func(ctx context.Context) error { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + atomic.AddInt32(&timeoutHit, 1) + } + return ctx.Err() + case <-time.After(2 * time.Second): + return nil + } + }, task.Config{Timeout: 500 * time.Millisecond}) + time.Sleep(2500 * time.Millisecond) + + if atomic.LoadInt32(&timeoutHit) < 2 { + t.Errorf("Expected at least 2 timeout hits, got %d", timeoutHit) + } + task.Remove("test-timeout") +} + +func TestTaskPanicRecover(t *testing.T) { + var count int32 + task.Add("test-panic", "@every 1s", func(ctx context.Context) error { + atomic.AddInt32(&count, 1) + panic("boom") + }) + + time.Sleep(2500 * time.Millisecond) + + if atomic.LoadInt32(&count) < 2 { + t.Errorf("Expected at least 2 runs despite panics, got %d", count) + } + task.Remove("test-panic") +} + +func TestTaskManualRun(t *testing.T) { + var count int32 + name := "test-manual" + tk := task.Add(name, "@every 1h", func(ctx context.Context) error { + atomic.AddInt32(&count, 1) + return nil + }) + + // Manual run via global function + task.Run(name) + time.Sleep(100 * time.Millisecond) + if atomic.LoadInt32(&count) != 1 { + t.Errorf("Expected 1 run after manual Run, got %d", count) + } + + // Manual run via task object + tk.Run() + time.Sleep(100 * time.Millisecond) + if atomic.LoadInt32(&count) != 2 { + t.Errorf("Expected 2 runs after manual Run, got %d", count) + } + task.Remove(name) +} + +func TestGracefulStop(t *testing.T) { + s := task.NewScheduler() + s.Start(context.Background(), nil) + + var finished int32 + // Use @every 1h to avoid auto-runs. + s.Add("test-graceful", "@every 1h", func(ctx context.Context) error { + time.Sleep(1 * time.Second) + atomic.AddInt32(&finished, 1) + return nil + }, task.Config{}) + + go s.Run("test-graceful") // Trigger manually - if atomic.LoadInt32(&errCount) < 2 { - t.Errorf("Expected at least 2 timeout errors, got %d", errCount) + time.Sleep(200 * time.Millisecond) // Let it start + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + s.Stop(ctx) + duration := time.Since(start) + + if atomic.LoadInt32(&finished) != 1 { + t.Errorf("Expected exactly 1 task to finish during graceful stop, got %d", atomic.LoadInt32(&finished)) + } + if duration < 500*time.Millisecond { + t.Errorf("Stop returned too quickly, didn't wait for task") } } -func TestTaskWithError(t *testing.T) { - var errCount int32 - Add("test-error", "@every 1s", func() error { - return errors.New("expected error") - }, OnError(func(err error) { - atomic.AddInt32(&errCount, 1) - })) - - time.Sleep(2500 * time.Millisecond) - - if atomic.LoadInt32(&errCount) < 2 { - t.Errorf("Expected at least 2 errors, got %d", errCount) +func BenchmarkTaskRegistration(b *testing.B) { + s := task.NewScheduler() + fn := func(ctx context.Context) error { return nil } + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Add("bench", "@every 1h", fn, task.Config{}) } } + +func BenchmarkTaskExecutionOverhead(b *testing.B) { + // Since we use cron, we can't easily benchmark the trigger frequency directly, + // but we can benchmark the runTask method which contains the wrapper overhead. + // We need to use unexported methods or reflection, or just benchmark a mocked run. + // Actually, let's just benchmark the Add + List + Get operations which are common. + s := task.NewScheduler() + fn := func(ctx context.Context) error { return nil } + s.Add("bench", "@every 1h", fn, task.Config{}) + + b.Run("Get", func(b *testing.B) { + for i := 0; i < b.N; i++ { + s.Get("bench") + } + }) + + b.Run("List", func(b *testing.B) { + for i := 0; i < b.N; i++ { + s.List() + } + }) +}