Go 语言中漂亮的并发特性

Go 语言   2013-05-19 19:58:13 发布
您的评价:
     
3.3
收藏     1收藏
文件夹
标签
(多个标签用逗号分隔)
时不时地学习一门新的编程语言对你来说是大有裨益的,哪怕这门语言并不那么成功甚至有些过时。用新的编程语言来解决老的编程问题会迫使你对自己的某些观点、方法甚至是习惯进行重新思考。

我喜欢尝试新的东西,尤其是编程语言。但是如果只编写了“Hellow World"或者实现了Fibonacci数列,你通常会对这门语言毫无感觉、甚至觉得索然寡味。你应该去试着实现Eratosthenes筛法,尝试一些数据结构或者感受一下它的性能表现。但是我想要的是更加现实的、甚至能够为以后复用的代码。所以不久前我为自己出了个题目,来帮助我只通过几百行代码就可以体会到新的编程语言的感觉。

这个项目涉及一个语言中的几个基本元素:字符串,文件,网络I/O,当然还有并发性。这个项目叫做TCP/IP代理(或者你可以叫做网络调试器)。它的理念是,你拥有一个TCP/IP监听器(单线程或多线程)监听一个指定的端口并接受外来的连接。当他接受到一个接入请求,它就会建立一个连接,并且在远程客户端与服务器之间做双向数据传输。另外,这个代理可以记录日志,以不同的格式来记录,以便日后做分析。

当我需要这个工具的时候,我不再需要到处去找。每次涉及网络编程的时候,这样一个工具是必须的。我已经使用过不同的语言来实现这个工具,包括C, C++, Perl, PHP。最近的两个实现是使用python 和 Erlang。它代表着我正在寻找的答案。

我们可以再具体说说我们的需求。这个应用必须支持同时建立多个连接。对于每一个连接,它需要通过3种途径记录数据:一个以十六进制顺序记录来自于双向传输的数据日志,两个用于分开记录进和出的数据流的二进制日志。

我们在这篇文章中将实现这个程序,我们使用的语言是Go。Go的作者声称,Go骨子里就渗透着并发和多线程特性。我想把它带到我们的世界。

如果我通过C++来实现,我可能就需要main监听线程,和每个连接的线程。所以,单独一个连接就可以通过一个线程,而得到完整的服务(I/O和日志记录)。

以下是我在Go实现中用于服务每个连接的线程:

  • 一个双向十六进制日志记录线程
  • 两个以二进制记录进和出的数据流的线程
  • 两个用于在服务器和远程主机间传输数据的线程

总共5个线程。

5个线程在为一个独立的连接服务。我实现了这些线程,不是为了多线程本身,而是因为Go鼓励多线程,而C++不鼓励(及时最新的C++x11标准也类似)。多线程在Go中是如此自然而简单。我在Go语言中实现TCP/IP代理,没有使用锁和条件变量。同步由Go的channel方式进行优雅的管理。

好吧,这里有源代码,包含解释。如果你不熟悉Go,注释会有帮助。我的本意不仅关注于这个程序的功能,也关注Go语言本身。

现在开始

从 2-11 行我们声明了一些要用到的包。值得注意的是,如果引入的包没有用到,Go 视之为一个错误并且强制删除没用的声明(在C++项目中,你最后完成了,记得什么时候清理过 STL 的头文件吗?)

package main

import (
    "encoding/hex"
    "flag"
    "fmt"
    "net"
    "os"
    "runtime"
    "strings"
    "time"
)
从 12-16行我们声明了一些代表命令行标志的全局变量。后面,我们会看到如何解析他们。
var (
    host *string = flag.String("host", "","target host or address")
    port *string = flag.String("port", "0", "target port")
    listen_port *string = flag.String("listen_port", "0","listen port")
)

从 17-20 行我们看到Go语言中可变参数函数的语法结构。

 
func die(format string, v ...interface{}) {
     os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))
     os.Exit(1)
}

从 21-28 行有两个函数分别启动十六进制数据日志和二进制流日志。他们唯一的区别是日志名称不同。

 
func connection_logger(data chan []byte, conn_n int,
 	local_info, remote_info string) {
    log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log",format_time(time.Now()), conn_n, local_info, remote_info)
    logger_loop(data, log_name)
}
func binary_logger(data chan []byte, conn_n int, peer string) {
    log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log",
 	format_time(time.Now()), conn_n, peer)
    logger_loop(data, log_name)
}

在第 29-43 行,你能看到 Go 真正的乐趣。函数 logger_loop 创建一个日志文件,然无限循环的写入(35-42行)。在第36行,代码等待来自通道 data 的消息。在第34行有一个很很有趣的技巧。操作符 defer 允许我们定义一段在函数功能结束时执行的代码(类似Java中的 finally)。如果接受到空数据,函数将退出。

