goroutine & channel 深度解析
goroutine 是 Go 并发的基本单元,channel 是 goroutine 间通信的管道。两者结合,构成了 Go 独特的并发模型。
goroutine 的代价
go
// goroutine 非常轻量
// 初始栈:2KB(Go 1.4+),可动态增长到 1GB
// 创建开销:约 2-5 微秒
// 内存占用:约 2-8KB(初始)
// 可以轻松创建百万级 goroutine
func main() {
var wg sync.WaitGroup
for i := 0; i < 1_000_000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Second)
}()
}
wg.Wait()
// 内存约 2-4GB,但可以运行
}sync.WaitGroup — 等待一组 goroutine
go
func processItems(items []Item) error {
var (
wg sync.WaitGroup
mu sync.Mutex
errs []error
)
for _, item := range items {
wg.Add(1)
go func(item Item) {
defer wg.Done()
if err := process(item); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(item) // 注意:传参避免闭包捕获问题
}
wg.Wait()
return errors.Join(errs...)
}并发安全的常见模式
Mutex 保护共享状态
go
type SafeCounter struct {
mu sync.Mutex
count map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.count[key]++
}
func (c *SafeCounter) Value(key string) int {
c.mu.RLock() // 读锁,允许并发读
defer c.mu.RUnlock()
return c.count[key]
}sync.Map — 并发安全的 Map
go
// 适合读多写少,或 key 集合稳定的场景
var m sync.Map
// 写
m.Store("key", "value")
// 读
if v, ok := m.Load("key"); ok {
fmt.Println(v.(string))
}
// 读或写(原子操作)
actual, loaded := m.LoadOrStore("key", "default")
// 删除
m.Delete("key")
// 遍历
m.Range(func(k, v interface{}) bool {
fmt.Println(k, v)
return true // 返回 false 停止遍历
})Once — 只执行一次
go
type Singleton struct {
db *sql.DB
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
db, _ := sql.Open("postgres", dsn)
instance = &Singleton{db: db}
})
return instance
}并发模式
Worker Pool(工作池)
go
type WorkerPool struct {
workers int
jobs chan func()
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
p := &WorkerPool{
workers: workers,
jobs: make(chan func(), workers*10),
}
p.start()
return p
}
func (p *WorkerPool) start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for job := range p.jobs {
job()
}
}()
}
}
func (p *WorkerPool) Submit(job func()) {
p.jobs <- job
}
func (p *WorkerPool) Shutdown() {
close(p.jobs)
p.wg.Wait()
}
// 使用
pool := NewWorkerPool(runtime.NumCPU())
defer pool.Shutdown()
for _, url := range urls {
url := url // 捕获变量
pool.Submit(func() {
resp, err := http.Get(url)
// 处理响应
})
}带结果的并发
go
type Result[T any] struct {
Value T
Err error
}
func ConcurrentMap[T, R any](
ctx context.Context,
items []T,
concurrency int,
fn func(context.Context, T) (R, error),
) ([]R, error) {
sem := make(chan struct{}, concurrency)
results := make([]Result[R], len(items))
var wg sync.WaitGroup
for i, item := range items {
wg.Add(1)
go func(i int, item T) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
v, err := fn(ctx, item)
results[i] = Result[R]{Value: v, Err: err}
}(i, item)
}
wg.Wait()
// 收集结果
var (
values []R
errs []error
)
for _, r := range results {
if r.Err != nil {
errs = append(errs, r.Err)
} else {
values = append(values, r.Value)
}
}
return values, errors.Join(errs...)
}超时与取消
go
func doWithTimeout(ctx context.Context, timeout time.Duration, fn func() error) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan error, 1)
go func() {
done <- fn()
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}常见并发陷阱
数据竞争
go
// ❌ 数据竞争
var counter int
for i := 0; i < 1000; i++ {
go func() {
counter++ // 并发写,未加锁
}()
}
// ✅ 使用 atomic
var counter int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&counter, 1)
}()
}
// 检测数据竞争
// go test -race ./...
// go run -race main.go闭包捕获循环变量
go
// ❌ 所有 goroutine 共享同一个 i
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // 可能全部打印 5
}()
}
// ✅ 方式1:传参
for i := 0; i < 5; i++ {
go func(i int) {
fmt.Println(i)
}(i)
}
// ✅ 方式2:局部变量(Go 1.22+ 循环变量已修复此问题)
for i := 0; i < 5; i++ {
i := i // 创建新变量
go func() {
fmt.Println(i)
}()
}并发设计原则
- 优先使用 channel 传递数据所有权
- 用 Mutex 保护共享状态,不要用 channel 模拟锁
- 始终用
-race标志运行测试 - 用 context 控制 goroutine 生命周期
- goroutine 泄漏是最常见的 Go 性能问题