How to run 10000 goroutines in parallel where each routine calls an API?

Question

I have the following code, where I am trying to call an API 10000 times, but I am getting errors:

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "sync"
    "time"
)

func main() {
    nCPU := runtime.NumCPU()
    runtime.GOMAXPROCS(nCPU)

    var wg sync.WaitGroup
    totalRequests := 100000
    wg.Add(totalRequests)

    fmt.Println("Starting Go Routines")

    start := time.Now()
    total := 0

    for i := 0; i < totalRequests; i++ {

        go func(current int) {
            defer wg.Done()

            startFunc := time.Now()
            _, err := http.Get("http://127.0.0.1:8080/event/list")
            // resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "<ACCESS_TOKEN>")

            if err != nil {
                fmt.Println(err)
            }
            // defer resp.Body.Close()
            elapsedFunc := time.Since(startFunc)
            fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
            total++

        }(i)

    }

    wg.Wait()
    elapsed := time.Since(start)
    fmt.Println("\nThe total time with cores", elapsed)
    fmt.Println("\nTerminating Program")
}

The errors I am getting:

Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445

Answer 1

Score: 5

As others mentioned in the comments, your main issue is that you are exceeding the open file limit of the process.
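On Unix-like systems every open socket counts as one file descriptor, so the limit being hit here is the process's RLIMIT_NOFILE. As a minimal sketch, assuming Linux or macOS, the current limits can be read with the standard syscall package (the helper name openFileLimit is made up for illustration):

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit returns the soft and hard limits on open file descriptors
// for the current process. Unix-only: syscall.Getrlimit does not exist on Windows.
func openFileLimit() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	return uint64(rl.Cur), uint64(rl.Max), nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("could not read limit:", err)
		return
	}
	fmt.Printf("open file limit: soft=%d hard=%d\n", soft, hard)
}
```

If the soft limit printed is well below the number of requests started at once, errors like the ones in the question are to be expected. Raising the limit only moves the ceiling, so bounding the number of requests in flight is usually the better fix.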

You can easily implement a semaphore using channels to limit concurrency:

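package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	totalRequests := 100000
	concurrency := 1024
	// Buffered channel used as a counting semaphore: at most
	// concurrency goroutines can hold a slot at any time.
	sem := make(chan bool, concurrency)

	start := time.Now()
	total := int32(0)

	for i := 0; i < totalRequests; i++ {
		sem <- true // acquire a slot; blocks while concurrency requests are in flight

		go func(current int) {
			defer func() { <-sem }() // release the slot when this request is done
			startTime := time.Now()

			// Make request here

			elapsedTime := time.Since(startTime)
			completed := atomic.AddInt32(&total, 1)
			fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, completed)
		}(i)
	}

	// Filling the semaphore to capacity blocks until every goroutine has released its slot.
	for i := 0; i < cap(sem); i++ {
		sem <- true
	}
	elapsedTotal := time.Since(start)
	fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)
}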

This will limit the number of parallel requests to whatever is specified in concurrency.

As you can see, the total variable is incremented using the atomic package, since we are modifying it from potentially parallel goroutines. Modifying it unsafely with a plain total++, as you did, is a data race and could have produced an incorrect total.
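To see the atomic counter in isolation, here is a small sketch (the function name countAtomically is hypothetical) in which many goroutines each increment a shared counter once:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomically starts n goroutines that each increment a shared counter
// once and returns the final value after all of them have finished.
func countAtomically(n int) int32 {
	var total int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt32(&total, 1) // safe from any number of goroutines
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&total)
}

func main() {
	fmt.Println(countAtomically(1000)) // prints 1000
}
```

With a plain total++ in place of atomic.AddInt32, some increments could be lost, and running the program with go run -race would report the race.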

See this blog post for the original example & explanation of limiting concurrency in Go: http://jmoiron.net/blog/limiting-concurrency-in-go

EDIT:

As mentioned by JimB below, another common approach is to start a fixed number of goroutines (concurrency of them) that do the work while we feed jobs to them. Here's a generic do function that one might use for this:

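// do calls fn(i) for every i in [0, total), using at most
// concurrency goroutines. Requires: import "sync".
func do(total, concurrency int, fn func(int)) {
	workQueue := make(chan int, concurrency)

	var wg sync.WaitGroup
	wg.Add(concurrency)

	// Start the workers; each one pulls jobs until the queue is closed.
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for job := range workQueue {
				fn(job)
			}
		}()
	}
	// Feed the jobs, then close the queue so the workers' range loops end.
	for i := 0; i < total; i++ {
		workQueue <- i
	}
	close(workQueue)
	wg.Wait()
}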

We spawn concurrency goroutines and then send values to the workQueue channel until total values have been sent. By closing the workQueue channel we effectively terminate the range loops in our goroutines. After that we just wait until all the remaining goroutines finish running.
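One way to check that this pattern really bounds concurrency is to count how many callbacks run at the same moment. The sketch below repeats do so that it is self-contained; the measure helper and the one-millisecond sleep standing in for a request are assumptions made for illustration:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// do is the worker-pool function described above.
func do(total, concurrency int, fn func(int)) {
	workQueue := make(chan int, concurrency)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for job := range workQueue {
				fn(job)
			}
		}()
	}
	for i := 0; i < total; i++ {
		workQueue <- i
	}
	close(workQueue)
	wg.Wait()
}

// measure runs total jobs through do and reports how many completed and
// the highest number that were running at the same time.
func measure(total, concurrency int) (completed, peak int32) {
	var inFlight int32
	do(total, concurrency, func(int) {
		n := atomic.AddInt32(&inFlight, 1)
		// Record a new peak if this job pushed the in-flight count higher.
		for {
			p := atomic.LoadInt32(&peak)
			if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
				break
			}
		}
		time.Sleep(time.Millisecond) // stand-in for a slow request
		atomic.AddInt32(&inFlight, -1)
		atomic.AddInt32(&completed, 1)
	})
	return completed, peak
}

func main() {
	completed, peak := measure(200, 8)
	fmt.Printf("completed=%d peak=%d\n", completed, peak)
}
```

Because only 8 workers exist, the reported peak can never exceed 8, while all 200 jobs still complete.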

For the use case in question, it could be used like this:

totalRequests := 1000000
concurrency := 1024

do(totalRequests, concurrency, func(i int) {
	// Make request here

	fmt.Printf("Request %d done.\n", i)
})
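What goes in place of the "Make request here" comment matters too: the code in the question never closes the response body, which keeps connections (and their file descriptors) open longer than needed. A hedged sketch of one possible request helper follows; the names fetch and demo, the five-second timeout, and the local httptest server standing in for the real API are all assumptions for illustration:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetch performs one GET and returns the status code. It drains and closes
// the body so the underlying connection can be reused or released.
func fetch(client *http.Client, url string) (int, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return 0, err
	}
	return resp.StatusCode, nil
}

// demo starts a throwaway local server (a hypothetical stand-in for the API
// in the question), fetches it once, and returns the status code.
func demo() (int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 5 * time.Second}
	return fetch(client, srv.URL)
}

func main() {
	status, err := demo()
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("status:", status)
}
```

Inside the callback passed to do, a call such as fetch(client, "http://127.0.0.1:8080/event/list") with one shared client would then take the place of the placeholder comment.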

huangapple
  • Posted on October 13, 2015 at 21:38:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/33104192.html