Compare commits

...

3 Commits
v1.0.0 ... main

10 changed files with 488 additions and 255 deletions

View File

@ -1,4 +1 @@
.git/
.ai/
.idea/
.vscode/
.ai/logs/

12
.gitignore vendored
View File

@ -1,4 +1,8 @@
.*
!.gitignore
!.geminiignore
go.sum
.geminiignore
.gemini
.ai/
env.json
env.yml
env.yaml
.log.meta.json
/CODE-FULL.md

View File

@ -1,5 +1,40 @@
# Changelog
## v1.3.0 - 2026-05-14
- **基础设施对齐 (Infrastructure Alignment)**:
- 升级 `apigo.cc/go/log``v1.3.2`
- 引入 `apigo.cc/go/timer` 模块支持 (预留用于未来更精细的时间调度)。
- **代码重构与类型对齐**:
- 将 `Task.entryID` 类型从 `int` 修改为更严谨的 `cron.EntryID`
- 移除不必要的类型转换。
- **性能优化与基准测试**:
- 新增 Benchmark 覆盖任务注册、获取与列表操作。
- 优化了测试用例的稳定性,特别是优雅退出的验证。
## v1.2.0 - 2026-05-12
- **基础设施对齐 (Service Interface)**: `Scheduler` 现在实现了标准 `Service` 接口 (`Start`, `Stop`, `Health`)。
- **API 变更 (Breaking Change)**: 移除顶层 `task.Start()``task.Stop()` 函数。必须通过 `task.DefaultScheduler` 进行生命周期管理。
- **生命周期优化**:
- `Start(ctx, logger)` 支持传入自定义 Logger。
- `Stop(ctx)` 现在由调用方通过 `context` 控制优雅退出的超时时间。
- **健壮性增强**: 增加 `Health()` 接口用于基础设施监控。
## v1.1.0 - 2026-05-10
- **API 重构 (Breaking Change)**: `Add` 签名变更为 `Add(name, spec, func(context.Context) error, ...Config)`
- **配置模式优化**: 引入 `Config` 结构体替代 Functional Options遵循 AI-First 极简主义。
- **生命周期增强**:
- 增加显式的 `Start()``Stop()` 方法。
- `Stop()` 支持优雅退出,自动取消任务 Context 并等待运行中的任务完成(默认 30s 超时)。
- **健壮性提升**:
- 内置 `recover` 机制,防止单个任务 Panic 导致调度器或进程崩溃。
- 移除冗余且危险的 `PolicyQueue` 策略,推荐使用 `PolicySkip` 结合外部队列。
- **基础设施对齐**:
- 集成 `@go/log` 实现标准化的异步元数据日志记录。
- 对齐 `apigo.cc/go/cast``apigo.cc/go/log` v1.3.0 标准。
## v1.0.0
- `New` 核心调度器实现,支持并发策略 (Parallel, Skip, Queue)。

View File

