Go中不能直接用chan *Task作任务队列,因其缺乏动态启停、多消费者协调、积压控制、状态追踪等能力;需结合context.Context、sync.WaitGroup及缓冲chan构建安全队列,持久化场景则须换用Redis、RabbitMQ等专业方案。
Go 里没有内置的“任务队列”类型,chan 是基础,但直接裸用 chan 做任务队列容易出错——比如漏处理、panic、goroutine 泄漏、无缓冲导致阻塞等。
chan *Task 当任务队列?看似简单:开一个 chan *Task,生产者 send,消费者 range。但实际中会立刻撞上几个硬伤:
chan 关闭后无法再写入,而任务队列通常需要动态启停,不是“一次性消费完就关”chan 时,range 无法感知谁该退出;若用 select + default 轮询,又浪费 CPUchan 满了就阻塞或 panic(如果没做 select 非阻塞判断)chan 打补丁sync.WaitGroup + chan 组合怎么安全启停?核心是分离“任务流”和“生命周期控制”。不靠 close(chan) 通知结束,而是用 context.Context 控制 goroutine 存活,用 sync.WaitGroup 等待所有 worker 归位。
type TaskQueue struct {
tasks chan *Task
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewTaskQueue(workers int) TaskQueue {
ctx, cancel := context.WithCancel(context.Background())
q := &TaskQueue{
tasks: make(chan Task, 1024), // 缓冲很重要
ctx: ctx,
cancel: cancel,
}
for i := 0; i < workers; i++ {
q.wg.Add(1)
go q.worker()
}
return q
}
func (q *TaskQueue) worker() {
defer q.wg.Done()
for {
select {
case task, ok := <-q.tasks:
if !ok {
return // chan closed
}
task.Do()
case <-q.ctx.Done():
return
}
}
}
func (q TaskQueue) Submit(task Task) bool {
select {
case q.tasks <- task:
return true
default:
return false // 队列满,拒绝
}
}
func (q *TaskQueue) Shutdown() {
close(q.tasks)
q.cancel()
q.wg.Wait()
}
注意点:

make(chan *Task, 1024) 必须设缓冲,否则 Submit 可能永远阻塞worker 中的 select 必须同时监听 q.tasks 和 q.ctx.Done(),否则 Shutdown 时可能卡住Submit 用 select + default 实现非阻塞提交,避免调用方被拖慢chan
一旦任务要落盘、重启不丢、多实例共享,chan 就彻底失效。这时候必须换模型:
Redis 的 LPUSH/BRPOP 或 Redis Streams,配合 redigo 或 go-redis
RabbitMQ、Kafka 或云服务(如 AWS SQS),用官方 Go SDKbadger(KV)+ 内存 chan 双写,但复杂度陡增,建议先评估是否真需要强行把 chan 包装成“带持久化的队列”,最后都会变成 bug 温床:比如崩溃时内存任务丢失、重复投递、ACK 时机错乱。
context.Context?要,但别直接存 context.Context 字段。正确做法是每个任务在创建时绑定自己的 ctx,且该 ctx 应带超时或取消信号:
type Task struct {
ID string
Payload []byte
CreatedAt time.Time
ctx context.Context // 私有字段,不导出
}
func NewTask(payload []byte, timeout time.Duration) *Task {
ctx, _ := context.WithTimeout(context.Background(), timeout)
return &Task{
ID: uuid.New().String(),
Payload: payload,
CreatedAt: time.Now(),
ctx: ctx,
}
}
func (t Task) Do() {
select {
case <-time.After(5 time.Second):
// 模拟处理
case <-t.ctx.Done():
// 被取消或超时,直接返回
return
}
}
这样做的好处:
task.ctx.Cancel()(需改造为可访问)context,也不会因 worker ctx 取消而误杀还在跑的任务真正难的从来不是“怎么塞进队列”,而是“怎么定义任务边界、失败语义和上下文生命周期”。chan 只是管道,别指望它帮你管业务逻辑。