Skip to content

goroutine & channel 深度解析

goroutine 是 Go 并发的基本单元,channel 是 goroutine 间通信的管道。两者结合,构成了 Go 独特的并发模型。

goroutine 的代价

go
// goroutine 非常轻量
// 初始栈:2KB(Go 1.4+),可动态增长到 1GB
// 创建开销:约 2-5 微秒
// 内存占用:约 2-8KB(初始)

// 可以轻松创建百万级 goroutine
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1_000_000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(time.Second)
        }()
    }
    wg.Wait()
    // 内存约 2-4GB,但可以运行
}

sync.WaitGroup — 等待一组 goroutine

go
func processItems(items []Item) error {
    var (
        wg   sync.WaitGroup
        mu   sync.Mutex
        errs []error
    )

    for _, item := range items {
        wg.Add(1)
        go func(item Item) {
            defer wg.Done()
            if err := process(item); err != nil {
                mu.Lock()
                errs = append(errs, err)
                mu.Unlock()
            }
        }(item)  // 注意:传参避免闭包捕获问题
    }

    wg.Wait()
    return errors.Join(errs...)
}

并发安全的常见模式

Mutex 保护共享状态

go
type SafeCounter struct {
    mu    sync.Mutex
    count map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.RLock()  // 读锁,允许并发读
    defer c.mu.RUnlock()
    return c.count[key]
}

sync.Map — 并发安全的 Map

go
// 适合读多写少,或 key 集合稳定的场景
var m sync.Map

// 写
m.Store("key", "value")

// 读
if v, ok := m.Load("key"); ok {
    fmt.Println(v.(string))
}

// 读或写(原子操作)
actual, loaded := m.LoadOrStore("key", "default")

// 删除
m.Delete("key")

// 遍历
m.Range(func(k, v interface{}) bool {
    fmt.Println(k, v)
    return true  // 返回 false 停止遍历
})

Once — 只执行一次

go
type Singleton struct {
    db *sql.DB
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        db, _ := sql.Open("postgres", dsn)
        instance = &Singleton{db: db}
    })
    return instance
}

并发模式

Worker Pool(工作池)

go
type WorkerPool struct {
    workers int
    jobs    chan func()
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    p := &WorkerPool{
        workers: workers,
        jobs:    make(chan func(), workers*10),
    }
    p.start()
    return p
}

func (p *WorkerPool) start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for job := range p.jobs {
                job()
            }
        }()
    }
}

func (p *WorkerPool) Submit(job func()) {
    p.jobs <- job
}

func (p *WorkerPool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
}

// 使用
pool := NewWorkerPool(runtime.NumCPU())
defer pool.Shutdown()

for _, url := range urls {
    url := url  // 捕获变量
    pool.Submit(func() {
        resp, err := http.Get(url)
        // 处理响应
    })
}

带结果的并发

go
type Result[T any] struct {
    Value T
    Err   error
}

func ConcurrentMap[T, R any](
    ctx context.Context,
    items []T,
    concurrency int,
    fn func(context.Context, T) (R, error),
) ([]R, error) {
    sem := make(chan struct{}, concurrency)
    results := make([]Result[R], len(items))
    var wg sync.WaitGroup

    for i, item := range items {
        wg.Add(1)
        go func(i int, item T) {
            defer wg.Done()
            sem <- struct{}{}
            defer func() { <-sem }()

            v, err := fn(ctx, item)
            results[i] = Result[R]{Value: v, Err: err}
        }(i, item)
    }

    wg.Wait()

    // 收集结果
    var (
        values []R
        errs   []error
    )
    for _, r := range results {
        if r.Err != nil {
            errs = append(errs, r.Err)
        } else {
            values = append(values, r.Value)
        }
    }
    return values, errors.Join(errs...)
}

超时与取消

go
func doWithTimeout(ctx context.Context, timeout time.Duration, fn func() error) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    done := make(chan error, 1)
    go func() {
        done <- fn()
    }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

常见并发陷阱

数据竞争

go
// ❌ 数据竞争
var counter int
for i := 0; i < 1000; i++ {
    go func() {
        counter++  // 并发写,未加锁
    }()
}

// ✅ 使用 atomic
var counter int64
for i := 0; i < 1000; i++ {
    go func() {
        atomic.AddInt64(&counter, 1)
    }()
}

// 检测数据竞争
// go test -race ./...
// go run -race main.go

闭包捕获循环变量

go
// ❌ 所有 goroutine 共享同一个 i
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i)  // 可能全部打印 5
    }()
}

// ✅ 方式1:传参
for i := 0; i < 5; i++ {
    go func(i int) {
        fmt.Println(i)
    }(i)
}

// ✅ 方式2:局部变量(Go 1.22+ 循环变量已修复此问题)
for i := 0; i < 5; i++ {
    i := i  // 创建新变量
    go func() {
        fmt.Println(i)
    }()
}

并发设计原则

  1. 优先使用 channel 传递数据所有权
  2. 用 Mutex 保护共享状态,不要用 channel 模拟锁
  3. 始终用 -race 标志运行测试
  4. 用 context 控制 goroutine 生命周期
  5. goroutine 泄漏是最常见的 Go 性能问题

本站内容由 褚成志 整理编写,仅供学习参考