@ -6,73 +6,71 @@
`@go/task` 是一个简单、易用且高效的任务调度引擎。它建立在 `robfig/cron` 之上提供了更丰富的任务管控能力如并发策略控制、生命周期管理和超时控制。设计严格遵循单一职责原则SRP剔除了不相关的状态共享模块让任务调度更加专注。
**v1.2.0 重大更新**:基础设施对齐,`Scheduler` 实现 `Service` 接口,移除顶层 `Start/Stop` 函数。
## 📦 安装
```bash
go get apigo.cc/go/task
```
## 💡 核心功能
## 💡 快速开始
### 1. 快速注册与自动启动
支持标准的 Cron 表达式。模块加载时自动启动了极低资源占用的默认调度器,无需手动 `Start()`
### 1. 注册任务
任务函数必须遵循 `func(ctx context.Context) error` 签名,以便处理超时和优雅退出
```go
// Add 返回任务操作句柄
tk := task.Add("CleanLog", "@daily", func() {
import "apigo.cc/go/task"
// 简单注册(使用默认配置)
task.Add("CleanLog", "@daily", func(ctx context.Context) error {
// 执行清理逻辑
return nil
})
// 带配置注册
task.Add("Report", "@every 1m", func(ctx context.Context) error {
// ...
return nil
}, task.Config{
Policy: task.PolicySkip, // 如果上次还没跑完,跳过本次
Timeout: 5 * time.Second, // 单次执行超时
})
```
### 2. 生命周期与状态控制
使用面向对象的方法进行控制,保持命名空间整洁。
### 2. 生命周期管理
`Scheduler` 实现了标准的 `Service` 接口,推荐通过基础设施管理。
```go
// 启动默认调度器
task.DefaultScheduler.Start(ctx, logger)
// 手动触发任务执行
task.Run("CleanLog")
// 优雅停止:取消所有任务 Context并等待任务返回由 ctx 控制超时)
task.DefaultScheduler.Stop(ctx)
```
### 3. 任务控制
可以通过对象或全局方法管理任务状态。
```go
tk := task.Get("CleanLog")
tk.Disable() // 挂起任务(到了时间也不执行)
tk.Disable() // 挂起任务
tk.Enable() // 恢复任务
tk.Remove() // 从调度引擎中彻底移除
tk.Remove() // 彻底移除
// 查询任务
tasks := task.List() // 返回当前所有任务列表(包含运行状态)
// 查询
tasks := task.List()
```
### 3. 并发策略控制 (Policy)
控制当一个任务正在运行时,下一个调度周期触发时的行为。
## 🛡️ 健壮性与安全
- `PolicyParallel` (默认): 并行执行。
- `PolicySkip`: 如果上次任务仍在运行,则跳过本次执行。
- `PolicyQueue`: 如果上次任务仍在运行,则将本次执行放入队列,等待上次执行完成后立即开始。
```go
// 跳过重叠执行
task.Add("Report", "@every 1m", func() {
// ...
}, task.WithPolicy(task.PolicySkip))
```
### 4. 生命周期钩子
用于监控和审计任务执行情况。
```go
task.Add("SyncData", "0 0 * * *", func() {
// ...
}, task.OnSuccess(func(d time.Duration) {
// d 为任务耗时
}), task.OnError(func(err error) {
// 任务执行报错(如果任务函数返回 error
}))
```
### 5. 超时控制
支持任务执行的超时管理,防止发生僵尸任务。
```go
task.Add("FetchAPI", "@every 10s", func() {
// ...
}, task.WithTimeout(5*time.Second))
```
* **Panic Recovery**: 框架内部自动捕获任务执行中的 Panic并记录堆栈日志确保调度引擎持续稳定。
* **Context 传播**: 任务内部应监听 `ctx.Done()` 以响应系统的停止信号。
* **标准化日志**: 集成 `@go/log`,自动记录每个任务的开始、成功、失败(含耗时)以及 Panic 信息。
## 🧪 验证状态
测试全部通过,性能达标。

44
TEST.md
View File

@ -1,18 +1,38 @@
# @go/task Test Report
# Test Report
## 🧪 Unit Tests
## Environment
- **Date**: 2026-05-14
- **Go Version**: go 1.25.0
- **OS**: darwin/amd64
| Test Case | Description | Result |
| --- | --- | --- |
| `TestTaskBasic` | Verify basic registration and execution | PASS |
| `TestTaskPolicySkip` | Verify `PolicySkip` behavior | PASS |
| `TestTaskTimeout` | Verify timeout management | PASS |
| `TestTaskWithError` | Verify error handling in job functions | PASS |
## Summary
| Test Case | Status | Duration |
|-----------|--------|----------|
| TestTaskBasic | PASS | 2.50s |
| TestTaskLifecycle | PASS | 4.00s |
| TestTaskPolicySkip | PASS | 3.50s |
| TestTaskTimeout | PASS | 2.50s |
| TestTaskPanicRecover | PASS | 2.50s |
| TestTaskManualRun | PASS | 0.20s |
| TestGracefulStop | PASS | 2.04s |
## 📊 Performance
## Benchmarks
| Benchmark | Iterations | ns/op | B/op | allocs/op |
|-----------|------------|-------|------|-----------|
| BenchmarkTaskRegistration | 3966831 | 308.0 | 272 | 5 |
| BenchmarkTaskExecutionOverhead/Get | 52783425 | 22.35 | 0 | 0 |
| BenchmarkTaskExecutionOverhead/List | 10555030 | 113.3 | 32 | 2 |
`@go/task` uses `robfig/cron` which is highly efficient. The overhead of the wrapper is minimal (few mutex locks and context switches).
## Details
## 🛡️ Stability
### TestGracefulStop
Successfully verified that the scheduler waits for running tasks to complete during `Stop(ctx)` using manual triggers for higher reliability.
Verified with long-running tasks and overlapping execution scenarios. Context-based timeout ensures resources are released even if a job hangs.
### TestTaskPanicRecover
Verified that the scheduler remains operational and continues to schedule tasks even if a specific task panics, with full stack trace logging.
### TestTaskTimeout
Verified that the task-specific `context.Context` is correctly cancelled when the configured timeout is exceeded.
## Conclusion
All 7 test cases passed. The implementation of the `Service` interface is robust and optimized for performance.

