Channel 底层实现
"不要通过共享内存来通信,而要通过通信来共享内存。" — Go 谚语。Channel 是 Go 并发的核心原语。
Channel 的底层结构
go
// runtime/chan.go 中的 hchan 结构(简化)
type hchan struct {
qcount uint // 当前队列中的元素数
dataqsiz uint // 环形缓冲区大小(cap)
buf unsafe.Pointer // 环形缓冲区指针
elemsize uint16 // 元素大小
closed uint32 // 是否已关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex // 互斥锁
}三种 Channel 类型
go
// 1. 无缓冲 channel(同步)
ch1 := make(chan int)
// 发送方阻塞,直到接收方就绪(反之亦然)
// 用于 goroutine 同步
// 2. 有缓冲 channel(异步)
ch2 := make(chan int, 10)
// 缓冲区未满时发送不阻塞
// 缓冲区为空时接收阻塞
// 3. nil channel(特殊)
var ch3 chan int
// 发送和接收都永远阻塞
// 在 select 中用于禁用某个 case发送与接收的底层流程
发送(ch <- v)
1. 加锁
2. 如果 recvq 有等待的接收者 → 直接拷贝数据给接收者,唤醒它,解锁
3. 如果缓冲区有空间 → 数据写入缓冲区,解锁
4. 否则 → 当前 goroutine 加入 sendq,挂起(gopark),解锁接收(v := <-ch)
1. 加锁
2. 如果 sendq 有等待的发送者:
- 无缓冲:直接从发送者拷贝数据,唤醒发送者
- 有缓冲:从缓冲区取数据,将发送者数据写入缓冲区,唤醒发送者
3. 如果缓冲区有数据 → 从缓冲区取数据,解锁
4. 否则 → 当前 goroutine 加入 recvq,挂起,解锁常用模式
信号通知
go
// 用 struct{} 节省内存(零大小类型)
done := make(chan struct{})
go func() {
doWork()
close(done) // 广播通知所有等待者
}()
<-done // 等待完成生产者-消费者
go
func producer(ch chan<- int, n int) { // 只写 channel
defer close(ch)
for i := 0; i < n; i++ {
ch <- i
}
}
func consumer(ch <-chan int) { // 只读 channel
for v := range ch { // range 在 channel 关闭后自动退出
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 10)
go producer(ch, 100)
consumer(ch)
}Fan-out(一对多)
go
func fanOut(input <-chan int, outputs ...chan<- int) {
go func() {
defer func() {
for _, out := range outputs {
close(out)
}
}()
for v := range input {
for _, out := range outputs {
out <- v
}
}
}()
}Fan-in(多对一合并)
go
func fanIn(channels ...<-chan int) <-chan int {
merged := make(chan int, 10)
var wg sync.WaitGroup
forward := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
merged <- v
}
}
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}Pipeline(流水线)
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 组合流水线
c := generate(2, 3, 4, 5)
out := square(square(c))
for v := range out {
fmt.Println(v) // 16, 81, 256, 625
}
}select 语句
go
// select 随机选择一个就绪的 case
select {
case v := <-ch1:
fmt.Println("ch1:", v)
case ch2 <- 42:
fmt.Println("发送到 ch2")
case <-time.After(time.Second):
fmt.Println("超时")
default:
fmt.Println("没有就绪的 case") // 非阻塞
}超时控制
go
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
result := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
resp, err := http.Get(url)
if err != nil {
errCh <- err
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
result <- string(body)
}()
select {
case r := <-result:
return r, nil
case err := <-errCh:
return "", err
case <-time.After(timeout):
return "", fmt.Errorf("请求超时")
}
}用 nil channel 动态禁用 case
go
func merge(a, b <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
a = nil // 禁用这个 case
continue
}
out <- v
case v, ok := <-b:
if !ok {
b = nil // 禁用这个 case
continue
}
out <- v
}
}
}()
return out
}Channel 关闭规则
go
// 规则:只有发送方应该关闭 channel
// 关闭已关闭的 channel → panic
// 向已关闭的 channel 发送 → panic
// 从已关闭的 channel 接收 → 返回零值,ok=false
ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)
// 安全接收
for v := range ch {
fmt.Println(v) // 1, 2
}
// 检查是否关闭
v, ok := <-ch
fmt.Println(v, ok) // 0 false
// 多个发送者时,用 sync.Once 安全关闭
type SafeChan struct {
ch chan int
once sync.Once
}
func (s *SafeChan) Close() {
s.once.Do(func() { close(s.ch) })
}性能考量
go
// 有缓冲 vs 无缓冲性能对比
// 无缓冲:每次发送都需要等待接收者,有上下文切换开销
// 有缓冲:批量处理,减少同步次数
// 高吞吐场景:使用有缓冲 channel + 批量处理
func batchProcess(input <-chan []byte) {
batch := make([][]byte, 0, 100)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
flush := func() {
if len(batch) > 0 {
processBatch(batch)
batch = batch[:0]
}
}
for {
select {
case item, ok := <-input:
if !ok {
flush()
return
}
batch = append(batch, item)
if len(batch) >= 100 {
flush()
}
case <-ticker.C:
flush()
}
}
}Channel vs Mutex
- 传递数据所有权 → Channel
- 保护共享状态 → Mutex
- 等待事件 → Channel
- 性能敏感的计数器 → atomic
不要为了"Go 风格"强行用 Channel,Mutex 在很多场景更简单高效。