必须用 sync.WaitGroup 等待或 channel + range + close() 控制退出,否则主 goroutine 提前结束导致数据丢失;管道串联时需各阶段独立启动 goroutine 并关闭输出 channel,下游用 for range 消费。
因为没有同步机制,主 goroutine 在子任务完成前就退出,导致 runtime.Goexit 或程序直接结束。常见现象是:输入 100 条数据,输出只有 0–3 条,且每次运行结果不一致。
sync.WaitGroup 显式等待所有清洗 goroutine 结束range 配合 close() 控制消费端退出典型死锁场景:cleanStage1 向 stage2Ch 发送数据,但 cleanStage2 还没启动或已提前退出,发送方卡住;或者多个 stage 共用一个未关闭的 channel,range 永不退出。
close(outputCh)
for val := range inputCh,不能用 for { select { case v := 无限循环
func cleanStage1(in <-chan string, out chan<- int) { defer close(out) for line := range in { if len(line) == 0 { continue } if n, err := strconv.Atoi(line); err == nil { out <- n * 2 // 示例:数值翻倍 } } }
并发清洗中 goroutine 不退出,内存持续上涨,最终 OOM。最常发生在错误的 channel 使用模式上。
→ goroutine 永久挂起,无法被调度器回收
time.After 做超时但没配合 select default 分支,导致 channel 接收永远等下去取决于清洗耗时是否波动大、各阶段吞吐是否均衡。不加缓冲不是错,但容易暴露性能瓶颈。
make(chan T, 100),防瞬时积压阻塞上游runtime.NumCPU() 的 2–4 倍,避免线程切换开销过大make(chan T, 0)(即无缓冲)串联多个 CPU 密集 stage,这等于强制串行化缓冲太大也不行,比如设成 10000,可能把本该失败的脏数据全缓存住,延迟报错,掩盖数据质量问题。