这个题目的原文叫做《Gobs on the wire》,作者巧妙的用了“gob”这个词。gob本来是Golang的一个关于网络通信协议的包。而在这里,我感觉标题也可以翻译为《关于线上的那一大陀……》。好吧,我得承认,这么翻译实在不雅。
————翻译分割线————
飞翔的 gob
这周,我想跟大家谈谈如何用 Go 编写基于同步请求和异步事件通知的 Client/Server 系统。
为了帮助学习 Go,我克隆了一个Conserver 命令行服务。当然,现在对于世界来说没必要有另外一个命令行服务。然而由于Go语言带给开发者的特性,非常适合用来做命令行服务,所以这将会是一次非常有趣的体验。命令行服务的任务是从一个或者多个串口上汇总输出(或者说各种系统上的一个或者多个实现了使用TCP连接的Rtelnet协议的终端服务)。 它将输出记录到文件,同时让输出可以被别人实时的看到。它仅允许唯一一个用户进行读写,用于实际控制设备。
Go版本的命令行服务被称作gocons。先去看看,然后回到这里。跟以往一样,我先说说在编写这个的时候,我学到了Go的哪些东西。
最初,我尝试用netchan来构建这个。我想,如果客户端在某个通道(译注:channel,下同)上向服务器请求,当在某个命令行上有事件发生时接收通知这会是一件很酷的事情。但是,由于netchan不能处理多个通道,它彻底的崩溃了。 我认为,理论上是可能通过某种方法让netchan处理多个通道,但是这看起来确实非常难, 而且多个通道可能永远不同步。所以,我转向了正确的方向……
接下来是想使用rpc包实现客户端和服务器之间的协议。但是,这带来了另外一个问题:RPC,它的定义是同步的。它没有明确定义服务端如何使用RPC包下发异步事件到客户端,如“这些字节已经到达”。我需要自己编写协议,通过两个步骤,一个同步调用,例如“我可以收听这个吗?是的。”以及异步事件“你监视的命令行刚刚输出了这些字节”。 我之前已经编写了这些协议,这并不难,只是有一些繁琐。而Go提供的gob包就是你开发一个协议所需要的全部。
而你需要做的就是描绘出协议消息,并且让gob负责处理打包和解包,就是计算消息的分隔。在我们的例子中,这个类型被称为connReq和connReply。在理想世界里,这些应当是某个库的公共类型,客户端和服务器端同时使用它们。在gocons中,我发懒了,只是复制并粘贴了它们。客户端在net.TCPConn上有gob.Decode,而结果就是connReq(如果不是的话,那说明有什么不对劲,客户端可以干掉连接或解码这个连接上的后续内容)。由于Go没有union(非类型安全的),connReq和connReply即便是给定的协议消息不使用的情况下,也应当包含所有的字段。我并没有仔细思考这个协议, 但由于未使用字段可能是字节段或字符串,不可能有很多;而且空的字节段将被编码为nil,而不是用零填充的整个字节缓冲区。
对此更加精致的设想是构造一个层次化的类型,最简单的类型(仅有一个int指名类型)作为基础,后面跟一个复杂的类型。但是要弄清楚给gob.Decode的是什么类型是非常难的;这就像你必须将协议拆分为两部分,并且分别对其调用go.Encode。第一个可以告诉这是什么类型,而第二个gob可以是包含数据的短语。无论如何,我不会在gocons里这么做。简单就好!
在服务器中,有两个代码片段很有趣。一个是使用JSON作为配置文件的格式。另一个是如何将新的数据发送到所有收听者。
第一个比较简单。仅仅是演示在不使用json.Unmarshall的情况下,如何从JSON文件中获取数据。我搞不清楚json.Unmarshall,所以在我尝试不用它的情况下,让json.Decode工作。我并不是说这么做好,但是它能运行,这可能可以帮助其他在Go中读取JSON的正在寻找例子的人。
希望的输入是这样的:
{ "consoles": { "firewall": "ts.company.com:2070", "web01": "ts.company.com:2071", "web02": "ts.company.com:2072", "web03": "ts.company.com:2073" } }
目标是在consoles中对应的每个键值调用addConsole。
是这样做的,如果你不希望这样(或者知道如何做的话)使用json.Unmarshal吧:
r, err := os.Open(*config, os.O_RDONLY, 0) if err != nil { log.Exitf("Cannot read config file %v: %v", *config, err) } dec := json.NewDecoder(r) var conf interface{} err = dec.Decode(&conf) if err != nil { log.Exit("JSON decode: ", err) } hash, ok := conf.(map[string]interface{}) if !ok { log.Exit("JSON format error: got %T", conf) } consoles, ok := hash["consoles"] if !ok { log.Exit("JSON format error: key consoles not found") } c2, ok := consoles.(map[string]interface{}) if !ok { log.Exitf("JSON format error: consoles key wrong type, %T", consoles) } for k, v := range c2 { s, ok := v.(string) if ok { addConsole(k, s) } else { log.Exit("Dial string for console %v is not a string.", k) } }
这里的模式大致如此,json.Decode提供了一个interface{},然后根据结构使用类型选择器,然后获得你期望的在那里获得的内容。
更加简单的办法是使用json.Unmarshal。从文档中很难理解如何使用,幸好这篇文章让它看起来更加清晰。
服务器是在一个循环里处理i/o的一系列的goroutine组成。每个它监视的命令行拥有一个读goroutine和一个写goroutine。读的从其中获取字节,然后向所有收听的gocons客户端分发。它管理了一个包含客户端列表的链表,不过另外一种数据结构可能会工作得更好。不论是在net.TCPConn还是通道,客户端都是没有排序的。等待新数据的通道就像是客户端的代理goroutine。当每个客户端连接时,会创建一对goroutine,一个用于读,一个用于写。这允许我们在输入上实现阻塞读(例子可参看dec.Decode),而不用担心阻塞服务器上的其他任务。
利用一个独立的goroutine保证负责向TCP连接写入,这样可以不使用任何锁。作为练习,可以让多个命令行管理器同时说:“我有一些数据需要TCP连接的多路复用!”而无须关心它们向连接写入数据时相互干扰。(当前的实现一次只可以监听一个命令行。)
下面的片段演示了当有新的内容时,如何打包并且向所有命令行观看者发送通知:
select { // a bunch of other channels to monitor here... case data := <-m.dataCh: if closed(m.dataCh) { break L } // multicast the data to the listeners for l := listeners; l != nil; l = l.next { if closed(l.ch) { // TODO: need to remove this node from the list, not just mark nil l.ch = nil log.Print("Marking listener ", l, " no longer active.") } if l.ch != nil { ok := l.ch <- consoleEvent{data} if !ok { log.Print("Listener ", l, " lost an event.") } } }
这样,我们建立了一个新的consoleEvent,并且将其发送给了每个收听者。这有点浪费:它产生了许多垃圾,这意味着垃圾回收器需要更加努力的工作。可以创建一个consoleEvent,然后向所有的收听者发送这一个。但是,如果这样共享内存,需要开发者决定是让共享内存只读,还是使用互斥控制它的访问。在我们的例子中,使用了只读的方式,像这样:
// new event arrived from the console manager, so send it down the TCP connection case ev := <-evCh: reply.code = Data reply.data = ev.data err := enc.Encode(reply) if err != nil { log.Print("connection ", c.rw, ", failed to send data: ", err) break L }
在这个模式下,两个goroutine只负责读取和写入,这就像魔法。这梦幻般的降低了实现gocons需要的代码量。原先的命令行服务需要数百行复杂的代码,关于设置select掩码,等待select,检测fd是否需要accept()或者read()或者其他(同时要找到让fd可用的正确的数据结构)。在gocons中,以及其他Go程序,如http包实现的http服务,可以使用阻塞的读,让Go安排运行使得整个系统并未阻塞。
然而,如果考虑当向客户端TCP连接写入发生阻塞时的情况,会很有趣。当系统在写时发生阻塞,它最终会放弃,并且阻止从命令行上读取,阻塞其他所有客户端。为了对应这种情况,你需要在适当的地方建立防火墙:共享的资源不应当让个体阻塞它们。你需要在客户端代理goroutine和命令行管理者读取goroutine之间的通道设置一个队列,让其在通道上实现非阻塞写,并且当一个阻塞了,就处理它。例如,可以关闭通道,然后说“嘿,你的排水不畅,应该清理一下,然后再来找我。”
用Go编写并且调试这个服务器,让我学到了许多东西。而我仍然还有许多东西需要学习:代码中仍然有一些神秘的东西,例如为什么我需要runtime.Gosched()保证不会阻塞,以及如何处理关闭的通道在select中带来的麻烦。还有更多的隐藏在setOwner中的神秘工作,首要的是:如何发现将从一个地方转发到另一个地方的“pump goroutine”中的bug(在Go运行时环境,或者我能理解的情况下)
Leave a Reply