How Can I Effectively 'Max Out' Concurrent HTTP Requests?

Question

I'm currently trying a bit of an experiment with Go. Here's what I'm attempting to do:

I've got a REST API service running, and I'd like to query a specific URL over and over again in as many Goroutines as possible, to see how performant these responses are (by viewing my REST API server logs). I'd like to send off a total of 1 million HTTP requests before quitting the program -- executing as many concurrently as my computer will allow.

I'm aware that there are tools which are meant to do this, but I'm primarily interested in how to maximize my HTTP concurrency in Go with goroutines.

Here's my code:

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    transport := &http.Transport{}

    for i := 0; i < 1000000; i++ {
        go func() {
            req, _ := http.NewRequest("GET", "http://myapi.com", nil)
            req.Header.Set("User-Agent", "custom-agent")
            req.SetBasicAuth("xxx", "xxx")
            resp, err := transport.RoundTrip(req)
            if err != nil {
                panic("HTTP request failed.")
            }
            defer resp.Body.Close()

            if resp.StatusCode != 302 {
                panic("Unexpected response returned.")
            }

            location := resp.Header.Get("Location")
            if location == "" {
                panic("No location header returned.")
            }
            fmt.Println("Location Header Value:", location)
        }()
    }

    time.Sleep(60 * time.Second)
}

What I'm expecting this code to do is:

  • Start 1,000,000 goroutines, each one making HTTP requests to my API service.
  • Run the goroutines concurrently across all of my CPUs (since I used the runtime package to increase the GOMAXPROCS setting).