func logger_loop(data chan []byte, log_name string) {
    f, err := os.Create(log_name)
    if err != nil {
        die("Unable to create file %s, %v\n", log_name, err)
    }
    defer f.Close()
    for {
        b := <-data
        if len(b) == 0 {
             break
        }
        f.Write(b)
        f.Sync()
    }
}
func format_time(t time.Time) string {
    return t.Format("2006.01.02-15.04.05")
}
func printable_addr(a net.Addr) string {
    return strings.Replace(a.String(), ":", "-", -1)
}
type Channel struct {
    from, to net.Conn
    logger, binary_logger chan []byte
    ack chan bool
}

在第55-88行中有一个函数用来从源套接字读取数据并写入到日志中,最后将其发送到目的套接字。对于每个连接,有两个pass_through函数的实例,在相反方向的本地和远程之间复制数据。当一个I / O错误发生时,它被处理成连接的断开。最后,在第79行此函数中目标套接字发送确认信号到主线程,信号终止。

func pass_through(c *Channel){ 
    from_peer := printable_addr(c.from.LocalAddr())
    to_peer := printable_addr(c.to.LocalAddr())
 	
    b := make([]byte, 10240)
    offset := 0
    packet_n := 0
    for {
        n, err := c.from.Read(b)
        if err != nil {
            c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n",from_peer))
            break
        }
        if n > 0 {
            c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X)%d bytes from %s\n",packet_n, offset, n, from_peer))
            c.logger <- []byte(hex.Dump(b[:n]))
            c.binary_logger <- b[:n]
            c.to.Write(b[:n])
            c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n",packet_n, to_peer))
            offset += n
            packet_n += 1
        }
    }
    c.from.Close()
    c.to.Close()
    c.ack <- true
}

在81-107行有一个负责处理实际连接的函数。这个函数连接远程的socket(第82行),对连接计时(第88行,第101-103行),开启日志(第93-95行),最后启动两个传输数据的线程(第97-98行)。只要两个连接都还可用,pass_through就会一直执行下去。在第99-100行我们等待数据传输线程返回的确认信息。在第104-106行我们关闭日志。

func process_connection(local net.Conn, conn_n int, target string) {
    remote, err := net.Dial("tcp", target)
    if err != nil {
        fmt.Printf("Unable to connect to %s, %v\n", target, err)
    }
    
    local_info := printable_addr(remote.LocalAddr())
    remote_info := printable_addr(remote.RemoteAddr())
    
    started := time.Now()
    
    logger := make(chan []byte)
    from_logger := make(chan []byte)
    to_logger := make(chan []byte)
    ack := make(chan bool)
    
    go connection_logger(logger, conn_n, local_info, remote_info)
    go binary_logger(from_logger, conn_n, local_info)
    go binary_logger(to_logger, conn_n, remote_info)
    
    logger <- []byte(fmt.Sprintf("Connected to %s at %s\n",
    target, format_time(started)))
    
    go pass_through(&Channel{remote, local, logger, to_logger, ack})
    go pass_through(&Channel{local, remote, logger, from_logger, ack})
    <-ack // Make sure that the both copiers gracefully finish.
    <-ack //
    
    finished := time.Now()
    duration := finished.Sub(started)
    logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n",
    format_time(started), duration.String()))
    
    logger <- []byte{} // Stop logger
    from_logger <- []byte{} // Stop "from" binary logger
    to_logger <- []byte{} // Stop "to" binary logger
}

在第108-132行是运行TCP / IP侦听的主要入口函数。在第109行,我们要求Go运行时使用所有的物理可用CPU。

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    flag.Parse()
    if flag.NFlag() != 3 {
        fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_post=local_port\n")
        flag.PrintDefaults()
        os.Exit(1)
    }
    target := net.JoinHostPort(*host, *port)
    fmt.Printf("Start listening on port %s and forwarding data to %s\n",*listen_port, target)
    ln, err := net.Listen("tcp", ":"+*listen_port)
    if err != nil {
        fmt.Printf("Unable to start listener, %v\n", err)
        os.Exit(1)
    }
    conn_n := 1
    for {
        if conn, err := ln.Accept(); err ==nil {
            go process_connection(conn, conn_n, target)
            conn_n += 1
        } else {
 	     fmt.Printf("Accept failed, %v\n", err)        }
    }
}

这个程序只有132行。请注意:我们只使用了标准库。
现在,我们已经准备好运行:

go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080
它应该会打印出:
Start listening on port 8080 and forwarding data to pop.yandex.ru:110
然后你就可以在另一个窗口中运行:
telnet localhost 8080
接下来输入,例如用户 test[ENTER]键和密码(空)[ENTER] 。三个日志文件即将被创建(当然在每人 不同 情况下 生成的时间戳不同 )。

