[翻译] channel 独木难支

原文在此。遗憾的是文章只提出了问题,并没明确提供如何解决这些问题。但无论如何,对于这种可以引起反思的文章,是不能放过的。另外,我得承认,似乎高层次的分布式系统的抽象,用函数式语言的范式来表述更容易一些(实现上其实未必)。

————翻译分隔线————

channel 独木难支
或者说为什么流水线作业没那么容易

勇敢和聪明的 Golang 并发模型

@kachayev 撰写

概述

Go 被设计用于更容易的构建并发系统,因此它有运行独立的计算的 goroutine 和用于它们之间通讯的 channel。我们之前都听过这个故事。所有的例子和指南看起来都挺好的:我们可以创建一个新的 channel,可以向这个 channel 发送数据,可以从 channel 读取,甚至还有漂亮和优雅的 select 语句(顺便提一下,为什么 21 世纪了我们还在用语句?),阻塞读和缓存……
A magical unicorn

主旨:99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。

在为初学者撰写指南的时候这确实挺酷的!但是当你尝试实现大型的复杂系统的时候这就很痛苦了。channel 太原始了。它们是低级的构件,我相当怀疑你愿意在日常工作中天天和它们打交道。

看看“高级模式”和“流水作业”。不是那么简单吧?有太多的东西要考虑,并且永远记得:什么时候、如何关闭 channel;如何传递错误;如何释放资源。我抱怨这些是因为我曾尝试实现一些东西,然后失败了。而我每天都在面对这些东西。

你可能会说,对于初学者没必要理解所有细节。不过……描述一个模式真得很“高级”?不幸的是,答案是否定的。它们是基础和常识。

更仔细的了解一下流水作业问题。这真得是流水作业?不,“…对于每个来自目录的路径计算 MD5 校验码,并将结果存入一个 map[string][string]…”。这只是一个 pmap(并行 map)。或具有池化执行器的、有限的并行化的 pmap。而 pmap 不应当需要我输入如此多行代码。想了解真正的流水作业——我将在文章的最后介绍一个(参阅“构建 Twitter 分析器”的段落)。

那么模式如何呢?

为了快速开发真实的应用,我们应当能够提炼出比原始的 channel 层面更高的抽象。它们只是传输层。我们需要应用层的抽象来编写程序(对比 OSI),否则你会发现你总是在低级的 channel 网络的细节上纠结,试图在生产环境中、偶发的、没有任何有效的方法重现的找到它不工作的原因。参阅 Erlang OTP 是如何针对性的解决类似的问题的:将你保护在低级的消息传递代码之外。

低级的代码有什么问题?这里有一篇超棒的文章“爱德华 C++ 手(译注:借‘爱德华剪刀手’)”:

手里有把剪刀并不一定总是那么糟糕。爱德华有许多天赋:例如,他能创造劲爆的狗狗的发型。别误会——它展示了许多劲爆的狗狗的发型(我是说优雅且简单的 C++ 代码),但是主要内容还是关于如何避免剪坏,以及在发生剪坏的情况下进行急救。

Kyiv Go 聚会的时候,我经历了相同的情况:在一页幻灯上那 20 行干净可读的代码。一个不一般的竞态条件和一个可能出现的运行时错误。这对于所有听众来说很明显吗?不。至少一半人不明白。

痛苦的缘由?

好,让我们试着收集一些类似的模式。从工作的经验中、从书中、从其他语言中(是,伙计们,我知道这有点令人难以相信,不过还有许多其他语言同样也有并发的设计)。

Rob Pike 讨论了 Fan-in、Fan-out。在许多情况下,这很有用,不过还是关于网络的 channel。而不是你的应用。在任何情况下,看看(无耻的从这里偷的)。
Rob Pike talks about Fan-in, Fan-out. It’s useful in many ways, but still about the network of channels. Not about your application. In any case, let’s check (shamelessly stolen from here).

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为 cs 中每个输入的 channel 启动一个输出用的 goroutine。
    // 从 c 中复制值出来直到 c 被关闭,然后又调用 wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 一旦所有输出的 goroutine 完成的,就启动一个 goroutine 来关闭 out。
    // 这必须在 wg.Add 调用后启动。
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

呃…… <-chan int。在我的应用中重用起来没那么抽象(例如,迁移到库中)……并且在每次我需要的时候都重新实现也不是那么清晰。那么如何让其可以重用?<-chan interface{}?欢迎来到类型转换和运行时错误的领地。如果,希望实现一个高级的 fan-in(合并)就必须牺牲类型安全。同样的(不幸的)是其他模式也是一样。

我真正想要的是:

func merge[T](cs ...<-chan T) <-chan T

是,我知道 Go 没有泛型,因为谁需要它们呢?

现在天气如何?