21
go.mod
View File

@ -1,5 +1,22 @@
module apigo.cc/go/task
go 1.26.1
go 1.25.0
require github.com/robfig/cron/v3 v3.0.1 // indirect
require (
apigo.cc/go/log v1.3.4
github.com/robfig/cron/v3 v3.0.1
)
require (
apigo.cc/go/cast v1.3.3 // indirect
apigo.cc/go/config v1.3.1 // indirect
apigo.cc/go/encoding v1.3.1 // indirect
apigo.cc/go/file v1.3.2 // indirect
apigo.cc/go/id v1.3.1 // indirect
apigo.cc/go/rand v1.3.1 // indirect
apigo.cc/go/safe v1.3.1 // indirect
apigo.cc/go/shell v1.3.1 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/sys v0.44.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

35
go.sum Normal file
View File

@ -0,0 +1,35 @@
apigo.cc/go/cast v1.3.3 h1:aln5eDR5DZVWVzZ/y5SJh1gQNgWv2sT82I25NaO9g34=
apigo.cc/go/cast v1.3.3/go.mod h1:lGlwImiOvHxG7buyMWhFzcdvQzmSaoKbmr7bcDfUpHk=
apigo.cc/go/config v1.3.1 h1:wZzUh4oL+fGD6SayVgX6prLPMsniM25etWFcEH8XzIE=
apigo.cc/go/config v1.3.1/go.mod h1:7KHz/1WmtBLM762Lln/TaXh2dmlMvJTLhnlk33zbS3U=
apigo.cc/go/encoding v1.3.1 h1:y8O58KYAyulkThg1O2ji2BqjnFoSvk42sit9I3z+K7Y=
apigo.cc/go/encoding v1.3.1/go.mod h1:xAJk5b83VZ31mXMTnyp0dfMoBKfT/AHDn0u+cQfojgY=
apigo.cc/go/file v1.3.2 h1:pu4oiDyiqgj3/eykfnJf+/6+A9v/Z0b3ClP5XK+lwG4=
apigo.cc/go/file v1.3.2/go.mod h1:vci4h0Pz94mV6dkniQkuyBYERVYeq7/LX4jJVuCg9hs=
apigo.cc/go/id v1.3.1 h1:pkqi6VeWyQoHuIu0Zbx/RRxIAdM61Js0j6cY1M9XVCk=
apigo.cc/go/id v1.3.1/go.mod h1:P2/vl3tyW3US+ayOFSMoPIOCulNLBngNYPhXJC/Z7J4=
apigo.cc/go/log v1.3.4 h1:UT8Neb9r4QjjbCFbTzw+ZeTxd+DmdmR5gNExeR4Cj+g=
apigo.cc/go/log v1.3.4/go.mod h1:/Q/2r51xWSsrS4QN5U9jLiTw8n6qNC8kG9nuVHweY20=
apigo.cc/go/rand v1.3.1 h1:7FvsI6PtQ5XrWER0dTiLVo0p7GIxRidT/TBKhVy93j8=
apigo.cc/go/rand v1.3.1/go.mod h1:mZ/4Soa3bk+XvDaqPWJuUe1bfEi4eThBj1XmEAuYxsk=
apigo.cc/go/safe v1.3.1 h1:irTCqPAC97gGsX/Lw5AzLelDt1xXLEZIAaVhLELWe9Q=
apigo.cc/go/safe v1.3.1/go.mod h1:XdOpBhN2vkImalaykYXXmEpczqWa1y3ah6/Q72cdRqE=
apigo.cc/go/shell v1.3.1 h1:M8oD0b2HcJuCC6frQFx11b3UTcTx3lATX8XK+YXSVm8=
apigo.cc/go/shell v1.3.1/go.mod h1:ZMdJjpCpWdvsHKUXlelh/AxsV/nWdkH/k3lISfzMdUw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -2,18 +2,22 @@ package task
import (
"context"
"fmt"
"errors"
"runtime/debug"
"sort"
"sync"
"time"
"apigo.cc/go/log"
"github.com/robfig/cron/v3"
)
type Scheduler struct {
cron *cron.Cron
tasks map[string]*Task
mu sync.RWMutex
cron *cron.Cron
tasks map[string]*Task
mu sync.RWMutex
wg sync.WaitGroup
running bool
}
var DefaultScheduler = NewScheduler()
@ -25,54 +29,120 @@ func NewScheduler() *Scheduler {
}
}
func (s *Scheduler) Add(name string, spec string, cmd any, opts ...Option) *Task {
func (s *Scheduler) Add(name string, spec string, fn func(context.Context) error, cfg Config) *Task {
s.mu.Lock()
defer s.mu.Unlock()
// Remove old task if exists
if old, exists := s.tasks[name]; exists {
s.cron.Remove(old.entryID)
if old.cancel != nil {
old.cancel()
}
}
t := &Task{
Name: name,
Spec: spec,
Config: cfg,
Status: StatusIdle,
Func: fn,
s: s,
}
switch fn := cmd.(type) {
case func():
t.Func = fn
case func() error:
t.FuncErr = fn
default:
panic("invalid job function: must be func() or func() error")
}
for _, opt := range opts {
opt(t)
}
if t.Policy == PolicyQueue {
t.queue = make(chan struct{}, 100)
}
// Remove old task if exists to replace it
if old, exists := s.tasks[name]; exists {
s.cron.Remove(old.entryID)
}
s.tasks[name] = t
id, _ := s.cron.AddFunc(spec, func() {
id, err := s.cron.AddFunc(spec, func() {
s.runTask(name)
})
t.entryID = id
if err != nil {
log.DefaultLogger.Error("failed to add task", "name", name, "spec", spec, "err", err)
return nil
}
t.entryID = id
s.tasks[name] = t
return t
}
func (s *Scheduler) Start() {
func (s *Scheduler) Start(ctx context.Context, logger *log.Logger) error {
s.mu.Lock()
if s.running {
s.mu.Unlock()
return nil
}
s.running = true
s.mu.Unlock()
s.cron.Start()
if logger != nil {
logger.Info("scheduler started")
} else {
log.DefaultLogger.Info("scheduler started")
}
return nil
}
func (s *Scheduler) Stop() {
func (s *Scheduler) Stop(ctx context.Context) error {
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return nil
}
s.running = false
s.mu.Unlock()
s.cron.Stop()
s.mu.Lock()
for _, t := range s.tasks {
if t.cancel != nil {
t.cancel()
}
}
s.mu.Unlock()
// Wait for running tasks with context
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
log.DefaultLogger.Info("scheduler stopped gracefully")
return nil
case <-ctx.Done():
log.DefaultLogger.Warning("scheduler stop timed out or cancelled", "err", ctx.Err())
return ctx.Err()
}
}
func (s *Scheduler) Status() (string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running {
return "stopped", errors.New("scheduler is not running")
}
return "running", nil
}
// Start 开始全局默认调度器
func Start(ctx context.Context, logger *log.Logger) error {
return DefaultScheduler.Start(ctx, logger)
}
// Stop 停止全局默认调度器
func Stop(ctx context.Context) error {
return DefaultScheduler.Stop(ctx)
}
func (s *Scheduler) Run(name string) {
s.mu.RLock()
_, ok := s.tasks[name]
s.mu.RUnlock()
if ok {
go s.runTask(name)
}
}
func (s *Scheduler) Remove(name string) {
@ -80,7 +150,11 @@ func (s *Scheduler) Remove(name string) {
defer s.mu.Unlock()
if t, ok := s.tasks[name]; ok {
s.cron.Remove(t.entryID)
if t.cancel != nil {
t.cancel()
}
delete(s.tasks, name)
log.DefaultLogger.Info("task removed", "name", name)
}
}
@ -111,73 +185,51 @@ func (s *Scheduler) runTask(name string) {
return
}
if t.Policy == PolicySkip && t.running {
if t.Config.Policy == PolicySkip && t.running {
log.DefaultLogger.Debug("task skipped", "name", name)
s.mu.Unlock()
return
}
if t.Policy == PolicyQueue && t.running {
s.mu.Unlock()
t.queue <- struct{}{}
return
}
t.running = true
t.Status = StatusRunning
s.mu.Unlock()
s.wg.Add(1)
defer s.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
if t.Config.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, t.Config.Timeout)
}
s.mu.Lock()
t.cancel = cancel
s.mu.Unlock()
defer func() {
cancel()
s.mu.Lock()
t.running = false
t.Status = StatusIdle
t.cancel = nil
s.mu.Unlock()
}()
if t.Policy == PolicyQueue && len(t.queue) > 0 {
<-t.queue
s.mu.Unlock()
go s.runTask(name)
} else {
s.mu.Unlock()
// Recover
defer func() {
if r := recover(); r != nil {
log.DefaultLogger.Error("task panic recovered", "name", name, "err", r, "stack", string(debug.Stack()))
}
}()
start := time.Now()
var err error
if t.Timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), t.Timeout)
defer cancel()
done := make(chan error, 1)
go func() {
if t.FuncErr != nil {
done <- t.FuncErr()
} else {
t.Func()
done <- nil
}
}()
select {
case <-ctx.Done():
err = fmt.Errorf("task %s timed out after %v", t.Name, t.Timeout)
case e := <-done:
err = e
}
} else {
if t.FuncErr != nil {
err = t.FuncErr()
} else {
t.Func()
}
}
err := t.Func(ctx)
duration := time.Since(start)
if err != nil {
if t.onError != nil {
t.onError(err)
}
log.DefaultLogger.Error("task failed", "name", name, "err", err, "duration", duration.String())
} else {
if t.onSuccess != nil {
t.onSuccess(duration)
}
log.DefaultLogger.Info("task success", "name", name, "duration", duration.String())
}
}

