How to run 10000 goroutines in parallel where each goroutine calls an API?
Question
I have the following code, in which I am trying to call an API 10000 times, but I am getting errors:
    package main

    import (
        "fmt"
        "net/http"
        "runtime"
        "sync"
        "time"
    )

    func main() {
        nCPU := runtime.NumCPU()
        runtime.GOMAXPROCS(nCPU)
        var wg sync.WaitGroup
        totalRequests := 100000
        wg.Add(totalRequests)
        fmt.Println("Starting Go Routines")
        start := time.Now()
        total := 0
        for i := 0; i < totalRequests; i++ {
            go func(current int) {
                defer wg.Done()
                startFunc := time.Now()
                _, err := http.Get("http://127.0.0.1:8080/event/list")
                // resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD")
                if err != nil {
                    fmt.Println(err)
                }
                // defer resp.Body.Close()
                elapsedFunc := time.Since(startFunc)
                fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
                total++
            }(i)
        }
        wg.Wait()
        elapsed := time.Since(start)
        fmt.Println("\nThe total time with cores", elapsed)
        fmt.Println("\nTerminating Program")
    }
The errors I am getting:
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445
Answer 1
Score: 5
As others mentioned in the comments, your main issue is that you are exceeding the open file limit of the process.
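To see how close a program is to that limit, the process can read its own file descriptor limit. The following is a minimal sketch, assuming a Unix-like system (Linux or macOS) where the standard syscall package exposes Getrlimit and RLIMIT_NOFILE; the helper name openFileLimit is hypothetical and not part of the original answer.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit (hypothetical helper) returns the soft and hard limits on
// open file descriptors for the current process. Unix-like systems only.
func openFileLimit() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	return uint64(rl.Cur), uint64(rl.Max), nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("could not read limit:", err)
		return
	}
	// Each in-flight HTTP request holds at least one socket, and every
	// socket counts against the soft limit.
	fmt.Printf("open files: soft limit %d, hard limit %d\n", soft, hard)
}
```

If the soft limit is, say, 1024, then starting 100000 requests at once will likely fail with "too many open files" long before all of them get a socket.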
You can easily implement a semaphore using channels to limit concurrency:
    totalRequests := 100000
    concurrency := 1024
    // Buffered channel used as a counting semaphore.
    sem := make(chan bool, concurrency)

    start := time.Now()
    total := int32(0)
    for i := 0; i < totalRequests; i++ {
        // Acquire a slot; blocks while concurrency goroutines are running.
        sem <- true
        go func(current int) {
            startTime := time.Now()
            // Make request here
            elapsedTime := time.Since(startTime)
            atomic.AddInt32(&total, 1)
            fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, atomic.LoadInt32(&total))
            // Release the slot.
            <-sem
        }(i)
    }
    // Filling the semaphore to capacity waits for all running goroutines.
    for i := 0; i < cap(sem); i++ {
        sem <- true
    }
    elapsedTotal := time.Since(start)
    fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)
This will limit the number of parallel requests to whatever is specified in concurrency.
As you can see, the total variable is incremented using the atomic package, since it is modified from goroutines that may run in parallel. An unsynchronized total++, as in your code, is a data race and can produce an incorrect total.
See this blog post for the original example & explanation of limiting concurrency in Go: http://jmoiron.net/blog/limiting-concurrency-in-go
EDIT:
As mentioned by JimB below, another common approach is to have concurrency number of goroutines doing the work while we feed it to them. Here's a generic do function that one might use for this:
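    func do(total, concurrency int, fn func(int)) {
        workQueue := make(chan int, concurrency)

        var wg sync.WaitGroup
        wg.Add(concurrency)

        // Start a fixed number of workers that pull indices from the queue.
        for i := 0; i < concurrency; i++ {
            go func() {
                defer wg.Done()
                for n := range workQueue {
                    fn(n)
                }
            }()
        }
        // Feed the work; this blocks whenever all workers are busy and the buffer is full.
        for i := 0; i < total; i++ {
            workQueue <- i
        }
        // Closing the channel ends the range loops in the workers.
        close(workQueue)
        wg.Wait()
    }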
We spawn concurrency goroutines and then start sending values to the workQueue channel until total values have been sent. By closing the workQueue channel we effectively terminate the range loops in our goroutines. After that we just wait until all the remaining goroutines finish running.
For the use case in question, it could be used like this:
    totalRequests := 1000000
    concurrency := 1024
    do(totalRequests, concurrency, func(i int) {
        // Make request here
        fmt.Printf("Request %d done.\n", i)
    })
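As a quick sanity check of the worker-pool version, the sketch below runs do with a callback that only counts its invocations, which shows that every index from 0 to total-1 is handled exactly once. The helper name countCalls is hypothetical, and the HTTP request is left out so the example runs without a server.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// do runs fn(i) for every i in [0, total) using concurrency workers.
func do(total, concurrency int, fn func(int)) {
	workQueue := make(chan int, concurrency)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for n := range workQueue {
				fn(n)
			}
		}()
	}
	for i := 0; i < total; i++ {
		workQueue <- i
	}
	close(workQueue)
	wg.Wait()
}

// countCalls (hypothetical helper) reports how many times fn was invoked
// and the sum of the indices it received.
func countCalls(total, concurrency int) (calls, sum int64) {
	do(total, concurrency, func(i int) {
		// Workers run in parallel, so the counters are updated atomically.
		atomic.AddInt64(&calls, 1)
		atomic.AddInt64(&sum, int64(i))
	})
	return calls, sum
}

func main() {
	calls, sum := countCalls(1000, 8)
	fmt.Println(calls, sum) // prints "1000 499500"
}
```

Compared with the semaphore version, this never has more than concurrency goroutines alive at once, which may matter when total is very large.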