回到模式。让我们分析一个假设的项目,服务器端开发会与实际经验非常接近。我们需要一个服务器接收请求,输入一个美国的州,返回从 OpenWeatherMap 收集到的信息。例如这样:

$ http localhost:4912/weather?q=CA
HTTP/1.1 200 OK
Access-Control-Allow-Credentials: true
Access-Control-Allow-Methods: GET, POST
Access-Control-Allow-Origin: *
Connection: keep-alive
Content-Type: application/json; charset=utf-8
[{
    "clouds": {
        "all": 40
    },
    "id": 5391959,
    "main": {
        "temp": 288.89,
        "temp_max": 291.48,
        "temp_min": 286.15
    },
    "name": "San Francisco",
    "weather": [
        {
            "description": "mist",
            "icon": "50d",
            "id": 701,
            "main": "Mist"
        }
    ]
}, {
    "clouds": {
        "all": 90
    },
    "id": 5368361,
    "main": {
        "temp": 292.83,
        "temp_max": 296.15,
        "temp_min": 289.15
    },
    "name": "Los Angeles",
    "weather": [
        {
            "description": "mist",
            "icon": "50d",
            "id": 701,
            "main": "Mist"
        }
    ]
}]

pmap

让我们从一些我们已经知道的东西开始。那么,我们收到了请求 ?q=CA。我不想对从哪里得到相关城市的列表进行解释。我们可以用这个数据库,在内存中缓存以及其他什么合理的东西。假设我们有一个神奇的 findCities(state) 函数,返回 chan City(像通常 go 程序表现的延迟序列那样)。然后呢?每个城市我们都必须调用 OpenWeatherMap API 并解析结果到一个 map[City]Weather 中。我们已经讨论过这个模式了。这是个 pmap。我希望我的代码像这样:

chanCities := findCities(state)
resolver := func(name City) Weather { return openWeatherMap.AskFor(name) }
weather := chanCities.Par.Map(resolver)

或限制并发数:

chanCities := findCities(state)
pool := NewWorkers(20)
resolver := func(w Worker, name City) Weather { return w.AskFor(name) }
weather := chanCities.Par.BoundedMap(pool, resolver)

我希望所有这些 <-done 同步和神圣的 select 完全被隐藏起来。

Futures & Promises

获取当前天气可能需要很长的时间,例如,你有一个很长的城市列表。当然,你不希望重复的 API 调用,因此应当可以用某种方法管理并行的请求:

func collect(state string) Weather {
  calc, ok := calculations.get(state) // check if it's in progress
  if !ok {
      calc.calculations.run(state) // run otherwise
  }
  return calc.Wait() // wait until done
}

这也被叫做 future/promise 。Wiki 的解释:

它们描述了一个对象对于结果扮演了代理的角色,而这在一开始是不可预知的,通常是由于它的值尚未完成计算造成的。

我已经听过太多人说 go 的 future 很简单:

f := make(chan int, 1)

这是错误的,因为所有的等待者都应当得到结果(译注:实现 channel 的订阅与变化值的广播确实另人头大)。而这个版本也是错的:

[C]
f := make(chan int, 1)
v <- f f <- v // 在这里使用 v [/C] 由于不可能用这个方法管理资源。所以,当某个家伙在他的代码里丢了 f <- v 那部分,我希望你能幸运的发现这个 bug。

将数据直接发给所有的等待者来实现 promise 没那么复杂(我不确定这段代码是不是有 bug):

type PromiseDelivery chan interface{}
type Promise struct {
    sync.RWMutex
    value interface{}
    waiters []PromiseDelivery
}

func (p *Promise) Deliver(value interface{}) {
    p.Lock()
    defer p.Unlock()
    p.value = value
    for _, w := range p.waiters {
        locW := w
        go func(){
            locW <- value
        }()
    }
}

func (p *Promise) Value() interface{} {
    if p.value != nil {
        return p.value
    }

    delivery := make(PromiseDelivery)
    p.waiters = append(p.waiters, delivery)
    return <-delivery
}

func NewPromise() *Promise {
    return &Promise{
        value: nil,
        waiters: []PromiseDelivery{},
    }
}

如何使用他呢?

p := NewPromise()
go func(){
  p.Deliver(42)
}()
p.Value().(int) // 阻塞,当有值的时候返回 interface{} 

不过这里有 interface{} 和类型转换。我实际上想要什么呢?

// 在那些经过良好测试的库,甚至 stdlib 中
type PromiseDelivery[T] chan T
type Promise[T] struct {
    sync.RWMutex
    value T
    waiters []PromiseDelivery[T]
}
func (p *Promise[T]) Deliver(value T)
func (p *Promise[T]) Value() T
func NewPromise[T]() *Promise[T]