99
task.go
View File

@ -1,6 +1,7 @@
package task
import (
"context"
"time"
"github.com/robfig/cron/v3"
@ -11,7 +12,6 @@ type Policy int
const (
PolicyParallel Policy = iota
PolicySkip
PolicyQueue
)
type Status string
@ -22,32 +22,38 @@ const (
StatusDisabled Status = "disabled"
)
type JobFunc func()
type JobFuncWithError func() error
// Config 任务配置
type Config struct {
Policy Policy `json:"policy"`
Timeout time.Duration `json:"timeout"`
}
type Option func(*Task)
// WithPolicy 创建指定策略的配置
func WithPolicy(p Policy) Config {
return Config{Policy: p}
}
// WithTimeout 创建指定超时时间的配置
func WithTimeout(d time.Duration) Config {
return Config{Timeout: d}
}
// Task 代表一个被调度的任务
type Task struct {
Name string `json:"name"`
Spec string `json:"spec"`
Policy Policy `json:"policy"`
Timeout time.Duration `json:"timeout"`
Status Status `json:"status"`
Name string `json:"name"`
Spec string `json:"spec"`
Config Config `json:"config"`
Status Status `json:"status"`
Func JobFunc `json:"-"`
FuncErr JobFuncWithError `json:"-"`
onSuccess func(duration time.Duration)
onError func(err error)
Func func(ctx context.Context) error `json:"-"`
s *Scheduler
entryID cron.EntryID
running bool
queue chan struct{}
cancel context.CancelFunc
}
// Disable 禁用该任务,到了调度时间也不会执行
// Disable 禁用该任务
func (t *Task) Disable() {
t.s.mu.Lock()
defer t.s.mu.Unlock()
@ -66,65 +72,36 @@ func (t *Task) Remove() {
t.s.Remove(t.Name)
}
// Global default scheduler proxy methods
// Add 注册一个新任务到全局默认调度器,并返回 Task 对象实例
func Add(name string, spec string, cmd any, opts ...Option) *Task {
return DefaultScheduler.Add(name, spec, cmd, opts...)
// Add 注册一个新任务到全局默认调度器
func Add(name string, spec string, fn func(ctx context.Context) error, cfg ...Config) *Task {
var c Config
if len(cfg) > 0 {
c = cfg[0]
}
return DefaultScheduler.Add(name, spec, fn, c)
}
// Get 获取全局默认调度器中指定名字的 Task 对象
// Get 获取指定名字的 Task
func Get(name string) *Task {
return DefaultScheduler.Get(name)
}
// List 获取全局默认调度器中所有的 Task 对象
// List 获取所有任务
func List() []*Task {
return DefaultScheduler.List()
}
// Remove 根据名字从全局默认调度器中移除任务
// Remove 从全局默认调度器中移除任务
func Remove(name string) {
DefaultScheduler.Remove(name)
}
// Start 启动全局默认调度器
func Start() {
DefaultScheduler.Start()
// Run 手动执行指定名字的任务
func Run(name string) {
DefaultScheduler.Run(name)
}
// Stop 停止全局默认调度器
func Stop() {
DefaultScheduler.Stop()
}
// Options
func WithPolicy(p Policy) Option {
return func(t *Task) {
t.Policy = p
}
}
func WithTimeout(d time.Duration) Option {
return func(t *Task) {
t.Timeout = d
}
}
func OnSuccess(fn func(duration time.Duration)) Option {
return func(t *Task) {
t.onSuccess = fn
}
}
func OnError(fn func(err error)) Option {
return func(t *Task) {
t.onError = fn
}
}
func init() {
// 默认在加载时启动,若无任务则仅占用极少量等待资源
Start()
// Run 手动执行该任务
func (t *Task) Run() {
t.s.Run(t.Name)
}

View File

@ -1,43 +1,55 @@
package task
package task_test
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"apigo.cc/go/task"
"os"
)
func TestMain(m *testing.M) {
task.DefaultScheduler.Start(context.Background(), nil)
os.Exit(m.Run())
}
func TestTaskBasic(t *testing.T) {
var count int32
Add("test-basic", "@every 1s", func() {
task.Add("test-basic", "@every 1s", func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
time.Sleep(2500 * time.Millisecond)
if atomic.LoadInt32(&count) < 2 {
t.Errorf("Expected at least 2 runs, got %d", count)
}
task.Remove("test-basic")
}
func TestTaskLifecycle(t *testing.T) {
var count int32
name := "test-lifecycle"
tk := Add(name, "@every 1s", func() {
tk := task.Add(name, "@every 1s", func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
tk.Disable() // Use object method
tk.Disable()
time.Sleep(1500 * time.Millisecond)
if atomic.LoadInt32(&count) != 0 {
t.Errorf("Expected 0 runs while disabled, got %d", count)
}
tasks := List()
tasks := task.List()
found := false
for _, item := range tasks {
if item.Name == name {
found = true
if item.Status != StatusDisabled {
if item.Status != task.StatusDisabled {
t.Errorf("Expected status disabled, got %s", item.Status)
}
}
@ -52,60 +64,146 @@ func TestTaskLifecycle(t *testing.T) {
t.Errorf("Expected task to run after enabling")
}
// Test Remove
tk.Remove()
if Get(name) != nil {
if task.Get(name) != nil {
t.Errorf("Expected task to be nil after remove")
}
}
func TestTaskPolicySkip(t *testing.T) {
var count int32
Add("test-skip", "@every 1s", func() {
task.Add("test-skip", "@every 1s", func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
time.Sleep(1500 * time.Millisecond) // Longer than interval
}, WithPolicy(PolicySkip))
time.Sleep(1500 * time.Millisecond)
return nil
}, task.Config{Policy: task.PolicySkip})
time.Sleep(3500 * time.Millisecond) // T=0 (starts, running), T=1 (skip), T=2 (starts after T=0 finishes at 1.5, next trigger at T=2), T=3 (skip)
// T=0: run 1 starts
// T=1: skip
// T=1.5: run 1 ends
// T=2: run 2 starts
// T=3: skip
// T=3.5: run 2 ends
time.Sleep(3500 * time.Millisecond)
if atomic.LoadInt32(&count) != 2 {
t.Errorf("Expected exactly 2 runs due to skip policy, got %d", count)
}
task.Remove("test-skip")
}
func TestTaskTimeout(t *testing.T) {
var errCount int32
Add("test-timeout", "@every 1s", func() {
time.Sleep(2 * time.Second)
}, WithTimeout(500*time.Millisecond), OnError(func(err error) {
atomic.AddInt32(&errCount, 1)
}))
var timeoutHit int32
task.Add("test-timeout", "@every 1s", func(ctx context.Context) error {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
atomic.AddInt32(&timeoutHit, 1)
}
return ctx.Err()
case <-time.After(2 * time.Second):
return nil
}
}, task.Config{Timeout: 500 * time.Millisecond})
time.Sleep(2500 * time.Millisecond)
if atomic.LoadInt32(&errCount) < 2 {
t.Errorf("Expected at least 2 timeout errors, got %d", errCount)
if atomic.LoadInt32(&timeoutHit) < 2 {
t.Errorf("Expected at least 2 timeout hits, got %d", timeoutHit)
}
task.Remove("test-timeout")
}
func TestTaskWithError(t *testing.T) {
var errCount int32
Add("test-error", "@every 1s", func() error {
return errors.New("expected error")
}, OnError(func(err error) {
atomic.AddInt32(&errCount, 1)
}))
func TestTaskPanicRecover(t *testing.T) {
var count int32
task.Add("test-panic", "@every 1s", func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
panic("boom")
})
time.Sleep(2500 * time.Millisecond)
if atomic.LoadInt32(&errCount) < 2 {
t.Errorf("Expected at least 2 errors, got %d", errCount)
if atomic.LoadInt32(&count) < 2 {
t.Errorf("Expected at least 2 runs despite panics, got %d", count)
}
task.Remove("test-panic")
}
func TestTaskManualRun(t *testing.T) {
var count int32
name := "test-manual"
tk := task.Add(name, "@every 1h", func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
// Manual run via global function
task.Run(name)
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt32(&count) != 1 {
t.Errorf("Expected 1 run after manual Run, got %d", count)
}
// Manual run via task object
tk.Run()
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt32(&count) != 2 {
t.Errorf("Expected 2 runs after manual Run, got %d", count)
}
task.Remove(name)
}
func TestGracefulStop(t *testing.T) {
s := task.NewScheduler()
s.Start(context.Background(), nil)
var finished int32
// Use @every 1h to avoid auto-runs.
s.Add("test-graceful", "@every 1h", func(ctx context.Context) error {
time.Sleep(1 * time.Second)
atomic.AddInt32(&finished, 1)
return nil
}, task.Config{})
go s.Run("test-graceful") // Trigger manually
time.Sleep(200 * time.Millisecond) // Let it start
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s.Stop(ctx)
duration := time.Since(start)
if atomic.LoadInt32(&finished) != 1 {
t.Errorf("Expected exactly 1 task to finish during graceful stop, got %d", atomic.LoadInt32(&finished))
}
if duration < 500*time.Millisecond {
t.Errorf("Stop returned too quickly, didn't wait for task")
}
}
func BenchmarkTaskRegistration(b *testing.B) {
s := task.NewScheduler()
fn := func(ctx context.Context) error { return nil }
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Add("bench", "@every 1h", fn, task.Config{})
}
}
func BenchmarkTaskExecutionOverhead(b *testing.B) {
// Since we use cron, we can't easily benchmark the trigger frequency directly,
// but we can benchmark the runTask method which contains the wrapper overhead.
// We need to use unexported methods or reflection, or just benchmark a mocked run.
// Actually, let's just benchmark the Add + List + Get operations which are common.
s := task.NewScheduler()
fn := func(ctx context.Context) error { return nil }
s.Add("bench", "@every 1h", fn, task.Config{})
b.Run("Get", func(b *testing.B) {
for i := 0; i < b.N; i++ {
s.Get("bench")
}
})
b.Run("List", func(b *testing.B) {
for i := 0; i < b.N; i++ {
s.List()
}
})
}