我目前正在尝试使用Go进行一些实验。这是我正在尝试做的事情:
我有一个REST API服务正在运行,我想在尽可能多的Goroutine中反复查询特定的URL,以查看这些响应的性能如何(通过查看我的REST API服务器日志)。在退出程序之前,我想发送总计100万个HTTP请求-在计算机允许的范围内同时执行。
我知道有一些工具可以做到这一点,但是我主要对如何使用goroutines在Go中最大化HTTP并发性感兴趣。
这是我的代码:
package main import ( "fmt" "net/http" "runtime" "time" ) func main() { runtime.GOMAXPROCS(runtime.NumCPU()) transport := &http.Transport{} for i := 0; i < 1000000; i++ { go func() { req, _ := http.NewRequest("GET", "http://myapi.com", nil) req.Header.Set("User-Agent", "custom-agent") req.SetBasicAuth("xxx", "xxx") resp, err := transport.RoundTrip(req) if err != nil { panic("HTTP request failed.") } defer resp.Body.Close() if resp.StatusCode != 302 { panic("Unexpected response returned.") } location := resp.Header.Get("Location") if location == "" { panic("No location header returned.") } fmt.Println("Location Header Value:", location) }() } time.Sleep(60 * time.Second) }
我期望这段代码能做的是:
GOMAXPROCS
但是,发生的是我遇到以下错误(要粘贴的错误太多,因此只包含了一部分输出):
goroutine 16680 [IO wait]: net.runtime_pollWait(0xcb1d878, 0x77, 0x0) /usr/local/Cellar/go/1.2/libexec/src/pkg/runtime/netpoll.goc:116 +0x6a net.(*pollDesc).Wait(0xc212a86ca0, 0x77, 0x55d0c0, 0x24) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:81 +0x34 net.(*pollDesc).WaitWrite(0xc212a86ca0, 0x24, 0x55d0c0) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:90 +0x30 net.(*netFD).connect(0xc212a86c40, 0x0, 0x0, 0xb4c97e8, 0xc212a84500, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:86 +0x166 net.(*netFD).dial(0xc212a86c40, 0xb4c87d8, 0x0, 0xb4c87d8, 0xc212a878d0, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:121 +0x2fd net.socket(0x2402c0, 0x3, 0x2, 0x1, 0x0, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:91 +0x40b net.internetSocket(0x2402c0, 0x3, 0xb4c87d8, 0x0, 0xb4c87d8, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/ipsock_posix.go:136 +0x161 net.dialTCP(0x2402c0, 0x3, 0x0, 0xc212a878d0, 0x0, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/tcpsock_posix.go:155 +0xef net.dialSingle(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:225 +0x3d8 net.func·015(0x0, 0x0, 0x0, 0x2402c0, 0x3, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:158 +0xde net.dial(0x2402c0, 0x3, 0xb4c8748, 0xc212a878d0, 0xafbbcd8, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:40 +0x45 net.(*Dialer).Dial(0xafbbd78, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:165 +0x3e0 net.Dial(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:138 +0x75 net/http.(*Transport).dial(0xc210057280, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:401 +0xd4 net/http.(*Transport).dialConn(0xc210057280, 0xc2112efa80, 0x0, 0x0, 0x0) /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:444 +0x6e net/http.func·014() /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:419 +0x3e created by net/http.(*Transport).getConn /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:421 +0x11a
我正在具有16GB RAM和2.6GHz Intel Core i5处理器的Mac OSX 10.9.2笔记本电脑上运行此脚本。
我该怎么办才能用尽可能多的并发HTTP请求“淹没”我的笔记本电脑?
正如Rob Napier所建议的那样,您几乎可以肯定达到文件描述符限制。
编辑: 改进的并发版本:
该程序创建了maxgoroutine 的工作程序池,该程序将请求从通道中拉出,处理并在响应通道上发送。请求由a排队dispatcher,goroutines 由a排队workerPool,workers每次一次处理一个作业,直到请求通道为空,然后consumer处理响应通道,直到成功响应的数量等于请求的数量。
max
dispatcher
workerPool
worker
consumer
package main import ( "flag" "fmt" "log" "net/http" "runtime" "time" ) var ( reqs int max int ) func init() { flag.IntVar(&reqs, "reqs", 1000000, "Total requests") flag.IntVar(&max, "concurrent", 200, "Maximum concurrent requests") } type Response struct { *http.Response err error } // Dispatcher func dispatcher(reqChan chan *http.Request) { defer close(reqChan) for i := 0; i < reqs; i++ { req, err := http.NewRequest("GET", "http://localhost/", nil) if err != nil { log.Println(err) } reqChan <- req } } // Worker Pool func workerPool(reqChan chan *http.Request, respChan chan Response) { t := &http.Transport{} for i := 0; i < max; i++ { go worker(t, reqChan, respChan) } } // Worker func worker(t *http.Transport, reqChan chan *http.Request, respChan chan Response) { for req := range reqChan { resp, err := t.RoundTrip(req) r := Response{resp, err} respChan <- r } } // Consumer func consumer(respChan chan Response) (int64, int64) { var ( conns int64 size int64 ) for conns < int64(reqs) { select { case r, ok := <-respChan: if ok { if r.err != nil { log.Println(r.err) } else { size += r.ContentLength if err := r.Body.Close(); err != nil { log.Println(r.err) } } conns++ } } } return conns, size } func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU()) reqChan := make(chan *http.Request) respChan := make(chan Response) start := time.Now() go dispatcher(reqChan) go workerPool(reqChan, respChan) conns, size := consumer(respChan) took := time.Since(start) ns := took.Nanoseconds() av := ns / conns average, err := time.ParseDuration(fmt.Sprintf("%d", av) + "ns") if err != nil { log.Println(err) } fmt.Printf("Connections:\t%d\nConcurrent:\t%d\nTotal size:\t%d bytes\nTotal time:\t%s\nAverage time:\t%s\n", conns, max, size, took, average) }
产生:
连接:1000000 并发:200 总大小:15000000字节 总时间:36m39.6778103s 平均时间:2.199677ms
警告:这 非常 迅速打系统资源限制。在我的笔记本电脑上,超过206位并发工作人员导致我的本地测试Web服务器崩溃!
操场
原始答案: 下面的程序使用缓冲区chan bool作为信号量通道,它限制了并发请求的数量。您可以调整此数字以及请求的总数,以便对系统进行压力测试并确定最大值。
chan bool
package main import ( "fmt" "net/http" "runtime" "time" ) type Resp struct { *http.Response err error } func makeResponses(reqs int, rc chan Resp, sem chan bool) { defer close(rc) defer close(sem) for reqs > 0 { select { case sem <- true: req, _ := http.NewRequest("GET", "http://localhost/", nil) transport := &http.Transport{} resp, err := transport.RoundTrip(req) r := Resp{resp, err} rc <- r reqs-- default: <-sem } } } func getResponses(rc chan Resp) int { conns := 0 for { select { case r, ok := <-rc: if ok { conns++ if r.err != nil { fmt.Println(r.err) } else { // Do something with response if err := r.Body.Close(); err != nil { fmt.Println(r.err) } } } else { return conns } } } } func main() { reqs := 100000 maxConcurrent := 1000 runtime.GOMAXPROCS(runtime.NumCPU()) rc := make(chan Resp) sem := make(chan bool, maxConcurrent) start := time.Now() go makeResponses(reqs, rc, sem) conns := getResponses(rc) end := time.Since(start) fmt.Printf("Connections: %d\nTotal time: %s\n", conns, end) }
这将打印如下内容:
连接数:100000 总时间:6m8.2554629s
该测试是在本地Web服务器上完成的,每个请求返回的总响应大小为85B,因此这不是实际的结果。另外,除了关闭它的主体之外,我不对响应进行任何处理。
在最多1000个并发请求中,我的笔记本电脑仅花了6分钟以上的时间就完成了100,000个请求,因此我估计一百万个将花费一个小时。调整maxConcurrent变量将帮助您获得系统的最佳性能。
maxConcurrent