英文:
Golang: limit concurrency levels of a blocking operation
问题
我有以下情景:
我在一个频道上收到一条消息,告诉我要上传一个文件。上传是通过阻塞函数uploadToServer
完成的。zipGen
频道可能每秒接收到多条消息,我希望同时上传最多5个文件(不超过5个,但可能更少,这取决于第三个工作人员在zipGen
上发送了多少消息,这超出了本问题的范围)。
listenToZips
函数在一个go例程中运行(在文件的init
函数上使用go listenToZips()
):
func listenToZips() {
for {
select {
case zip := <-zipGen:
uploadToServer(zip) // 这是阻塞的
}
}
}
如果我使用go uploadToServer(zip)
而不是只使用uploadToServer(zip)
,会出现太多并发(例如,我的程序将尝试同时上传10个文件,但我只想最多上传5个)。
另一方面,如果没有使用go uploadToServer(zip)
(只是像上面的函数中使用uploadToServer(zip)
),我一次只能上传一个文件(因为uploadToServer(zip)
是阻塞的)。
如何实现这种控制水平,以允许我最多同时上传5个文件?
谢谢!
英文:
I have the following scenario:
I am receiving a message on a channel telling me to upload a file. The upload is made by the blocking function uploadToServer
. The zipGen
channel may receive several messages per second, and I want to upload maximum 5 files simultaneously (not more, but possibly less - depending on how many messages are sent on zipGen
by a third worker that is out of the scope of this question).
The listenToZips
function runs inside a go routine (go listenToZips()
on the file's init
function):
func listenToZips() {
for {
select {
case zip := <-zipGen:
uploadToServer(zip) // this is blocking
}
}
}
If I launch go uploadToServer(zip)
instead of just uploadToServer(zip)
- I get too much concurrency (so for example my program will try to upload 10 files at the same time, but I want a maximum of 5).
On the other hand, without go uploadToServer(zip)
(just using uploadToServer(zip)
like in the above function), I only upload one file at a time (since the uploadToServer(zip)
is blocking).
How can I achieve this level of control to allow me a max upload of 5 files simultaneously?
Thanks!
答案1
得分: 3
最简单的选择是预先生成N个goroutine,它们从通道中获取输入,并在循环中将其上传。在每个goroutine的上下文中,操作将会阻塞,但是N个goroutine会同时执行这个操作。当然,每个消息只会被一个goroutine接收。
func listenToZips(concurrent int) {
for i := 0; i < concurrent; i++ {
// 生成一个监听goroutine
go func() {
for {
select {
case zip := <-zipGen:
uploadToServer(zip) // 这里是阻塞的操作
}
}
}()
}
}
当然,你可以添加停止条件,可能使用一个不同的通道,但基本思路是一样的。
英文:
The simplest option - prespawn N goroutines that take input from the channel, and upload it, in a loop. In each goroutine's context the operation will be blocking, but N goroutines do this. Only one goroutine will receive each message, of course.
func listenToZips(concurrent int) {
for i:=0; i < concurrent; i++ {
// spawn a listener goroutine
go func() {
for {
select {
case zip := <-zipGen:
uploadToServer(zip) // this is blocking
}
}
}()
}
}
Of course you can then add stop condition, probably using a different channel, but the basic idea is just the same.
答案2
得分: 1
请尝试这个链接:https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
uploadToServer()
})
limiter.Wait()
英文:
try this:
https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
uploadToServer()
})
limiter.Wait()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论