How to perform concurrent downloads in Go
Question
We have a process whereby users request files that we need to get from our source. This source isn't the most reliable so we implemented a queue using Amazon SQS. We put the download URL into the queue and then we poll it with a small app that we wrote in Go. This app simply retrieves the messages, downloads the file and then pushes it to S3 where we store it. Once all of this is complete it calls back a service which will email the user to let them know that the file is ready.
Originally I wrote this to create n channels and then attached 1 go-routine to each and had the go-routine in an infinite loop. This way I could ensure that I was only ever processing a fixed number of downloads at a time.
I realised that this isn't the way that channels are supposed to be used and, if I'm understanding correctly now, there should actually be one channel with n go-routines receiving on that channel. Each go-routine is in an infinite loop, waiting on a message and when it receives it will process the data, do everything that it's supposed to and when it's done it will wait on the next message. This allows me to ensure that I'm only ever processing n files at a time. I think this is the right way to do it. I believe this is fan-out, right?
What I don't need to do is merge these processes back together. Once the download is done, the app calls back a remote service which handles the remainder of the process. There is nothing else that the app needs to do.
OK, so some code:
func main() {
    queue, err := ConnectToQueue() // This works fine...
    if err != nil {
        log.Fatalf("Could not connect to queue: %s\n", err)
    }

    msgChannel := make(chan sqs.Message, 10)

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
        go processMessage(msgChannel, queue)
    }

    for {
        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
        for _, m := range response.Messages {
            msgChannel <- m
        }
    }
}

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
    for {
        m := <-ch
        // Do something with message m
        // Delete message from queue when we're done
        queue.DeleteMessage(&m)
    }
}
Am I anywhere close here? I have n running go-routines (where MAX_CONCURRENT_ROUTINES = n), and in the loop we keep passing messages into the single channel. Is this the right way to do it? Do I need to close anything, or can I just leave this running indefinitely?
One thing that I'm noticing is that SQS is returning messages, but once I've had 10 messages passed into processMessage() (10 being the size of the channel buffer), no further messages are actually processed.
Thanks all
Answer 1
Score: 3
That looks fine. A few notes:
- You can limit the work parallelism by means other than limiting the number of worker routines you spawn. For example, you can create a goroutine for every message received, and then have the spawned goroutine wait for a semaphore that limits the parallelism. Of course there are tradeoffs, but you aren't limited to just the way you've described:
sem := make(chan struct{}, n)

work := func(m sqs.Message) {
    sem <- struct{}{} // When there's room we can proceed
    // do the work
    <-sem // Free room in the channel
}

for {
    response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
    for _, m := range response.Messages {
        go work(m)
    }
}
- The limit of only 10 messages being processed is being caused elsewhere in your stack. Possibly you're seeing a race where the first 10 fill the channel and then the work isn't completing, or perhaps you're accidentally returning from the worker routines. If your workers are persistent per the model you've described, you'll want to be certain that they don't return; a sketch of a persistent worker follows this list.
- It's not clear if you want the process to return after you've processed some number of messages. If you do want this process to exit, you'll need to wait for all the workers to finish their current tasks, and probably signal them to return afterwards. Take a look at sync.WaitGroup for synchronizing their completion, and have another channel to signal that there's no more work, or close msgChannel and handle that in your workers (take a look at the 2-tuple form of the channel receive expression); a sketch of this shutdown pattern also appears after the list.
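For the note about workers accidentally returning, here is a minimal sketch of a persistent worker, assuming the question's sqs.Message type and queue value; handle is a hypothetical stand-in for the download/upload work. Ranging over the channel keeps the goroutine alive until the channel is closed, and per-message errors are logged rather than allowed to end the loop:

func worker(ch <-chan sqs.Message, queue *sqs.Queue) {
    for m := range ch { // blocks until a message arrives; exits only when ch is closed
        // handle is a hypothetical helper that downloads the file and pushes it to S3
        if err := handle(m); err != nil {
            log.Printf("processing failed: %s", err)
            continue // keep the worker alive; never return on a per-message error
        }
        queue.DeleteMessage(&m)
    }
}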
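And for the shutdown note, a hedged sketch reusing the question's msgChannel and MAX_CONCURRENT_ROUTINES; processOne is a hypothetical per-message helper. Closing the channel is the "no more work" signal, the 2-tuple receive detects it, and the sync.WaitGroup lets main wait for in-flight tasks:

var wg sync.WaitGroup

for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            m, ok := <-msgChannel // ok is false once the channel is closed and drained
            if !ok {
                return // no more work; lets wg.Wait unblock
            }
            processOne(m) // hypothetical stand-in for the per-message work
        }
    }()
}

// ... enqueue messages ...

close(msgChannel) // signal the workers that no more work is coming
wg.Wait()         // block until every worker has drained the channel and returned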