Golang:限制阻塞操作的并发级别

huangapple go评论75阅读模式
英文:

Golang: limit concurrency levels of a blocking operation

问题

我有以下情景:
我在一个频道上收到一条消息,告诉我要上传一个文件。上传是通过阻塞函数uploadToServer完成的。zipGen频道可能每秒接收到多条消息,我希望同时上传最多5个文件(不超过5个,但可能更少,这取决于第三个工作人员在zipGen上发送了多少消息,这超出了本问题的范围)。

listenToZips函数在一个go例程中运行(在文件的init函数上使用go listenToZips()):

func listenToZips() {
    for {
        select {
        case zip := <-zipGen:
            uploadToServer(zip) // 这是阻塞的
        }
    }
}

如果我使用go uploadToServer(zip)而不是只使用uploadToServer(zip),会出现太多并发(例如,我的程序将尝试同时上传10个文件,但我只想最多上传5个)。

另一方面,如果没有使用go uploadToServer(zip)(只是像上面的函数中使用uploadToServer(zip)),我一次只能上传一个文件(因为uploadToServer(zip)是阻塞的)。

如何实现这种控制水平,以允许我最多同时上传5个文件?

谢谢!

英文:

I have the following scenario:
I am receiving a message on a channel telling me to upload a file. The upload is made by the blocking function uploadToServer. The zipGen channel may receive several messages per second, and I want to upload maximum 5 files simultaneously (not more, but possibly less - depending on how many messages are sent on zipGen by a third worker that is out of the scope of this question).

The listenToZips function runs inside a go routine (go listenToZips() on the file's init function):

func listenToZips() {
	for {
		select {
		case zip := &lt;-zipGen:
          uploadToServer(zip) // this is blocking
        }
    }
}

If I launch go uploadToServer(zip) instead of just uploadToServer(zip) - I get too much concurrency (so for example my program will try to upload 10 files at the same time, but I want a maximum of 5).

On the other hand, without go uploadToServer(zip) (just using uploadToServer(zip) like in the above function), I only upload one file at a time (since the uploadToServer(zip) is blocking).

How can I achieve this level of control to allow me a max upload of 5 files simultaneously?

Thanks!

答案1

得分: 3

最简单的选择是预先生成N个goroutine,它们从通道中获取输入,并在循环中将其上传。在每个goroutine的上下文中,操作将会阻塞,但是N个goroutine会同时执行这个操作。当然,每个消息只会被一个goroutine接收。

func listenToZips(concurrent int) {
    for i := 0; i < concurrent; i++ {
        // 生成一个监听goroutine
        go func() {
            for {
                select {
                case zip := <-zipGen:
                    uploadToServer(zip) // 这里是阻塞的操作
                }
            }
        }()
    }
}

当然,你可以添加停止条件,可能使用一个不同的通道,但基本思路是一样的。

英文:

The simplest option - prespawn N goroutines that take input from the channel, and upload it, in a loop. In each goroutine's context the operation will be blocking, but N goroutines do this. Only one goroutine will receive each message, of course.

func listenToZips(concurrent int) {

    for i:=0; i &lt; concurrent; i++ {

      // spawn a listener goroutine
      go func() {

         for {
            select {
            case zip := &lt;-zipGen:
               uploadToServer(zip) // this is blocking
            }
         }
      }()

   }

}

Of course you can then add stop condition, probably using a different channel, but the basic idea is just the same.

答案2

得分: 1

请尝试这个链接:https://github.com/korovkin/limiter

limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
uploadToServer()
})
limiter.Wait()

英文:

try this:
https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
  		uploadToServer()
 })
 limiter.Wait()

huangapple
  • 本文由 发表于 2015年9月2日 21:02:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/32353942.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定