[翻译]绝妙的 channel

在编写 golang 程序的过程中,channel 会经常使用。本文对 channel 的使用的确很特别,同时也非常实用。

原文在此:http://dave.cheney.net/2013/04/30/curious-channels

————翻译分隔线————

绝妙的 channel

Go 编程语言中,channel 是一个闪耀的特性。它提供了一种强大的、在不使用锁或临界区的情况下,从某个 goroutine 向其他 goroutine 发送数据流的方法。

今天我想讨论关于 channel 的两个重要的特性,这些特性不但使其在控制数据流方面极为有用,而且用在流程控制方面也十分有效。

一个已经被关闭的 channel 永远都不会阻塞

第一个特性,我想谈一谈已经被关闭的 channel。当一个 channel 一旦被关闭,就不能再向这个 channel 发送数据,不过你仍然可以尝试从 channel 中获取值。

package main

import "fmt"

func main() {
        ch := make(chan bool, 2)
        ch <- true
        ch <- true
        close(ch)

        for i := 0; i < cap(ch) +1 ; i++ {
                v, ok := <- ch
                fmt.Println(v, ok)
        }
}

在这个例子里,我们创建了一个缓冲区为两个值的 channel,填充缓冲区并且关闭掉它。

true true
true true
false false

执行这个程序,首先会向我们展示那两个发送到 channel 的值,然后第三次在 channel 上的尝试会返回 flase 和 false。第一个 false 是 channel 类型的零值,channel 的类型是 chan bool,那么就是 false。第二个表示 channel 的启用状态,当前是 false,表示 channel 被关闭。channel 会一直返回这些值。作为尝试,可以修改这个例子使其从 channel 里取 100 次值看看。

能够检测 channel 是否关闭是一个很有用的特性,可用于对 channel 进行 range 操作,并且当 channel 清空后退出循环。

package main

import "fmt"

func main() {
        ch := make(chan bool, 2)
        ch <- true
        ch <- true
        close(ch)

        for v := range ch {
                fmt.Println(v) // 被调用两次
        }
}

但是其真正的价值是与 select 联合时体现的。先从这个例子开始

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        finish := make(chan bool)
        var done sync.WaitGroup
        done.Add(1)
        go func() {
                select {
                case <-time.After(1 * time.Hour):
                case <-finish:
                }
                done.Done()
        }()
        t0 := time.Now()
        finish <- true // 发送关闭信号
        done.Wait()    // 等待 goroutine 结束
        fmt.Printf("Waited %v for goroutine to stop\n", time.Since(t0))
}

在我的系统上,这个程序的运行用了很短的等待延迟,因此很明显 goroutine 不会等待整整一个小时,然后调用 done.Done()

Waited 129.607us for goroutine to stop

但是这个程序里存在一些问题。首先是 finish channel 是不带缓冲的,因此如果接收方忘记在其 select 语句中添加 finish,向其发送数据可能会导致阻塞。可以通过对要发送到的 select 块进行封装,以确保不会阻塞,或者设置 finish channel 带有缓冲来解决这个问题。然而,如果有许多 goroutine 都监听在 finish channel 上,那就需要跟踪这个情况,并记得发送正确数量的数据到 finish channel。如果无法控制 goroutine 的创建会很棘手;同时它们也可能是由程序的另一部分来创建的,例如在响应网络请求的时候。

对于这个问题,一个很好的解决方案是利用已经被关闭的 channel 会实时返回这一机制。使用这个特性改写程序,现在包含了 100 个 goroutine,而无需跟踪 goroutine 生成的数量,或调整 finish channel 的大小。

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        const n = 100
        finish := make(chan bool)
        var done sync.WaitGroup
        for i := 0; i < n; i++ { 
                done.Add(1)
                go func() {
                        select {
                        case <-time.After(1 * time.Hour):
                        case <-finish:
                        }
                        done.Done()
                }()
        }
        t0 := time.Now()
        close(finish)    // 关闭 finish 使其立即返回
        done.Wait()      // 等待所有的 goroutine 结束
        fmt.Printf("Waited %v for %d goroutines to stop\n", time.Since(t0), n)
}

