Skip to content

Channel 底层实现

"不要通过共享内存来通信,而要通过通信来共享内存。" — Go 谚语。Channel 是 Go 并发的核心原语。

Channel 的底层结构

go
// runtime/chan.go 中的 hchan 结构(简化)
type hchan struct {
    qcount   uint           // 当前队列中的元素数
    dataqsiz uint           // 环形缓冲区大小(cap)
    buf      unsafe.Pointer // 环形缓冲区指针
    elemsize uint16         // 元素大小
    closed   uint32         // 是否已关闭
    elemtype *_type         // 元素类型
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    lock     mutex          // 互斥锁
}

三种 Channel 类型

go
// 1. 无缓冲 channel(同步)
ch1 := make(chan int)
// 发送方阻塞,直到接收方就绪(反之亦然)
// 用于 goroutine 同步

// 2. 有缓冲 channel(异步)
ch2 := make(chan int, 10)
// 缓冲区未满时发送不阻塞
// 缓冲区为空时接收阻塞

// 3. nil channel(特殊)
var ch3 chan int
// 发送和接收都永远阻塞
// 在 select 中用于禁用某个 case

发送与接收的底层流程

发送(ch <- v)

1. 加锁
2. 如果 recvq 有等待的接收者 → 直接拷贝数据给接收者,唤醒它,解锁
3. 如果缓冲区有空间 → 数据写入缓冲区,解锁
4. 否则 → 当前 goroutine 加入 sendq,挂起(gopark),解锁

接收(v := <-ch)

1. 加锁
2. 如果 sendq 有等待的发送者:
   - 无缓冲:直接从发送者拷贝数据,唤醒发送者
   - 有缓冲:从缓冲区取数据,将发送者数据写入缓冲区,唤醒发送者
3. 如果缓冲区有数据 → 从缓冲区取数据,解锁
4. 否则 → 当前 goroutine 加入 recvq,挂起,解锁

常用模式

信号通知

go
// 用 struct{} 节省内存(零大小类型)
done := make(chan struct{})

go func() {
    doWork()
    close(done)  // 广播通知所有等待者
}()

<-done  // 等待完成

生产者-消费者

go
func producer(ch chan<- int, n int) {  // 只写 channel
    defer close(ch)
    for i := 0; i < n; i++ {
        ch <- i
    }
}

func consumer(ch <-chan int) {  // 只读 channel
    for v := range ch {  // range 在 channel 关闭后自动退出
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int, 10)
    go producer(ch, 100)
    consumer(ch)
}

Fan-out(一对多)

go
func fanOut(input <-chan int, outputs ...chan<- int) {
    go func() {
        defer func() {
            for _, out := range outputs {
                close(out)
            }
        }()
        for v := range input {
            for _, out := range outputs {
                out <- v
            }
        }
    }()
}

Fan-in(多对一合并)

go
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int, 10)
    var wg sync.WaitGroup

    forward := func(ch <-chan int) {
        defer wg.Done()
        for v := range ch {
            merged <- v
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Pipeline(流水线)

go
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // 组合流水线
    c := generate(2, 3, 4, 5)
    out := square(square(c))
    for v := range out {
        fmt.Println(v)  // 16, 81, 256, 625
    }
}

select 语句

go
// select 随机选择一个就绪的 case
select {
case v := <-ch1:
    fmt.Println("ch1:", v)
case ch2 <- 42:
    fmt.Println("发送到 ch2")
case <-time.After(time.Second):
    fmt.Println("超时")
default:
    fmt.Println("没有就绪的 case")  // 非阻塞
}

超时控制

go
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    result := make(chan string, 1)
    errCh := make(chan error, 1)

    go func() {
        resp, err := http.Get(url)
        if err != nil {
            errCh <- err
            return
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        result <- string(body)
    }()

    select {
    case r := <-result:
        return r, nil
    case err := <-errCh:
        return "", err
    case <-time.After(timeout):
        return "", fmt.Errorf("请求超时")
    }
}

用 nil channel 动态禁用 case

go
func merge(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok {
                    a = nil  // 禁用这个 case
                    continue
                }
                out <- v
            case v, ok := <-b:
                if !ok {
                    b = nil  // 禁用这个 case
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

Channel 关闭规则

go
// 规则:只有发送方应该关闭 channel
// 关闭已关闭的 channel → panic
// 向已关闭的 channel 发送 → panic
// 从已关闭的 channel 接收 → 返回零值,ok=false

ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)

// 安全接收
for v := range ch {
    fmt.Println(v)  // 1, 2
}

// 检查是否关闭
v, ok := <-ch
fmt.Println(v, ok)  // 0 false

// 多个发送者时,用 sync.Once 安全关闭
type SafeChan struct {
    ch   chan int
    once sync.Once
}

func (s *SafeChan) Close() {
    s.once.Do(func() { close(s.ch) })
}

性能考量

go
// 有缓冲 vs 无缓冲性能对比
// 无缓冲:每次发送都需要等待接收者,有上下文切换开销
// 有缓冲:批量处理,减少同步次数

// 高吞吐场景:使用有缓冲 channel + 批量处理
func batchProcess(input <-chan []byte) {
    batch := make([][]byte, 0, 100)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    flush := func() {
        if len(batch) > 0 {
            processBatch(batch)
            batch = batch[:0]
        }
    }

    for {
        select {
        case item, ok := <-input:
            if !ok {
                flush()
                return
            }
            batch = append(batch, item)
            if len(batch) >= 100 {
                flush()
            }
        case <-ticker.C:
            flush()
        }
    }
}

Channel vs Mutex

  • 传递数据所有权 → Channel
  • 保护共享状态 → Mutex
  • 等待事件 → Channel
  • 性能敏感的计数器 → atomic

不要为了"Go 风格"强行用 Channel,Mutex 在很多场景更简单高效。

本站内容由 褚成志 整理编写,仅供学习参考