Go queue processing with retry on failure

Question


We have a bunch of files to be uploaded to a remote blob store after processing.

Currently, the frontend (PHP) creates a Redis list of such files and gives it a unique ID, called the JobID. It then passes this unique ID to a beanstalkd tube, which is received by a Go process. That process uses a library called Go workers to process each job ID, in much the same fashion as net/http dispatches requests. It receives the job ID, retrieves the Redis list, and starts processing the files.

However, currently only one file is processed at a time. Since the operation here is I/O bound, not CPU bound, intuition suggests that it would be beneficial to use a goroutine per file.

However, we want to retry uploads on failure, as well as track the number of items processed per job. We cannot start an unbounded number of goroutines, because a single job can contain roughly 10k files to process, and hundreds of such jobs can be sent per second during peak times. What would be the correct approach for this?
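For concreteness, the per-file retry we have in mind is just a bounded loop with backoff around each upload. A minimal sketch, where uploadFile is a hypothetical stand-in for the real blob-store call:

package main

import (
	"errors"
	"fmt"
	"time"
)

// uploadFile is a hypothetical stand-in for the real blob-store upload.
func uploadFile(path string) error {
	return errors.New("transient network error") // always fails, for illustration
}

// uploadWithRetry attempts the upload up to maxAttempts times,
// backing off linearly between attempts.
func uploadWithRetry(path string, maxAttempts int) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = uploadFile(path); err == nil {
			return nil
		}
		time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
	}
	return fmt.Errorf("upload of %s failed after %d attempts: %v", path, maxAttempts, err)
}

func main() {
	fmt.Println(uploadWithRetry("example.bin", 3))
}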

NB: We can change the technology stack a bit if needed (such as swapping out beanstalkd for something else).

Answer 1

Score: 2


You can limit the number of goroutines by using a buffered channel whose capacity is the maximum number of goroutines you want. Sends on this channel block once it reaches capacity; as your goroutines finish, they free up slots, allowing new goroutines to run.

Example:

package main

import (
	"fmt"
	"sync"
)

var (
	concurrent    = 5
	semaphoreChan = make(chan struct{}, concurrent)
)

func doWork(wg *sync.WaitGroup, item int) {
	// block while full
	semaphoreChan <- struct{}{}

	go func() {
		defer func() {
			// read to release a slot
			<-semaphoreChan
			wg.Done()
		}()
		// This is where your work actually gets done
		fmt.Println(item)
	}()
}

func main() {
	// we need this for the example so that we can block until all goroutines finish
	var wg sync.WaitGroup
	wg.Add(10)

	// start the work
	for i := 0; i < 10; i++ {
		doWork(&wg, i)
	}

	// block until all work is done
	wg.Wait()
}

Go Playground link: https://play.golang.org/p/jDMYuCe7HV

Inspired by this Golang UK Conference talk: https://youtu.be/yeetIgNeIkc?t=1413
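The example above caps concurrency, but it does not show the retry-on-failure and per-job progress tracking the question asks about. A minimal sketch combining the same buffered-channel semaphore with a bounded retry loop and an atomic success counter; uploadFile, maxWorkers, and maxAttempts are hypothetical placeholders, not part of the original answer:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxWorkers = 5 // cap on concurrent uploads per process

// uploadFile is a hypothetical stand-in for the real blob-store upload.
func uploadFile(path string) error {
	return nil // pretend every upload succeeds, for illustration
}

// processJob uploads every file in the job's list with at most
// maxWorkers uploads in flight, retrying each file up to maxAttempts
// times, and returns how many files were uploaded successfully.
func processJob(files []string, maxAttempts int) int64 {
	var (
		succeeded int64
		wg        sync.WaitGroup
		semaphore = make(chan struct{}, maxWorkers)
	)
	for _, f := range files {
		wg.Add(1)
		semaphore <- struct{}{} // block while maxWorkers uploads are running
		go func(f string) {
			defer func() {
				<-semaphore // release a slot for the next file
				wg.Done()
			}()
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				if uploadFile(f) == nil {
					atomic.AddInt64(&succeeded, 1) // per-job progress counter
					return
				}
			}
		}(f)
	}
	wg.Wait()
	return succeeded
}

func main() {
	files := []string{"a.bin", "b.bin", "c.bin"}
	fmt.Printf("uploaded %d of %d files\n", processJob(files, 3), len(files))
}

In this sketch the counter could be flushed back to the job's Redis entry so the PHP frontend can report progress, and the semaphore keeps the goroutine count bounded even when a single job contains ~10k files.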