// 我的代码:
v := NewPromise[int]()
go func(){
  v.Deliver("woooow!") // 错误
  v.Deliver(42)
}()
v.Value() // 阻塞并返回 42,而不是 interface{}

是的,当然,没有人需要泛型。我在讨论什么鬼玩意啊?

也可以通过使用 select 来避免 p.Lock() 来监听 deliver,并在一个 goroutinewait 操作。还可以引入对最终用户极为有用的 .ValueWithTimeout 方法。还有许多许多其他“你可以……”。尽管我们实际上是在讨论一个 20 行的代码(它的长度可能在每次你发现 future/promise 交互更多细节的时候就开始增长了)。我真得需要知道(或想到) channel 为我传递值吗?不!

pub/sub

假设我们想要构建一个实时服务。那么我们的客户端现在可以开启一个 websocket 连接,传递 q=CA 请求,并即刻获得加利福尼亚的天气变化情况。它看起来应该像:

// deliverer
calculation.WhenDone(func(state string, w Weather) {
  broker.Publish("CA", w)
})

// client
ch := broker.Subscribe("CA")
for update := range ch {
  w.Write(update.Serialize())
}

这是一个典型的 pub/sub(译注:公告/订阅)。你可以从高级 Go 模式的演讲中学习它,甚至可以找到即刻可用的实现。问题是,它们全都基于接口的。

有没有可能实现:

broker := NewBroker[String, Weather]()
// so that
broker.Subs(42) // compilation failure
// and
broker.Subs("CA") // returns (chan Weather) not (chan interface{})

当然!如果你能勇敢的在项目之间复制粘贴代码,并到处进行修改。

map/filter

假设希望给与我们的用户更多的弹性,从而引入了新的查询参数:show,它的值可以是 all|temp|wind|icon

可能你可以从基础开始:

ch := broker.Subscribe("CA")
for update := range ch {
  temps := []Temp
  for _, t := update.Temp {
    temps = append(temps, t)
  }

  w.Write(temps)
}

不过,在写了 10 个这样的方法之后,你会意识到它没那么模块化,并且也很无聊。可能你需要:

ch := broker.Subscribe("CA").Map(func(w Weather) Temp { return w.Temp })
for update := range ch {
  w.Write(update)
}

等等,我有提过 channel 是一个 functor(译注:函子)吗?跟 future/promise 一样。

p := NewPromise().Map(func(w Weather) Temp { return w.Temp })
go func(){
  p.Deliver(Weather{Temp{42}})
}()
p.Value().(Temp) // Temp, not Weather

这意味着我重用了 future 的 channel 的相同代码。你也可以用想 transducers 这样的东西来完成它。我经常在 ClojureScript 的代码中使用的技巧:

