worker pool 的核心价值是可控并发,通过固定数量 worker 从共享通道取任务实现限流与复用;需用 sync.WaitGroup 和 done 通道实现优雅退出,避免任务丢失。
go f() 直接起 goroutine直接用 go f() 启动大量任务,容易导致 goroutine 泛滥,内存暴涨甚至 OOM。尤其当任务来自网络请求、文件读取或数据库查询时,数量不可控。worker pool 的核心价值不是“并发”,而是“可控并发”——把任务排队、限流、复用 goroutine。
go handle(req),QPS 上千时可能创建上万个 goroutinechan *Task 拿任务,处理完继续取下一个runtime.NumCPU()
通道类型和关闭时机决定 worker 是否能干净退出。别用 chan Task(值拷贝开销大),优先用 chan *Task 或 chan func();退出不能靠 panic 或 os.Exit,必须支持 graceful shutdown。
chan interface{} 灵活但 lose type safety,推荐 chan Job(定义具体接口或结构体)
退出sync.WaitGroup 计数活跃 worker,配合 done chan struct{} 通知停止取新任务close(jobChan) 后没等 worker 处理完剩余任务就退出,导致任务丢失workerPool.Run() 的典型实现要点一个健壮的 Run() 方法要处理启动、任务分发、错误传播、超时控制和回收。它不该阻塞主流程,但要提供同步等待入口(如 Wait())。
func (p *WorkerPool) Run() {
for i := 0; i < p.workers; i++ {
go func() {
defer p.wg.Done()
for {
select {
case job, ok := <-p.jobs:
if !ok {
return // 通道关闭,退出
}
job.Do()
case <-p.done:
return // 强制退出
}
}
}()
}
}jobs 通道建议带缓冲(如 make(chan Job, 1000)),避免生产者阻塞defer p.wg.Done(),否则 Wait() 永远不返回job.ResultChan 或回调函数
真实场景下,worker pool 很少只跑“理想任务”。超时、重试、优先级、任务取消这些需求一加,逻辑复杂度指数上升。
ctx 传进 job 结构体,并在 Do() 中 select 判断 ctx.Done()
defer func(){ if r := recover(); r != nil { p.errCh
worker-3),否则并发日志无法归因最麻烦的从来不是启动一堆 goroutine,而是让它们在出错、中断、扩容、监控全链路里都保持可观察、可终止、不丢任务。写完 Run() 只是开始,压测时看 goroutine 数是否稳定、pprof 查 block 链路、日志查 timeout 分布,这些才决定 pool 是否真可用。