双向十六进制日志 log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:

Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17
Received (#0, 00000000) 38 bytes from 192.168.1.41-49544
00000000 2b 4f 4b 20 50 4f 50 20 59 61 21 20 76 31 2e 30
|+OK POP Ya! v1.0|
00000010 2e 30 6e 61 40 32 36 20 48 74 6a 4a 69 74 63 50
|.0na@26 HtjJitcP|
00000020 52 75 51 31 0d 0a
|RuQ1..|
Sent (#0) to [--1]-8080
Received (#0, 00000000) 11 bytes from [--1]-8080
00000000 55 53 45 52 20 74 65 73 74 0d 0a
|USER test..|
Sent (#0) to 192.168.1.41-49544
Received (#1, 00000026) 23 bytes from 192.168.1.41-49544
00000000 2b 4f 4b 20 70 61 73 73 77 6f 72 64 2c 20 70 6c
|+OK password, pl|
00000010 65 61 73 65 2e 0d 0a
|ease...|
Sent (#1) to [--1]-8080
Received (#1, 0000000B) 11 bytes from [--1]-8080
00000000 50 41 53 53 20 6e 6f 6e 65 0d 0a
|PASS none..|
Sent (#1) to 192.168.1.41-49544
Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544
00000000 2d 45 52 52 20 5b 41 55 54 48 5d 20 6c 6f 67 69
|-ERR [AUTH] logi|
00000010 6e 20 66 61 69 6c 75 72 65 20 6f 72 20 50 4f 50
|n failure or POP|
00000020 33 20 64 69 73 61 62 6c 65 64 2c 20 74 72 79 20
|3 disabled, try |
00000030 6c 61 74 65 72 2e 20 73 63 3d 48 74 6a 4a 69 74
|later. sc=HtjJit|
00000040 63 50 52 75 51 31 0d 0a
|cPRuQ1..|
Sent (#2) to [--1]-8080
Disconnected from 192.168.1.41-49544
Disconnected from [--1]-8080
Finished at 2012.04.20-19.55.17, duration 5.253979s

传出数据二进制日志 log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:

USER test
PASS none

传入数据二进制日志 log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:

+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1
+OK password, please.

-ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ1

看起来有用,现在我们试试下载更大的二进制文件来测试性能,先直接下载,再通过代理。

直接下载(文件大小约72MB):

time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz

 ...

Saving to: `otp_src_R15B01.tar.gz'

 ...

real 1m2.819s

现在,试试通过代理下载:

go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080

下载:

time wget http://localhost:8080/download/otp_src_R15B01.tar.gz

...

Saving to: `otp_src_R15B01.tar.gz.1'

...

real 0m56.209s

比较一下结果:

diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1

两者匹配,程序运行正确。

现在来看看性能,我在我的 Mac Air 上将这个实验重复了几次。惊讶的是,对我来说,通过代理下载居然比直接下载还要快一些。在上面的实验中:1m2819s (直接) VS. 0m.56209s (代理)。我能想到的唯一解释就是 wget是单线程的,它在一个线程内复用输入和输出流。反过来,代理以独立线程处理每一个流,可能因此速度稍稍快一些。此中差异甚微,几乎不能察觉,或许在另一台电脑或另一个网路中,这点差异就会完全消失。主要的观察是,尽管通过代理下载会额外产生庞大的日志开销,下载速度并不会因此减慢。

综上所述,我希望你从简单和清晰的角度来看这个程序。我在上面已经指出,但我想再次强调:我已经开始逐渐在这个应用程序中使用线程(goroutine)。问题的本质只需我在一个正在运行的连接中标记并发任务,然后利用Go的易用性和安全性的并发机制已经很好地实现了它,而且我 最终不用在效率与复杂的并发(和调试的难度)顾此失彼。

有时候同意一个简单的问题只需输入比特和字节, 你唯一关心的是代码的线性效率。但你在并发,多线程处理能力中遇到的逐渐增多的问题将成为关键因素,而对于这种应用,Go的魅力即将闪耀。

我希望作为一个有代表性的例子来炫耀Go的 方便甚至并发美。

扩展阅读

在 Go 语言中,正确的使用并发
Go语言的Http并发
Go 语言简介(下) - 特性
Go语言的漂亮着色打印器:pp
Go并发编程之Go语言概述

为您推荐

Maven搭建SpringMVC+Hibernate项目详解
HTML5拖拽文件到浏览器并实现文件上传下载
MapReduce 模式、算法和用例(MapReduce Patterns, Algorithms, and Use Cases)
Vue.js开发实践:实现精巧的无限加载与分页功能
C++开源日志库log4cplus

更多

Go 语言
Google Go开发