What happens, however, is that I get the following errors (too many to paste, so I'm only including a bit of the output):

goroutine 16680 [IO wait]:
net.runtime_pollWait(0xcb1d878, 0x77, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/runtime/netpoll.goc:116 +0x6a
net.(*pollDesc).Wait(0xc212a86ca0, 0x77, 0x55d0c0, 0x24)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:81 +0x34
net.(*pollDesc).WaitWrite(0xc212a86ca0, 0x24, 0x55d0c0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:90 +0x30
net.(*netFD).connect(0xc212a86c40, 0x0, 0x0, 0xb4c97e8, 0xc212a84500, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:86 +0x166
net.(*netFD).dial(0xc212a86c40, 0xb4c87d8, 0x0, 0xb4c87d8, 0xc212a878d0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:121 +0x2fd
net.socket(0x2402c0, 0x3, 0x2, 0x1, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:91 +0x40b
net.internetSocket(0x2402c0, 0x3, 0xb4c87d8, 0x0, 0xb4c87d8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/ipsock_posix.go:136 +0x161
net.dialTCP(0x2402c0, 0x3, 0x0, 0xc212a878d0, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/tcpsock_posix.go:155 +0xef
net.dialSingle(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:225 +0x3d8
net.func·015(0x0, 0x0, 0x0, 0x2402c0, 0x3, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:158 +0xde
net.dial(0x2402c0, 0x3, 0xb4c8748, 0xc212a878d0, 0xafbbcd8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:40 +0x45
net.(*Dialer).Dial(0xafbbd78, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:165 +0x3e0
net.Dial(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:138 +0x75
net/http.(*Transport).dial(0xc210057280, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:401 +0xd4
net/http.(*Transport).dialConn(0xc210057280, 0xc2112efa80, 0x0, 0x0, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:444 +0x6e
net/http.func·014()
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:419 +0x3e
created by net/http.(*Transport).getConn
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:421 +0x11a

I'm running this script on a Mac OSX 10.9.2 laptop with 16GB of RAM and a 2.6GHz Intel Core i5 processor.

What can I do to 'flood' my laptop with as many concurrent HTTP requests as possible?

Answer 1

Score: 18

As Rob Napier suggested, you're almost certainly hitting file descriptor limits.

EDIT: Improved, concurrent version:

This program creates a worker pool of max goroutines, which pull requests off a channel, process them, and send the results on a response channel. The requests are queued by a dispatcher, the goroutines are started by a workerPool, the workers each process one job at a time until the request channel is closed and empty, and the consumer reads the response channel until it has received one result (a response or an error) per request.

package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"time"
)

var (
	reqs int
	max  int
)

func init() {
	flag.IntVar(&reqs, "reqs", 1000000, "Total requests")
	flag.IntVar(&max, "concurrent", 200, "Maximum concurrent requests")
}

// Response pairs an HTTP response with the error (if any) from the round trip.
type Response struct {
	*http.Response
	err error
}

// Dispatcher: queues reqs requests on reqChan, then closes it.
func dispatcher(reqChan chan *http.Request) {
	defer close(reqChan)
	for i := 0; i < reqs; i++ {
		req, err := http.NewRequest("GET", "http://localhost/", nil)
		if err != nil {
			// Stop here rather than hand a nil request to a worker.
			log.Fatal(err)
		}
		reqChan <- req
	}
}

// Worker Pool: starts max workers that share one transport.
func workerPool(reqChan chan *http.Request, respChan chan Response) {
	t := &http.Transport{}
	for i := 0; i < max; i++ {
		go worker(t, reqChan, respChan)
	}
}

// Worker: handles one request at a time until reqChan is closed and drained.
func worker(t *http.Transport, reqChan chan *http.Request, respChan chan Response) {
	for req := range reqChan {
		resp, err := t.RoundTrip(req)
		r := Response{resp, err}
		respChan <- r
	}
}

// Consumer: reads results until one has arrived for every request,
// counting failed requests as well as successful ones.
func consumer(respChan chan Response) (int64, int64) {
	var (
		conns int64
		size  int64
	)
	for conns < int64(reqs) {
		r := <-respChan
		if r.err != nil {
			log.Println(r.err)
		} else {
			size += r.ContentLength
			if err := r.Body.Close(); err != nil {
				log.Println(err)
			}
		}
		conns++
	}
	return conns, size
}

func main() {
	flag.Parse()
	runtime.GOMAXPROCS(runtime.NumCPU())
	reqChan := make(chan *http.Request)
	respChan := make(chan Response)
	start := time.Now()
	go dispatcher(reqChan)
	go workerPool(reqChan, respChan)
	conns, size := consumer(respChan)
	took := time.Since(start)
	// Average wall-clock time per request.
	average := took / time.Duration(conns)
	fmt.Printf("Connections:\t%d\nConcurrent:\t%d\nTotal size:\t%d bytes\nTotal time:\t%s\nAverage time:\t%s\n", conns, max, size, took, average)
}

Produces:

Connections: 1000000
Concurrent: 200
Total size: 15000000 bytes
Total time: 36m39.6778103s
Average time: 2.199677ms

WARNING: This very rapidly hits system resource limits. On my laptop, anything more than 206 concurrent workers caused my local test web server to crash!


ORIGINAL ANSWER:
The program below uses a buffered chan bool as a semaphore channel, which limits the number of concurrent requests. You can tweak this number and the total number of requests to stress-test your system and find its limits.

package main

import (
	"fmt"
	"net/http"
	"runtime"
	"time"
)

// Resp pairs an HTTP response with the error (if any) from the round trip.
type Resp struct {
	*http.Response
	err error
}

// makeResponses performs reqs requests and sends each result on rc.
// Note that the requests are issued one after another from this single
// goroutine; sem only counts them and frees a slot whenever it fills up.
func makeResponses(reqs int, rc chan Resp, sem chan bool) {
	defer close(rc)
	defer close(sem)
	for reqs > 0 {
		select {
		case sem <- true:
			req, _ := http.NewRequest("GET", "http://localhost/", nil)
			transport := &http.Transport{}
			resp, err := transport.RoundTrip(req)
			r := Resp{resp, err}
			rc <- r
			reqs--
		default:
			<-sem
		}
	}
}

// getResponses counts results until rc is closed.
func getResponses(rc chan Resp) int {
	conns := 0
	for r := range rc {
		conns++
		if r.err != nil {
			fmt.Println(r.err)
		} else {
			// Do something with response
			if err := r.Body.Close(); err != nil {
				fmt.Println(err)
			}
		}
	}
	return conns
}

func main() {
	reqs := 100000
	maxConcurrent := 1000
	runtime.GOMAXPROCS(runtime.NumCPU())
	rc := make(chan Resp)
	sem := make(chan bool, maxConcurrent)
	start := time.Now()
	go makeResponses(reqs, rc, sem)
	conns := getResponses(rc)
	end := time.Since(start)
	fmt.Printf("Connections: %d\nTotal time: %s\n", conns, end)
}

This will print something like:

Connections: 100000
Total time: 6m8.2554629s

This test was done on a local web server, which returned a total response size of 85B per request, so it's not a realistic result. Also, I'm doing no processing on the response, except to close its body.

At a maximum of 1000 concurrent requests it took my laptop just over 6 minutes to do 100,000 requests, so I'm guessing a million would take over an hour. Tweaking the maxConcurrent variable should help you home in on the maximum performance for your system.

Answer 2

Score: 3

You're almost certainly running into a file descriptor limit. The default limit is 2560 (the old limit was 256, but I think they x10'd it at some point). I'm fairly certain the highest you can set it is 10,000.

I don't know if you'll ever be able to get a million simultaneous connections out of one machine this way. You may want to try a hybrid of processes and goroutines: 1,000 processes at 1,000 goroutines per process, but I would not be surprised if you run into the systemwide limits anyway.

To get what you want, I believe you're going to need to rate limit (with a buffered channel semaphore) so that you're not making more than several thousand connections at the same time, if the goal is simply to hit the API as hard as you can from one host (and one network card).

  • Posted by huangapple on April 27, 2014 at 09:38:15
  • When reposting, please keep the link to this article: https://go.coder-hub.com/23318419.html