← 返回首页

Go 多并发控制 -- errgroup 使用

errgroup 是 Go 官方提供的一个并发控制工具包(golang.org/x/sync/errgroup),用于管理一组 goroutine,并在任意一个 goroutine 返回错误时,取消其余 goroutine 的执行。它结合了 sync.WaitGroup 和 context.Context 的功能,非常适合需要“一错全停”的并发场景。


✅ 核心特性

特性 说明
错误传播 任一 goroutine 返回非 nil error,整个组立即失败
上下文取消 自动创建带取消功能的 context,可传递给子任务
等待所有完成 调用 Wait() 阻塞直到所有 goroutine 结束
简洁 API 只有 Go(), Wait() 两个核心方法

📦 安装

go get golang.org/x/sync/errgroup

⚠️ 注意:这是 x/sync,不是标准库,但由 Go 团队维护,非常稳定。


🌰 基本用法

场景:并发下载多个文件,任一失败则全部取消

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func downloadFile(ctx context.Context, url string) error {
    // 模拟下载耗时
    select {
    case <-time.After(2 * time.Second):
        fmt.Printf("✅ 下载成功: %s\n", url)
        return nil
    case <-ctx.Done():
        fmt.Printf("❌ 被取消: %s\n", url)
        return ctx.Err()
    }
}

func main() {
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
    }

    // 创建带 errgroup 的 context
    g, ctx := errgroup.WithContext(context.Background())

    for _, url := range urls {
        url := url // 闭包捕获
        g.Go(func() error {
            return downloadFile(ctx, url)
        })
    }

    // 等待所有任务完成或任一失败
    if err := g.Wait(); err != nil {
        fmt.Printf("⚠️ 任务失败: %v\n", err)
    } else {
        fmt.Println("🎉 所有文件下载成功!")
    }
}

输出(如果一切正常):

✅ 下载成功: https://example.com/file1.zip
✅ 下载成功: https://example.com/file2.zip
✅ 下载成功: https://example.com/file3.zip
🎉 所有文件下载成功!

🔧 两种使用方式

1. 带 Context 的 errgroup(推荐)

g, ctx := errgroup.WithContext(context.Background())
  • 自动创建 context.WithCancel()
  • 任一 goroutine 返回 error → 自动调用 cancel()
  • 其他 goroutine 可通过 ctx.Done() 感知取消

适用于需要响应取消信号的任务(如 HTTP 请求、数据库操作、文件 I/O)


2. 不带 Context 的 errgroup

var g errgroup.Group
  • 不提供 context
  • 任一 goroutine 返回 error → 其他 goroutine 继续运行,但 Wait() 会立即返回 error
  • 无法主动取消正在运行的 goroutine

⚠️ 仅适用于“无状态、不可取消”的任务(如纯计算)


🛠 实际应用场景

场景 1:微服务并行调用

func fetchUserData(ctx context.Context, userID string) (*User, error) {
    g, ctx := errgroup.WithContext(ctx)

    var profile *Profile
    var orders []Order
    var settings *Settings

    g.Go(func() error {
        var err error
        profile, err = fetchProfile(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        orders, err = fetchOrders(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        settings, err = fetchSettings(ctx, userID)
        return err
    })

    if err := g.Wait(); err != nil {
        return nil, err // 任一失败,整体失败
    }

    return &User{Profile: profile, Orders: orders, Settings: settings}, nil
}

场景 2:批量处理 + 错误收集(变通)

errgroup 默认“一错就停”,但有时我们想收集所有错误

// 方案:用 channel 收集错误,最后统一判断
func processAll(items []Item) error {
    errs := make(chan error, len(items))
    g := new(errgroup.Group)

    for _, item := range items {
        item := item
        g.Go(func() error {
            if err := process(item); err != nil {
                errs <- err // 发送错误
            }
            return nil // 始终返回 nil,避免提前 cancel
        })
    }

    g.Wait()
    close(errs)

    // 检查是否有错误
    for err := range errs {
        return err // 或收集所有错误
    }
    return nil
}

💡 更优雅的方式:使用 multierror 包收集多个错误。


⚠️ 常见陷阱

❌ 1. 忘记闭包变量捕获

// 错误写法
for _, url := range urls {
    g.Go(func() error {
        return download(url) // 所有 goroutine 共享同一个 url(循环结束后的值)
    })
}

// 正确写法
for _, url := range urls {
    url := url // 创建新变量
    g.Go(func() error {
        return download(url)
    })
}

❌ 2. 在 Go 函数中 panic

errgroup 不会捕获 panic!必须自行 recover。

g.Go(func() error {
    defer func() {
        if r := recover(); r != nil {
            // 记录日志并返回 error
            err = fmt.Errorf("panic: %v", r)
        }
    }()
    // ...
})

❌ 3. 忽略 context 取消

即使使用了 errgroup.WithContext,你的任务函数也必须监听 ctx.Done(),否则无法及时停止。

// 好:检查 ctx.Done()
select {
case result := <-workCh:
    // 处理
case <-ctx.Done():
    return ctx.Err()
}

// 坏:无限循环不检查 context
for {
    doWork() // 无法被取消!
}

✅ 与 sync.WaitGroup 对比

功能 sync.WaitGroup errgroup.Group
等待 goroutine 完成
传播错误
自动取消机制 ✅(配合 context)
API 简洁性 需手动管理 error 内置错误处理

🌟 结论
只要涉及错误处理的并发任务,优先使用 errgroup


✅ 总结

问题 答案
什么时候用? 需要“并发执行 + 一错全停”的场景
是否替代 WaitGroup? 是,当需要错误处理时
必须用 context 吗? 推荐用 errgroup.WithContext,除非任务不可取消
能收集多个错误吗? 默认不能,需配合 channel 或 multierror

💡 最佳实践

g, ctx := errgroup.WithContext(parentCtx)
// 启动 goroutine
if err := g.Wait(); err != nil {
    return err
}

errgroup 是 Go 并发编程中简洁、安全、高效的利器,尤其适合微服务、批处理、数据聚合等场景。