Worker pool for a potentially recursive task (i.e., each job can queue other jobs)
I'm writing an application that the user can start with a number of "jobs" (URLs actually). At the beginning (main routine), I add these URLs to a queue, then start x goroutines that work on these URLs.
In special cases, the resource a URL points to may contain even more URLs which have to be added to the queue. The 3 workers are waiting for new jobs to come in and process them. The problem is: once EVERY worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or no one works.
My current implementation looks something like this, and I don't think it's elegant. Unfortunately, I couldn't think of a better way that wouldn't introduce race conditions, and I'm not entirely sure this implementation actually works as intended:
var queue // from somewhere
const WORKER_COUNT = 3

var done chan struct{}

func work(working chan int) {
	absent := make(chan struct{}, 1)
	// If several jobs in a row are popped, send only one struct to the "absent" channel.
	// This implementation also assumes that the select statement is evaluated "in order"
	// (channel 2 only if channel 1 yields nothing) - is this actually correct?
	// EDIT: it is not - per the spec, when several cases are ready, select picks one
	// via uniform pseudo-random selection, so no ordering is guaranteed.
	one := false
	for {
		select {
		case u, ok := <-queue.Pop():
			if !ok {
				close(absent)
				return
			}
			if !one {
				// I have started working (delta + 1)
				working <- 1
				absent <- struct{}{}
				one = true
			}
			// do work with u (which may lead to queue.Push(urls...))
		case <-absent: // no jobs at the moment. consume absent => wait
			one = false
			working <- -1
		}
	}
}

func Start() {
	working := make(chan int)
	for i := 0; i < WORKER_COUNT; i++ {
		go work(working)
	}
	// the number of workers actually working...
	sum := 0
	for {
		delta := <-working
		sum += delta
		if sum == 0 {
			queue.Close() // close channel -> kill workers
			done <- struct{}{}
			return
		}
	}
}
Is there a better way to tackle this problem?
Answer 1
Score: 4
You can use a sync.WaitGroup (see docs) to control the lifetime of the workers, and use a non-blocking send so workers can't deadlock when they try to queue up more jobs:
package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
	// do the job, calling enqueue() for subtasks as needed
}

func main() {
	jobs, wg := make(chan job), new(sync.WaitGroup)
	var enqueue func(job)

	// workers
	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				j.do(enqueue)
				wg.Done()
			}
		}()
	}

	// how to queue a job
	enqueue = func(j job) {
		wg.Add(1)
		select {
		case jobs <- j: // another worker took it
		default: // no free worker; do the job now
			j.do(enqueue)
			wg.Done()
		}
	}

	todo := make([]job, 1000)
	for _, j := range todo {
		enqueue(j)
	}

	wg.Wait()
	close(jobs)
}
The difficulty with trying to avoid deadlocks via a buffered channel is that you have to allocate a channel big enough up front to definitely hold all pending tasks without blocking. That's problematic unless, say, you have a small and known number of URLs to crawl.
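For contrast, here is a minimal sketch of that buffered-channel variant. The `job` type, the worker count, and the `bound` of 8 total jobs are all illustrative assumptions; the approach only stays deadlock-free because `bound` really is an upper limit on every job ever enqueued, including subtasks:

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ id int }

// crawl runs a fixed workload through a channel buffered to a known upper
// bound on the total number of jobs, so enqueue never blocks and workers
// can safely queue subtasks from inside a job. Returns how many jobs ran.
func crawl() int {
	const bound = 8 // known upper bound on ALL jobs, including subtasks
	jobs := make(chan job, bound)
	var wg sync.WaitGroup
	var mu sync.Mutex
	count := 0

	enqueue := func(j job) {
		wg.Add(1)
		jobs <- j // never blocks: the buffer can hold every pending job
	}

	for i := 0; i < 3; i++ {
		go func() {
			for j := range jobs {
				mu.Lock()
				count++
				mu.Unlock()
				if j.id < 4 { // the first few jobs each yield one subtask
					enqueue(job{id: j.id + 4})
				}
				wg.Done()
			}
		}()
	}

	for i := 0; i < 4; i++ {
		enqueue(job{id: i})
	}
	wg.Wait()
	close(jobs)
	return count
}

func main() {
	fmt.Println(crawl()) // prints 8
}
```

If the bound were ever wrong, a worker's send inside `enqueue` could block with every other worker also blocked, which is exactly the deadlock the non-blocking `select` version avoids.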
When you fall back to doing ordinary recursion in the current thread, you don't have that static buffer-size limit. Of course, there are still limits: you'd probably run out of RAM if too much work were pending, and theoretically you could exhaust the stack with deep recursion (but that's hard!). So you'd need to track pending tasks some more sophisticated way if you were, say, crawling the Web at large.
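One way to track pending tasks without a static buffer, sketched here under assumed names (`workQueue` and `runPool` are not from the original post): a slice-backed queue guarded by a mutex and condition variable, where termination is detected exactly as in the question's setup - the queue is empty and no worker is busy:

```go
package main

import (
	"fmt"
	"sync"
)

// workQueue holds pending URLs in a growable slice (no fixed buffer) and
// counts busy workers; once the queue is empty AND no worker is busy,
// every blocked Pop returns ok=false, which shuts the pool down.
type workQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
	busy  int
}

func newWorkQueue() *workQueue {
	q := &workQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *workQueue) Push(u string) {
	q.mu.Lock()
	q.items = append(q.items, u)
	q.mu.Unlock()
	q.cond.Signal()
}

// Pop blocks until an item is available or all workers are idle.
// The caller counts as busy until it calls Done.
func (q *workQueue) Pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.busy == 0 {
			q.cond.Broadcast() // wake the other idle workers so they exit too
			return "", false
		}
		q.cond.Wait()
	}
	u := q.items[0]
	q.items = q.items[1:]
	q.busy++
	return u, true
}

func (q *workQueue) Done() {
	q.mu.Lock()
	q.busy--
	idle := q.busy == 0 && len(q.items) == 0
	q.mu.Unlock()
	if idle {
		q.cond.Broadcast() // nothing queued and nobody working: terminate
	}
}

func runPool() int {
	q := newWorkQueue()
	q.Push("a")
	q.Push("b")
	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := 0
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				u, ok := q.Pop()
				if !ok {
					return
				}
				mu.Lock()
				seen++
				mu.Unlock()
				if u == "a" { // this "URL" yields one more URL
					q.Push("a/child")
				}
				q.Done()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(runPool()) // prints 3
}
```

Because a worker always pushes any discovered URLs before calling Done, the "empty and idle" check can never fire while work is still in flight.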
Finally, as a more complete example: I'm not especially proud of this code, but I happened to write a function that kicks off a parallel sort, and it's recursive in the same way your URL fetching is.
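The referenced parallel-sort code itself isn't shown here, but the same non-blocking-send pattern applied to a recursive quicksort might look roughly like this sketch (the Lomuto partitioning scheme and the cutoff threshold are illustrative choices, not the author's actual code):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

const sortWorkers = 4

// parallelSorted sorts data in place using the enqueue pattern from the
// answer above: one recursive half is offered to a free worker via a
// non-blocking send, and sorted inline if no worker is available.
func parallelSorted(data []int) []int {
	jobs, wg := make(chan []int), new(sync.WaitGroup)
	var enqueue func([]int)
	var do func([]int)

	// Lomuto partition around the last element; returns the pivot's final index.
	partition := func(a []int) int {
		p, i := a[len(a)-1], 0
		for j := 0; j < len(a)-1; j++ {
			if a[j] < p {
				a[i], a[j] = a[j], a[i]
				i++
			}
		}
		a[i], a[len(a)-1] = a[len(a)-1], a[i]
		return i
	}

	do = func(a []int) {
		if len(a) < 4 { // small slices: sort sequentially
			sort.Ints(a)
			return
		}
		m := partition(a)
		enqueue(a[:m]) // left half may be picked up by another worker
		do(a[m+1:])    // right half continues in this goroutine
	}

	for i := 0; i < sortWorkers; i++ {
		go func() {
			for a := range jobs {
				do(a)
				wg.Done()
			}
		}()
	}

	enqueue = func(a []int) {
		wg.Add(1)
		select {
		case jobs <- a: // a free worker took it
		default: // no free worker; sort it in this goroutine
			do(a)
			wg.Done()
		}
	}

	do(data)
	wg.Wait()
	close(jobs)
	return data
}

func main() {
	fmt.Println(parallelSorted([]int{5, 2, 9, 1, 5, 6, 3, 8, 7, 4}))
	// prints [1 2 3 4 5 5 6 7 8 9]
}
```

Handing `a[:m]` to another goroutine is safe because the two halves are disjoint slices of the backing array, so no synchronization beyond the WaitGroup is needed.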