Golang通用任务调度器需解耦任务定义、触发逻辑与执行控制,含Task、Trigger、Scheduler、Executor四大模块,支持扩展接口、轻量状态管理及安全并发机制。
用 Golang 设计一个通用任务调度器,核心在于解耦「任务定义」、「触发逻辑」和「执行控制」。不依赖 cron 表达式硬编码,也不绑定具体业务,而是提供可扩展的接口、轻量的状态管理与安全的并发执行机制。下面从模块结构到关键实现逐步说明。
一个健壮的调度器通常包含以下 4 个主模块:
func() error)、元信息(超时、重试、标签等),不关心何时运行chan time.Time 或 chan struct{}
很多初版调度器崩在边界场景。以下是必须处理的细节:
select { case 或带缓冲 channel
sync.Map 存储 taskID → *Task,并配合 context.CancelFunc 控制生命周期time.AfterFunc 做周期任务 —— 它不支持取消,应改用 time.Ticker + select + ctx.Done()
以下是最小可行结构,使用 robfig/cron/v3 作为 Trigger 实现之一:
type Task struct {
ID string
Func func() error
Timeout time.Duration
MaxRetry int
}
type Scheduler struct {
mu sync.RWMutex
tasks map[string]Task
executors map[string]executor // taskID → executor
cron *cron.Cron
}
func (s Scheduler) RegisterCronTask(id, spec string, fn func() error) error {
t := &Task{ID: id, Func: fn, Timeout: 30 time.Second}
s.mu.Lock()
s.tasks[id] = t
s.mu.Unlock()
return s.cron.AddFunc(spec, func() {
s.runTask(id)
})}
func (s *Scheduler) runTask(id string) {
s.mu.RLock()
t, ok := s.tasks[id]
s.mu.RUnlock()
if !ok { return }
ctx, can
cel := context.WithTimeout(context.Background(), t.Timeout)
defer cancel()
// 使用独立 executor 控制并发与重试
result := s.newExecutor(t).Run(ctx)
if result.Err != nil {
log.Printf("task %s failed: %v", id, result.Err)
}
}
注意:cron.Cron 本身已处理 panic 捕获和 goroutine 安全,无需重复封装;重点是把任务执行收口到 runTask,方便后续加日志、指标、熔断等能力。
进阶可扩展方向
通用 ≠ 大而全。真正易维护的调度器会预留这些扩展点:
Trigger interface { Start() ,轻松接入 Kafka offset、Redis key expire、甚至 Prometheus alert webhook
Persister interface { Save(TaskState); Load(taskID) (TaskState, error) },对接 SQLite / Redis / PostgreSQL/health/scheduler 返回活跃任务数、最近失败率、ticker 延迟等,便于可观测性集成embed.FS 内置静态页,提供任务启停、手动触发、日志查看(结合 log.Logger 的 writer hook)基本上就这些。不复杂但容易忽略的是状态一致性与错误传播路径 —— 把 panic、ctx cancel、网络超时、函数返回 error 这四类失败统一建模,调度器才真正可靠。