在我的系统上,它返回

Waited 231.385us for 100 goroutines to stop

那么这里发生了什么?当 finish channel 被关闭后,它会立刻返回。那么所有等待接收 time.After channel 或 finish 的 goroutine 的 select 语句就立刻完成了,并且 goroutine 在调用 done.Done() 来减少 WaitGroup 计数器后退出。这个强大的机制在无需知道未知数量的 goroutine 的任何细节而向它们发送信号而成为可能,同时也不用担心死锁。

在进入下一个话题前,再来看一个许多 Go 程序员都喜爱的简单示例。在上面的例子中,从未向 finish channel 发送数据,接受方也将收到的任何数据全部丢弃。因此将程序写成这样就很正常了:

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        finish := make(chan struct{})
        var done sync.WaitGroup
        done.Add(1)
        go func() {
                select {
                case <-time.After(1 * time.Hour):
                case <-finish:
                }
                done.Done()
        }()
        t0 := time.Now()
        close(finish)
        done.Wait()
        fmt.Printf("Waited %v for goroutine to stop\n", time.Since(t0))
}

当 close(finish) 依赖于关闭 channel 的消息机制,而没有数据收发时,将 finish 定义为 type chan struct{} 表示 channel 没有任何数据;只对其关闭的特性感兴趣。

一个 nil channel 永远都是阻塞的

我想谈的第二个特性正好与已经关闭的 channel 的特性正好相反。一个 nil channel;当 channel 的值尚未进行初始化或赋值为 nil 是,永远都是阻塞的。例如

package main

func main() {
        var ch chan bool
        ch <- true // 永远阻塞
}

当 ch 为 nil 时将会死锁,并且永远都不会发送数据。对于接收是一样的

package main

func main() {
        var ch chan bool
        <- ch // 永远阻塞
}

这看起来似乎并不怎么重要,但是当使用已经关闭的 channel 机制来等待多个 channel 关闭的时候,这确实是一个很有用的特性。例如

// WaitMany 等待 a 和 b 关闭。
func WaitMany(a, b chan bool) {
        var aclosed, bclosed bool
        for !aclosed || !bclosed {
                select {
                case <-a:
                        aclosed = true
                case <-b:
                        bclosed = true
                }
        }
}

WaitMany() 用于等待 channel a 和 b 关闭是个不错的方法,但是有一个问题。假设 channel a 首先被关闭,然后它会立刻返回。但是由于 bclosed 仍然是 false,程序会进入死循环,而让 channel b 永远不会被判定为关闭。

一个解决这个问题的安全的方法是利用 nil channel 的阻塞特性,并且将程序重写如下

package main

import (
        "fmt"
        "time"
)

func WaitMany(a, b chan bool) {
        for a != nil || b != nil {
                select {
                case <-a:
                        a = nil 
                case <-b:
                        b = nil
                }
        }
}

func main() {
        a, b := make(chan bool), make(chan bool)
        t0 := time.Now()
        go func() {
                close(a)
                close(b)
        }()
        WaitMany(a, b)
        fmt.Printf("waited %v for WaitMany\n", time.Since(t0))
}

在重写的 WaitMany() 中,一旦接收到一个值,就将 a 或 b 的引用设置为 nil。当 nil channel 是 select 语句的一部分时,它实际上会被忽略,因此,将 a 设置为 nil 便会将其从 select 中移除,仅仅留下 b 等待它被关闭,进而退出循环。

在我的系统上运行得到

waited 54.912us for WaitMany

总结来说,正是关闭和 nil channlechannel 这些特性非常简单,使得它们成为创建高并发的程序的强有力的构件。

Join the Conversation

7 Comments

  1. 代码有些问题吧

    我看原文没有错误,比如第一个代码

  2. nilchannel阻塞,即使chan初始化了,一个无缓冲信道在单goroutine里进行读写操作一定会阻塞当前过程

  3. 多谢了,一直不太清楚select的具体意义,现在才知道是阻塞的

Leave a comment

Your email address will not be published. Required fields are marked *