一个潜在递归任务的工作池(即,每个作业可以排队其他作业)

huangapple go评论72阅读模式
英文:

Worker pool for a potentially recursive task (i.e., each job can queue other jobs)

问题

我正在编写一个应用程序,用户可以使用一些“作业”(实际上是URL)启动。在开始(主程序)时,我将这些URL添加到一个队列中,然后启动x个goroutine来处理这些URL。

在特殊情况下,URL指向的资源可能包含更多的URL,这些URL必须添加到队列中。这3个工作线程正在等待新的作业并处理它们。问题是:一旦每个工作线程都在等待作业(没有任何线程在产生作业),工作线程应该全部停止。因此,要么全部工作,要么全部停止。

我的当前实现大致如下,我不认为它很优雅。不幸的是,我想不到一个更好的方法,它不会包含竞争条件,而且我也不确定这个实现是否按预期工作:

var queue // 来自某个地方
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // 如果连续弹出x个作业,则只向“absent”通道发送1个结构体。
  // 这个实现还假设select语句将按“顺序”进行评估(仅在通道1没有产生任何内容时才使用通道2)- 这是正确的吗?编辑:根据规范,是正确的。
  one := false
  for {
    select {
    case u, ok := <-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // 我已经开始工作(delta + 1)
        working <- 1
        absent <- struct{}{}
        one = true
      }
      // 使用u进行工作(可能会导致queue.Push(urls...))
    case <-absent: // 当前没有作业。消耗absent => 等待
      one = false
      working <- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i < WORKER_COUNT; i++ {
    go work(working)
  }
  // 实际工作的工作线程数量...
  sum := 0
  for {
    delta := <-working
    sum += delta
    if sum == 0 {
      queue.Close() // 关闭通道 => 终止工作线程。
      done <- struct{}{}
      return
    }
  }
}

有没有更好的方法来解决这个问题?

英文:

I'm writing an application that the user can start with a number of "jobs" (URLs actually). At the beginning (main routine), I add these URLs to a queue, then start x goroutines that work on these URLs.

In special cases, the resource a URL points to may contain even more URLs which have to be added to the queue. The 3 workers are waiting for new jobs to come in and process them. The problem is: once EVERY worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or no one works.

My current implementation looks something like this and I don't think it's elegant. Unfortunately I couldn't think of a better way that wouldn't include race conditions and I'm not entirely sure if this implementation actually works as intended:

var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // if x&gt;1 jobs in sequence are popped, send to &quot;absent&quot; channel only 1 struct.
  // This implementation also assumes that the select statement will be evaluated &quot;in-order&quot; (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
  one := false
  for {
    select {
    case u, ok := &lt;-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // I have started working (delta + 1)
        working &lt;- 1
        absent &lt;- struct{}{}
        one = true
      }
      // do work with u (which may lead to queue.Push(urls...))
    case &lt;-absent: // no jobs at the moment. consume absent =&gt; wait
      one = false
      working &lt;- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i &lt; WORKER_COUNT; i++ {
    go work(working)
  }
  // the amount of actually working workers...
  sum := 0
  for {
    delta := &lt;-working
    sum += delta
    if sum == 0 {
      queue.Close() // close channel -&gt; kill workers.
      done &lt;- struct{}{}
      return
    }
  }
}

Is there a better way to tackle this problem?

答案1

得分: 4

你可以使用sync.WaitGroup(参见文档https://golang.org/pkg/sync/#WaitGroup)来控制工作线程的生命周期,并使用非阻塞发送,这样工作线程在尝试排队更多任务时就不会发生死锁。

package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
    // do the job, calling enqueue() for subtasks as needed
}

func main() {
    jobs, wg := make(chan job), new(sync.WaitGroup)
    var enqueue func(job)

    // workers
    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    // how to queue a job
    enqueue = func(j job) {
        wg.Add(1)
        select {
        case jobs <- j: // another worker took it
        default: // no free worker; do the job now
            j.do(enqueue)
            wg.Done()
        }
    }

    todo := make([]job, 1000)
    for _, j := range todo {
        enqueue(j)
    }
    wg.Wait()
    close(jobs)
}

尝试使用缓冲通道来避免死锁的困难在于你必须预先分配足够大的通道,以确保不会阻塞地容纳所有待处理的任务。除非你有一个小且已知数量的要爬取的URL,否则这可能会成为问题。

当你回退到在当前线程中使用普通递归时,你就没有了静态缓冲区大小限制。当然,仍然存在限制:如果有太多待处理的工作,你可能会耗尽内存;理论上,如果递归太深,你可能会耗尽堆栈(但这很困难!)。因此,如果你要爬取整个网络,你需要以一种更复杂的方式跟踪待处理的任务。

最后,作为一个更完整的示例,我不是特别自豪于这段代码,但我碰巧写了一个函数来启动并行排序,它以与你的URL获取相同的递归方式实现。

英文:

You can use a sync.WaitGroup (see docs) to control the lifetime of the workers, and use a non-blocking send so workers can't deadlock when they try to queue up more jobs:

package main

import &quot;sync&quot;

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
	// do the job, calling enqueue() for subtasks as needed
}

func main() {
	jobs, wg := make(chan job), new(sync.WaitGroup)
	var enqueue func(job)

	// workers
	for i := 0; i &lt; workers; i++ {
		go func() {
			for j := range jobs {
				j.do(enqueue)
				wg.Done()
			}
		}()
	}

	// how to queue a job
	enqueue = func(j job) {
		wg.Add(1)
		select {
		case jobs &lt;- j: // another worker took it
		default: // no free worker; do the job now
			j.do(enqueue)
			wg.Done()
		}
	}

	todo := make([]job, 1000)
	for _, j := range todo {
		enqueue(j)
	}
	wg.Wait()
	close(jobs)
}

The difficulty with trying to avoid deadlocks with a buffered channel is that you have to allocate a big enough channel up front to definitely hold all pending tasks without blocking. Problematic unless, say, you have a small and known number of URLs to crawl.

When you fall back to doing ordinary recursion in the current thread, you don't have that static buffer-size limit. Of course, there are still limits: you'd probably run out of RAM if too much work were pending, and theoretically you could exhaust the stack with deep recursion (but that's hard!). So you'd need to track pending tasks some more sophisticated way if you were, say, crawling the Web at large.

Finally, as a more complete example, I'm not super proud of this code, but I happened to write a function to kick off a parallel sort that's recursive in the same way your URL fetching is.

huangapple
  • 本文由 发表于 2015年4月14日 06:37:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/29616241.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定