Golang:有趣的 channel 应用

严格意义上说,本文是我另外一片文章《Golang Funny: Play with Channel》的中文版本。不过,毕竟是用中文当母语的,所以就不翻译了,重新按照那个内容写过吧。

channel 是 golang 里相当有趣的一个功能,在我使用 golang 编码的经验里,大部分事件都会是在享受 channel 和 goroutine 配合的乐趣。所以本文主要介绍 channel 的一些有趣的用法。

这里有 Oling Cat 翻译的Go编程语言规范里关于 channel(信道)的描述:

信道提供了一种机制,它在两个并发执行的函数之间进行同步,并通过传递(与该信道元素类型相符的)值来进行通信。

这个个描述又乏味、又枯燥。在我第一次阅读的时候,完全不明白这到底是个什么玩意。事实上,可以认为 channel 是一个管道或者先进先出队列,非常简单且轻量。channel 并不是 Golang 首创的。它同样作为内置功能出现在其他语言中。在大多数情况下,它是一个又大、又笨、又复杂的消息队列系统的一个功能。

下面就来一起找点乐子吧!

最常见的方式:生产者/消费者

生产者产生一些数据将其放入 channel;然后消费者按照顺序,一个一个的从 channel 中取出这些数据进行处理。这是最常见的 channel 的使用方式。当 channel 的缓冲用尽时,生产者必须等待(阻塞)。换句话说,若是 channel 中没有数据,消费者就必须等待了。

这个例子的源代码在这里。最好下载到本地运行。

生产者

<br />
func producer(c chan int64, max int) {<br />
    defer close(c)<br />
    for i:= 0; i &lt; max; i ++ {<br />
        c &lt;- time.Now().Unix()<br />
    }<br />
}<br />

生产者生成“max”个 int64 的数字,并且将其放入 channel “c” 中。需要注意的是,这里用 defer 在函数推出的时候关闭了 channel。

消费者

<br />
func consumer(c chan int64) {<br />
    var v int64<br />
    ok := true<br />
    for ok {<br />
        if v, ok = &lt;-c; ok {<br />
            fmt.Println(v)<br />
        }<br />
    }<br />
}<br />

从 channel 中一个一个的读取 int64 的数字,然后将其打印在屏幕上。当 channel 被关闭后,变量“ok”将被设置为“false”。

自增长 ID 生成器

当生让产者可以顺序的生成整数。它就是一个自增长 ID 生成器。我将这个功能封装成了一个包。并将其代码托管在这里。使用示例可以参考这里的代码。

<br />
type AutoInc struct {<br />
    start, step int<br />
    queue chan int<br />
    running bool<br />
}</p>
<p>func New(start, step int) (ai *AutoInc) {<br />
    ai = &amp;AutoInc{<br />
        start: start,<br />
        step: step,<br />
        running: true,<br />
        queue: make(chan int, 4),<br />
    }<br />
    go ai.process()<br />
    return<br />
}</p>
<p>func (ai *AutoInc) process() {<br />
    defer func() {recover()}()<br />
    for i := ai.start; ai.running ; i=i+ai.step {<br />
        ai.queue &lt;- i<br />
    }<br />
}</p>
<p>func (ai *AutoInc) Id() int {<br />
    return &lt;-ai.queue<br />
}</p>
<p>func (ai *AutoInc) Close() {<br />
    ai.running = false<br />
    close(ai.queue)<br />
}<br />

信号量

信号量也是 channel 的一个有趣的应用。这里有一个来自“高效Go编程”的例子。你应当读过了吧?如果还没有,现在就开始读吧……

我在 Gearman 服务的 API 包 gearman-go 中使用了信号量。在 worker/worker.go 的 232 行,在并行的 Worker.exec 的数量达到 Worker.limit 时,将被阻塞。

<br />
var sem = make(chan int, MaxOutstanding)</p>
<p>func handle(r *Request) {<br />
    sem &lt;- 1    // 等待放行;<br />
    process(r)  // 可能需要一个很长的处理过程;<br />
    &lt;-sem       // 完成,放行另一个过程。<br />
}</p>
<p>func Serve(queue chan *Request) {<br />
    for {<br />
        req := &lt;-queue<br />
        go handle(req)  // 无需等待 handle 完成。<br />
    }<br />
}<br />

