[翻译]飞翔的 gob

这个题目的原文叫做《Gobs on the wire》,作者巧妙的用了“gob”这个词。gob本来是Golang的一个关于网络通信协议的包。而在这里,我感觉标题也可以翻译为《关于线上的那一大陀……》。好吧,我得承认,这么翻译实在不雅。

————翻译分割线————

飞翔的 gob

这周,我想跟大家谈谈如何用 Go 编写基于同步请求和异步事件通知的 Client/Server 系统。

为了帮助学习 Go,我克隆了一个Conserver 命令行服务。当然,现在对于世界来说没必要有另外一个命令行服务。然而由于Go语言带给开发者的特性,非常适合用来做命令行服务,所以这将会是一次非常有趣的体验。命令行服务的任务是从一个或者多个串口上汇总输出(或者说各种系统上的一个或者多个实现了使用TCP连接的Rtelnet协议的终端服务)。 它将输出记录到文件,同时让输出可以被别人实时的看到。它仅允许唯一一个用户进行读写,用于实际控制设备。

Go版本的命令行服务被称作gocons。先去看看,然后回到这里。跟以往一样,我先说说在编写这个的时候,我学到了Go的哪些东西。

最初,我尝试用netchan来构建这个。我想,如果客户端在某个通道(译注:channel,下同)上向服务器请求,当在某个命令行上有事件发生时接收通知这会是一件很酷的事情。但是,由于netchan不能处理多个通道,它彻底的崩溃了。 我认为,理论上是可能通过某种方法让netchan处理多个通道,但是这看起来确实非常难, 而且多个通道可能永远不同步。所以,我转向了正确的方向……

接下来是想使用rpc包实现客户端和服务器之间的协议。但是,这带来了另外一个问题:RPC,它的定义是同步的。它没有明确定义服务端如何使用RPC包下发异步事件到客户端,如“这些字节已经到达”。我需要自己编写协议,通过两个步骤,一个同步调用,例如“我可以收听这个吗?是的。”以及异步事件“你监视的命令行刚刚输出了这些字节”。 我之前已经编写了这些协议,这并不难,只是有一些繁琐。而Go提供的gob包就是你开发一个协议所需要的全部。

而你需要做的就是描绘出协议消息,并且让gob负责处理打包和解包,就是计算消息的分隔。在我们的例子中,这个类型被称为connReq和connReply。在理想世界里,这些应当是某个库的公共类型,客户端和服务器端同时使用它们。在gocons中,我发懒了,只是复制并粘贴了它们。客户端在net.TCPConn上有gob.Decode,而结果就是connReq(如果不是的话,那说明有什么不对劲,客户端可以干掉连接或解码这个连接上的后续内容)。由于Go没有union(非类型安全的),connReq和connReply即便是给定的协议消息不使用的情况下,也应当包含所有的字段。我并没有仔细思考这个协议, 但由于未使用字段可能是字节段或字符串,不可能有很多;而且空的字节段将被编码为nil,而不是用零填充的整个字节缓冲区。

对此更加精致的设想是构造一个层次化的类型,最简单的类型(仅有一个int指名类型)作为基础,后面跟一个复杂的类型。但是要弄清楚给gob.Decode的是什么类型是非常难的;这就像你必须将协议拆分为两部分,并且分别对其调用go.Encode。第一个可以告诉这是什么类型,而第二个gob可以是包含数据的短语。无论如何,我不会在gocons里这么做。简单就好!

在服务器中,有两个代码片段很有趣。一个是使用JSON作为配置文件的格式。另一个是如何将新的数据发送到所有收听者。

第一个比较简单。仅仅是演示在不使用json.Unmarshall的情况下,如何从JSON文件中获取数据。我搞不清楚json.Unmarshall,所以在我尝试不用它的情况下,让json.Decode工作。我并不是说这么做好,但是它能运行,这可能可以帮助其他在Go中读取JSON的正在寻找例子的人。

希望的输入是这样的:

{
    "consoles": {
        "firewall": "ts.company.com:2070",
        "web01": "ts.company.com:2071",
        "web02": "ts.company.com:2072",
        "web03": "ts.company.com:2073"
    }
}

目标是在consoles中对应的每个键值调用addConsole。