(->> (send url) ;; returns chan, put single value to it {:status 200 :result 42} when ready
     (async/filter< #(= 200 (:status %))) ;; check that :status is 200
     (async/map< :result)) ;; expose only 42 to end user
;; note, that it will close all channels (including implicit intermediate one) properly

当我可以简单的进行 x.Map(transformation) 并得到相同类型的值的时候,我真得需要关心 x 是个 channel 还是个 future 吗?在这个例子里,为什么允许我创建 make(chan int) 而不能创建 make(Future int) 呢?

Request/Reply

假设我们的用户喜欢这个服务,并且频繁的使用它。那么就需要引入一些简单的 API 限制:每天、每个 IP 请求的数量。收集这个数量保存在一个 map[string]int 中很简单。Go 的文档说“不要通过共享内存来通讯,用通讯来共享内存”。好吧,听起来是个好主意。

req := make(chan string)
go func() { // wow, look here - it's an actor!
  m := map[string]int{}
  for r := range req {
    if v, ok := m[r]; !ok {
      m[r] = 1
    } else {
      m[r] = v + 1
    }
  } 
}()

go func() {
  req <- "127.0.0.2"
}()

go func() {
  req <- "127.0.0.1"
}()

这很容易。现在可以计算每个 IP 请求的数量了。不但如此……同时也可以要求执行请求需要权限。

type Req struct {
  ip string
  resp chan int
}

func NewRequest(ip string) *Req {
  return &Req{ip, make(chan int)}
}

requests := make(chan *Req)

go func() {
  m := map[string]int{}
  for r := range requests {
    if v, ok := m[r.ip]; !ok {
      m[r.ip] = 1
    } else {
      m[r.ip] = v + 1
    }
    r.resp <- m[r.ip]
  } 
}()

go func() {
  r := NewRequest("127.0.0.2")
  requests <- r
  fmt.Println(<- r.resp)
}()

go func() {
  r := NewRequest("127.0.0.1")
  requests <- r
  fmt.Println(<- r.resp)
}()

我不会再问你要泛型的解决方案(没有写死的 string 和 int)。换而言之,我希望你检查一下这段代码中是不是都正确?真得这么简单吗?

你确定 r.resp <- m[r.ip] 是个好办法?不,肯定不是!我希望有任何人等待那些很慢的客户端。是吗?而如果我有许多很慢的客户端的时候会怎么样呢?可能我需要对此进行一些处理。

requests <- r 这部分简单吗?如果我的 actor(服务器)过载无法响应的时候呢?可能我需要在这里处理超时……

时不时的我就需要特定的初始化和清理过程……都需要超时机制。并且需要能保持请求,直到初始化完成。

那么调用的优先权呢?例如,当我需要为了分析系统实现 Dump 方法,但是又不想让所有的用户暂停来收集需要的数据。

还有……看看 Erlang 中的 gen_server。为了保险起见,我希望它一被实现就可以是具有良好的文档的,经过高度覆盖测试的库。98% 的情况下,我不希望看到这样的介绍:make(chan int, ?) 而我不想思考到底我应该将 ? 替换成多少。

99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。

数不胜数

还有许多其他常见的并发的情况。我想你已经明白了。

苦难

你可以说,这些模式都不常见。不过……我在我的项目中不得不实现它们中的大多数。每!一!次!可能我不怎么走运,而你的项目会跟写给初学者的指南一样简单。

我知道,你们中的大多数会说“世界是艰辛的,编程是苦难的”。我会继续打击你:至少有一些语言展示了部分解决这些问题的示例。至少,在尝试解决它。Haskell 和 Scala 的类型系统提供了构建强大的高级抽象的能力,甚至自定义控制流处理并发。而另一阵营的 Clojure 利用动态类型鼓励和共享高级的抽象。Rust 有 channel泛型

让它工作 -> 让它优雅 -> 让它可重用。

现在,第一步已经完成。接下来呢?不要误会,go 是一个有远见的语言:channel 和 goroutine 比起例如 pthread 来说更好,不过是不是真得就停留在此?

补充:构建 Twitter 分析器

关于真实的流水作业。

你可能已经看过 Twitter 的分析了,它真得很棒。假设它尚未出现,而我们需要自己的分析工具:提供一个用户名,来统计有多少用户看过(至少是理论上)他的 tweet。应该如何做呢?其实不难:读取用户的时间轴,过滤掉所有的 retweet 和回复,然后请求其他 tweet 的 retweeter,为每个 retweeter 请求 follower 的列表,合并所有 retweeter 的 follower 在一起,然后加上这个用户的 follower。对于这个步骤我想要的结果是:
map[TweetId][]Username(retweeter)和 map[Username][]Username。这些用于构造一个向请求者展示的魔幻的表格是足够了。

有一些技术细节你应当留意:

      Twitter API 需要每个调用都使用 OAuth,并且设定了很强的限制(每个用户每 15 分钟 450 次调用)。为了对付这个限制,我们将用预定义的一个 OAuth token 列表(例如 50 个)组织在一个池中供 worker 使用,每个 worker 在达到限制之前都可以让自己休息一会。
      大多数 Twitter API 调用通过 since_id 或 max_id 使用了结果分页。因此你不能依赖一个请求就可以获取完整的结果。

一个粗糙的实现的例子。注意,你没必要理解这个文件中所有的内容。相反,如果你无法理解的话,这恰恰说明我们做对了。

那么我们现在有什么?

  • 一些步骤的计算:TimelineReader -> RetweetersReader -> FollowersReader -> FinalReducer。
  • 自供消息。由于分页所有阶段都是递归的。这意味着每个步骤都会向下一个阶段和其本身发出消息。在这个情况下,很难处理取消的情况。甚至无法发现某个步骤的工作全部完成。
  • 尽早传播。至少有两种情况:首先为了通过 TweetId 来收集 []Username,我们需要将收集到的信息直接从 RetweetersReader 发送到 FinalReducer。然后,一开始我们就知道,需要获得初始用户的 follower,因此他的用户名应当从 TimelineReader 传递到
    RetweetersReader 步骤。
  • 中间收缩。FollowersReader 不只是一个管道。它会过滤我们已经见过的用户名(因为总不想重复工作吧)。
  • 持续工作的 worker。在许多情况下,你无法等待 worker 退出。例如,当你实现了一个服务,它会同时响应许多客户端的时候。

Join the Conversation

2 Comments

  1. 我选择移植reactivecocoa到go,这是个promise方案。不过go没有objc里面用协变处理回调函数参数类型的功能(曲线解决泛型问题)

  2. 看完之后感觉好忧伤啊,因为我在实际项目中也遇到了类似的问题,每遇到一次就纠结一次,总想给出一个更加高级的抽象实现,但是每次都要这么来一遍。

Leave a comment

Your email address will not be published. Required fields are marked *