随机序列生成器

当然可以修改自增长 ID 生成器。让生产者生成随机数放入 channel。不过这挺无聊的,不是吗?

这里是随机序列生成器的另一个实现。灵感来自语言规范。它会随机的生成 0/1 序列:

<br />
func producer(c chan int64, max int) {<br />
    defer close(c)<br />
    for i:= 0; i &lt; max; i ++ {<br />
        select { // randomized select<br />
            case c &lt;- 0:<br />
            case c &lt;- 1:<br />
        }<br />
    }<br />
}<br />

超时定时器

当一个 channel 被 read/write 阻塞时,它会被永远阻塞下去,直到 channel 被关闭,这时会产生一个 panic。channel 没有内建用于超时的定时器。并且似乎也没有计划向 channel 添加一个这样的功能。但在大多数情况下,我们需要一个超时机制。例如,由于生产者执行的时候发生了错误,所以没有向 channel 放入数据。消费者会被阻塞到 channel 被关闭。每次出错都关闭 channel?这绝对不是一个好主意。

这里有一个解决方案:

<br />
    c := make(chan int64, 5)<br />
    defer close(c)<br />
    timeout := make(chan bool)<br />
    defer close(timeout)<br />
    go func() {<br />
        time.Sleep(time.Second) // 等一秒<br />
        timeout &lt;- true // 向超时队列中放入标志<br />
    }()<br />
    select {<br />
        case &lt;-timeout: // 超时<br />
            fmt.Println(&quot;timeout...&quot;)<br />
        case &lt;-c: // 收到数据<br />
            fmt.Println(&quot;Read a date.&quot;)<br />
    }<br />

你注意到 select 语句了吗?哪个 channel 先有数据,哪个分支先执行。因此……还需要更多的解释吗?

这同样被使用在gearman-go 的客户端 API 实现中,第 238 行。

在本文的英文版本发布后,@mjq 提醒我说可以用 time.After。在项目中,这确实是更好的写法。我得向他道谢!同时我也阅读了 src/pkg/time/sleep.go 第 74 行,time.After 的实现。其内部实现与上面的代码完全一致。

还有更多……

上面提到的各种有趣的应用当然也可以在其他消息队列中实现,不过由于 channel 的简单和轻量,使得 golang 的 channel 来实现这些有趣的功能具有实际意义,并有真实的应用场景。其实,我觉得有趣的 channel 用法远不止这些。如果你发现了其他有趣的玩法,请务必告诉我。谢谢啦!

9 thoughts on “Golang:有趣的 channel 应用”

  1. 如果用多个 goroutine (CPU数调大于一)给一个 chan 发信,可能同时吗?(在 select 部分提到多个 chan 可能同时)会发生什么事?

  2. 要是有点 “图” 就完美了.
    虽然貌似会在coding中用chan,但还是缺乏一个”直观”的理解.
    几张chan图配合code或许理解更easy更彻底.

  3. 不会在同时有两个值进入 chan,一定会有先后顺序。select 多个 chan 同时有数据来的时候,实际上是会随机触发某个分支的执行。这也是本文中随机序列的保证随机的原理所在。

  4. 认真读了一遍,绝对是好文!对select随机选择执行的运用真是点睛之笔!不过貌似躺着中枪了= =?

    A channel provides a mechanism for two concurrently executing functions to synchronize execution and communicate by passing a value of a specified element type.

    原文很好理解,但译成中文就….给出另一个不太精确但依然足够严谨的版本:
    信道提供了一种机制,它在两个并发执行的函数之间进行同步,并通过传递(与该信道元素类型相符的)值来进行通信。

    这点在《The Go Memory Model》中给出了更精确严谨的形式化定义,有兴趣的话可以读一下。

    另附一个小typo:“channle (channel) 并不是 Golang 首创的”
    以及一个语句的小修改:会被阻塞,直到 channel 被关闭。

  5. defer func() {recover()}()
    recover() 是什么?
    defer func() {xxx}的意思是process()执行结束执行么?没见过这个用法

  6. recover 属于 golang 的错误处理机制的一部分,详细内容可以看:http://blog.golang.org/2010/08/defer-panic-and-recover.html,记得 golang-china 上有人翻译过。

Leave a Reply

Your email address will not be published. Required fields are marked *