是这样做的,如果你不希望这样(或者知道如何做的话)使用json.Unmarshal吧:

  r, err := os.Open(*config, os.O_RDONLY, 0)
  if err != nil {
    log.Exitf("Cannot read config file %v: %v", *config, err)
  }
  dec := json.NewDecoder(r)
  var conf interface{}
  err = dec.Decode(&conf)
  if err != nil {
    log.Exit("JSON decode: ", err)
  }
  hash, ok := conf.(map[string]interface{})
  if !ok {
    log.Exit("JSON format error: got %T", conf)
  }
  consoles, ok := hash["consoles"]
  if !ok {
    log.Exit("JSON format error: key consoles not found")
  }
  c2, ok := consoles.(map[string]interface{})
  if !ok {
    log.Exitf("JSON format error: consoles key wrong type, %T", consoles)
  }
  for k, v := range c2 {
    s, ok := v.(string)
    if ok {
      addConsole(k, s)
    } else {
      log.Exit("Dial string for console %v is not a string.", k)
    }
  }

这里的模式大致如此,json.Decode提供了一个interface{},然后根据结构使用类型选择器,然后获得你期望的在那里获得的内容。

更加简单的办法是使用json.Unmarshal。从文档中很难理解如何使用,幸好这篇文章让它看起来更加清晰。

服务器是在一个循环里处理i/o的一系列的goroutine组成。每个它监视的命令行拥有一个读goroutine和一个写goroutine。读的从其中获取字节,然后向所有收听的gocons客户端分发。它管理了一个包含客户端列表的链表,不过另外一种数据结构可能会工作得更好。不论是在net.TCPConn还是通道,客户端都是没有排序的。等待新数据的通道就像是客户端的代理goroutine。当每个客户端连接时,会创建一对goroutine,一个用于读,一个用于写。这允许我们在输入上实现阻塞读(例子可参看dec.Decode),而不用担心阻塞服务器上的其他任务。

利用一个独立的goroutine保证负责向TCP连接写入,这样可以不使用任何锁。作为练习,可以让多个命令行管理器同时说:“我有一些数据需要TCP连接的多路复用!”而无须关心它们向连接写入数据时相互干扰。(当前的实现一次只可以监听一个命令行。)

下面的片段演示了当有新的内容时,如何打包并且向所有命令行观看者发送通知:

    select {
      // a bunch of other channels to monitor here...
      case data := <-m.dataCh:
        if closed(m.dataCh) {
          break L
        }
        // multicast the data to the listeners
        for l := listeners; l != nil; l = l.next {
          if closed(l.ch) {
            // TODO: need to remove this node from the list, not just mark nil
            l.ch = nil
            log.Print("Marking listener ", l, " no longer active.")
          }
          if l.ch != nil {
            ok := l.ch <- consoleEvent{data}
            if !ok {
              log.Print("Listener ", l, " lost an event.")
            }
          }
        }

这样,我们建立了一个新的consoleEvent,并且将其发送给了每个收听者。这有点浪费:它产生了许多垃圾,这意味着垃圾回收器需要更加努力的工作。可以创建一个consoleEvent,然后向所有的收听者发送这一个。但是,如果这样共享内存,需要开发者决定是让共享内存只读,还是使用互斥控制它的访问。在我们的例子中,使用了只读的方式,像这样:

    // new event arrived from the console manager, so send it down the TCP connection
    case ev := <-evCh:
      reply.code = Data
      reply.data = ev.data
      err := enc.Encode(reply)
      if err != nil {
        log.Print("connection ", c.rw, ", failed to send data: ", err)
        break L
      }

在这个模式下,两个goroutine只负责读取和写入,这就像魔法。这梦幻般的降低了实现gocons需要的代码量。原先的命令行服务需要数百行复杂的代码,关于设置select掩码,等待select,检测fd是否需要accept()或者read()或者其他(同时要找到让fd可用的正确的数据结构)。在gocons中,以及其他Go程序,如http包实现的http服务,可以使用阻塞的读,让Go安排运行使得整个系统并未阻塞。

然而,如果考虑当向客户端TCP连接写入发生阻塞时的情况,会很有趣。当系统在写时发生阻塞,它最终会放弃,并且阻止从命令行上读取,阻塞其他所有客户端。为了对应这种情况,你需要在适当的地方建立防火墙:共享的资源不应当让个体阻塞它们。你需要在客户端代理goroutine和命令行管理者读取goroutine之间的通道设置一个队列,让其在通道上实现非阻塞写,并且当一个阻塞了,就处理它。例如,可以关闭通道,然后说“嘿,你的排水不畅,应该清理一下,然后再来找我。”

用Go编写并且调试这个服务器,让我学到了许多东西。而我仍然还有许多东西需要学习:代码中仍然有一些神秘的东西,例如为什么我需要runtime.Gosched()保证不会阻塞,以及如何处理关闭的通道在select中带来的麻烦。还有更多的隐藏在setOwner中的神秘工作,首要的是:如何发现将从一个地方转发到另一个地方的“pump goroutine”中的bug(在Go运行时环境,或者我能理解的情况下)

Join the Conversation

1 Comment

Leave a comment

Your email address will not be published. Required fields are marked *