← 返回首页
Go 多并发控制 -- errgroup 使用
errgroup 是 Go 官方提供的一个并发控制工具包(golang.org/x/sync/errgroup),用于管理一组 goroutine,并在任意一个 goroutine 返回错误时,取消其余 goroutine 的执行。它结合了 sync.WaitGroup 和 context.Context 的功能,非常适合需要“一错全停”的并发场景。
✅ 核心特性
| 特性 | 说明 |
|---|---|
| 错误传播 | 任一 goroutine 返回非 nil error,整个组立即失败 |
| 上下文取消 | 自动创建带取消功能的 context,可传递给子任务 |
| 等待所有完成 | 调用 Wait() 阻塞直到所有 goroutine 结束 |
| 简洁 API | 只有 Go(), Wait() 两个核心方法 |
📦 安装
go get golang.org/x/sync/errgroup
⚠️ 注意:这是 x/sync,不是标准库,但由 Go 团队维护,非常稳定。
🌰 基本用法
场景:并发下载多个文件,任一失败则全部取消
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func downloadFile(ctx context.Context, url string) error {
// 模拟下载耗时
select {
case <-time.After(2 * time.Second):
fmt.Printf("✅ 下载成功: %s\n", url)
return nil
case <-ctx.Done():
fmt.Printf("❌ 被取消: %s\n", url)
return ctx.Err()
}
}
func main() {
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
}
// 创建带 errgroup 的 context
g, ctx := errgroup.WithContext(context.Background())
for _, url := range urls {
url := url // 闭包捕获
g.Go(func() error {
return downloadFile(ctx, url)
})
}
// 等待所有任务完成或任一失败
if err := g.Wait(); err != nil {
fmt.Printf("⚠️ 任务失败: %v\n", err)
} else {
fmt.Println("🎉 所有文件下载成功!")
}
}
输出(如果一切正常):
✅ 下载成功: https://example.com/file1.zip
✅ 下载成功: https://example.com/file2.zip
✅ 下载成功: https://example.com/file3.zip
🎉 所有文件下载成功!
🔧 两种使用方式
1. 带 Context 的 errgroup(推荐)
g, ctx := errgroup.WithContext(context.Background())
- 自动创建 context.WithCancel()
- 任一 goroutine 返回 error → 自动调用 cancel()
- 其他 goroutine 可通过 ctx.Done() 感知取消
✅ 适用于需要响应取消信号的任务(如 HTTP 请求、数据库操作、文件 I/O)
2. 不带 Context 的 errgroup
var g errgroup.Group
- 不提供 context
- 任一 goroutine 返回 error → 其他 goroutine 继续运行,但
Wait()会立即返回 error - 无法主动取消正在运行的 goroutine
⚠️ 仅适用于“无状态、不可取消”的任务(如纯计算)
🛠 实际应用场景
场景 1:微服务并行调用
func fetchUserData(ctx context.Context, userID string) (*User, error) {
g, ctx := errgroup.WithContext(ctx)
var profile *Profile
var orders []Order
var settings *Settings
g.Go(func() error {
var err error
profile, err = fetchProfile(ctx, userID)
return err
})
g.Go(func() error {
var err error
orders, err = fetchOrders(ctx, userID)
return err
})
g.Go(func() error {
var err error
settings, err = fetchSettings(ctx, userID)
return err
})
if err := g.Wait(); err != nil {
return nil, err // 任一失败,整体失败
}
return &User{Profile: profile, Orders: orders, Settings: settings}, nil
}
场景 2:批量处理 + 错误收集(变通)
errgroup 默认“一错就停”,但有时我们想收集所有错误。
// 方案:用 channel 收集错误,最后统一判断
func processAll(items []Item) error {
errs := make(chan error, len(items))
g := new(errgroup.Group)
for _, item := range items {
item := item
g.Go(func() error {
if err := process(item); err != nil {
errs <- err // 发送错误
}
return nil // 始终返回 nil,避免提前 cancel
})
}
g.Wait()
close(errs)
// 检查是否有错误
for err := range errs {
return err // 或收集所有错误
}
return nil
}
💡 更优雅的方式:使用 multierror 包收集多个错误。
⚠️ 常见陷阱
❌ 1. 忘记闭包变量捕获
// 错误写法
for _, url := range urls {
g.Go(func() error {
return download(url) // 所有 goroutine 共享同一个 url(循环结束后的值)
})
}
// 正确写法
for _, url := range urls {
url := url // 创建新变量
g.Go(func() error {
return download(url)
})
}
❌ 2. 在 Go 函数中 panic
errgroup 不会捕获 panic!必须自行 recover。
g.Go(func() error {
defer func() {
if r := recover(); r != nil {
// 记录日志并返回 error
err = fmt.Errorf("panic: %v", r)
}
}()
// ...
})
❌ 3. 忽略 context 取消
即使使用了 errgroup.WithContext,你的任务函数也必须监听 ctx.Done(),否则无法及时停止。
// 好:检查 ctx.Done()
select {
case result := <-workCh:
// 处理
case <-ctx.Done():
return ctx.Err()
}
// 坏:无限循环不检查 context
for {
doWork() // 无法被取消!
}
✅ 与 sync.WaitGroup 对比
| 功能 | sync.WaitGroup |
errgroup.Group |
|---|---|---|
| 等待 goroutine 完成 | ✅ | ✅ |
| 传播错误 | ❌ | ✅ |
| 自动取消机制 | ❌ | ✅(配合 context) |
| API 简洁性 | 需手动管理 error | 内置错误处理 |
🌟 结论:
只要涉及错误处理的并发任务,优先使用errgroup。
✅ 总结
| 问题 | 答案 |
|---|---|
| 什么时候用? | 需要“并发执行 + 一错全停”的场景 |
| 是否替代 WaitGroup? | 是,当需要错误处理时 |
| 必须用 context 吗? | 推荐用 errgroup.WithContext,除非任务不可取消 |
| 能收集多个错误吗? | 默认不能,需配合 channel 或 multierror |
💡 最佳实践:
g, ctx := errgroup.WithContext(parentCtx) // 启动 goroutine if err := g.Wait(); err != nil { return err }
errgroup 是 Go 并发编程中简洁、安全、高效的利器,尤其适合微服务、批处理、数据聚合等场景。