本文详解如何在 go 中安全实现“并发读取文件 + 并行处理行数据”,解决因通道关闭时机不当导致的死锁问题,通过分离读取、处理与聚合三阶段,并合理使用 goroutine 和 sync.waitgroup,构建健壮的无锁(mutex-free)流水线。
在 Go 中实现“并发读取文件并行处理”时,一个常见误区是将文件读取、任务分发和结果收集全部放在主 goroutine 中同步执行——这极易引发死锁。正如示例代码所示:主协程在 close(jobs) 后调用 wg.Wait(),但此时 worker 协程仍在阻塞等待 jobs 通道(虽已关闭,但需确保所有 worker 已退出),而主协程又未消费 results 通道,导致 results
核心原则:职责分离(Separation of Concerns)
应将流程拆分为三个独立并发阶段:
以下是修正后的完整实现(关键改动已加注释):
func telephoneNumbersInFile(path string) int {
file := strings.NewReader(path)
telephone := regexp.MustCompile(`\(\d+\)\s\d+-\d+`)
jobs := make(chan string, 100) // 建议缓冲,避免 producer 因 consumer 慢而阻塞
results := make(chan int, 100)
wg := new(sync.WaitGroup)
// 1️⃣ 启动 Worker 池(3 个)
for w := 1; w <= 3; w++ {
wg.Add(1)
go matchTelephoneNumbers(jobs, results, wg, telephone)
}
// 2️⃣ 生产者:在新 goroutine 中读取文件并关闭 jobs
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
jobs <- scanner.Text()
}
close(jobs) // ✅ 关键:必须在 goroutine 中关闭,避免阻塞主流程
}()
// 3️⃣ 收集器:等待所有 worker 结束后关闭 results
go func() {
wg.Wait()
close(results) // ✅ 确保 results 可被 range 安全遍历
}()
// 4️⃣ 主 goroutine:安全聚合结果
counts := 0
for v := range results { // ✅ range 自动在 channe
l 关闭后退出
counts += v
}
return counts
}
func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
defer wg.Done()
for line := range jobs { // ✅ range 自动在 jobs 关闭后退出
if telephone.MatchString(line) {
results <- 1
}
}
}关键注意事项:
此模式是 Go 中构建高并发 I/O 处理流水线的标准范式,适用于日志分析、ETL、配置解析等场景。掌握它,你便拥有了驾驭 Go 并发本质的一把关键钥匙。