processing jobs from a neverending queue with a fixed number of workers

Question
This is doing my head in; I can't figure out how to solve it:

- I want to have a fixed number N of goroutines running in parallel.
- From a never-ending queue I will fetch X messages about jobs to process.
- I want the N goroutines to process these X jobs, and as soon as one of the routines has nothing more to do, I want to fetch another X jobs from the never-ending queue.

The code in the answer below (see url) works brilliantly for processing the tasks, but the workers die once the task list is empty. I want them to stay alive and somehow notify the main code that they are out of work, so I can fetch more jobs to refill the task list.

Using the example code from user Jsor below, I tried to create a simple program, but I am confused.
import (
    "fmt"
    "strconv"
)

// workChan - read only that delivers work
// requestChan - ??? what is this
func Worker(myid string, workChan <-chan string, requestChan chan<- struct{}) {
    for {
        select {
        case work := <-workChan:
            fmt.Println("Channel: " + myid + " do some work: " + work)
        case requestChan <- struct{}{}:
            // hm? how is the requestChan used?
        }
    }
}

func Logic() {
    workChan := make(chan string)
    requestChan := make(chan struct{})

    // Create the workers
    for i := 1; i < 5; i++ {
        Worker(strconv.Itoa(i), workChan, requestChan)
    }

    // Give the workers some work
    for i := 100; i < 115; i++ {
        workChan <- "workid" + strconv.Itoa(i)
    }
}
Answer 1
Score: 2
This is what the `select` statement is for.
func Worker(workChan <-chan Work, requestChan chan<- struct{}) {
    for {
        select {
        case work := <-workChan:
            doWork(work) // Do work (doWork is a placeholder, not defined here)
        case requestChan <- struct{}{}:
        }
    }
}
This worker will run forever and ever. If work is available, it will pull it from the work channel. If there's nothing left, it will send a request.
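For completeness (this is not part of the answer above): the send on requestChan only completes once something on the other side receives from it, so the main code has to consume those requests and react by fetching more jobs. A minimal sketch of that consumer, where the Work type and the fetchMoreJobs helper are hypothetical placeholders for the real queue:

// Sketch only: Work and fetchMoreJobs are hypothetical placeholders for the real queue.
type Work string

func fetchMoreJobs() []Work {
    // Replace with a real fetch of the next X jobs from the never-ending queue.
    return []Work{"job-a", "job-b", "job-c"}
}

func feed(workChan chan<- Work, requestChan <-chan struct{}) {
    for {
        <-requestChan // blocks until some worker signals it is ready for more work
        for _, w := range fetchMoreJobs() {
            workChan <- w // hand the new batch to whichever workers are free
        }
    }
}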
Note that since it runs forever and ever, if you want to be able to kill a worker you need to do something else. One possibility is to always check the `ok` value when receiving from `workChan` and quit the function if that channel has been closed. Another option is to use an individual quit channel for each worker.
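A rough sketch of both options (again not from the original answer; doWork and Work remain placeholders):

// Option 1: close workChan to stop the workers; each worker checks the ok value.
func WorkerUntilClosed(workChan <-chan Work, requestChan chan<- struct{}) {
    for {
        select {
        case work, ok := <-workChan:
            if !ok {
                return // channel closed: no more work will ever arrive
            }
            doWork(work)
        case requestChan <- struct{}{}:
        }
    }
}

// Option 2: give each worker its own quit channel and close it to stop that worker.
func WorkerWithQuit(workChan <-chan Work, requestChan chan<- struct{}, quit <-chan struct{}) {
    for {
        select {
        case work := <-workChan:
            doWork(work)
        case requestChan <- struct{}{}:
        case <-quit:
            return
        }
    }
}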
Answer 2
Score: 0
Compared to the [other solution you posted][1], you just need (first) _not_ to close the channel, and just keep feeding items to it.
Then you need to answer the following question: is it absolutely necessary that **(a)** you fetch the next X items from your queue _only_ once one of the workers has “nothing more to do” (or, what is the same, once the first X items are either fully processed, or assigned to a worker); or **(b)** is it okay if you keep the second set of X items in memory, and go feeding them to the workers as new work items are needed?
As I understand it, only **(a)** needs the _requestChan_ you’re wondering about (see below). For **(b)**, something as simple as the following would suffice:
// B version

type WorkItem int

const (
    N = 5  // Number of workers
    X = 15 // Number of work items to get from the infinite queue at once
)

func Worker(id int, workChan <-chan WorkItem) {
    for {
        item := <-workChan
        doWork(item)
        fmt.Printf("Worker %d processes item #%v\n", id, item)
    }
}

func Dispatch(workChan chan<- WorkItem) {
    for {
        items := GetNextFromQueue(X)
        for _, item := range items {
            workChan <- item
            fmt.Printf("Dispatched item #%v\n", item)
        }
    }
}

func main() {
    workChan := make(chan WorkItem) // Shared amongst all workers; could make it buffered if GetNextFromQueue() is slow.

    // Start N workers.
    for i := 0; i < N; i++ {
        go Worker(i, workChan)
    }

    // Dispatch items to the workers.
    go Dispatch(workChan)

    time.Sleep(20 * time.Second) // Ensure main(), and our program, finish.
}
(I’ve uploaded to the Playground a [full working solution for (b)][2].)
As for **(a)**, the workers change to say: do work, _or_ if there’s no more work, tell the dispatcher to get more via the _reqChan_ communication channel. That _“or”_ is implemented via _select_. Then, the dispatcher _waits_ on _reqChan_ before making another call to `GetNextFromQueue()`. It’s more code, but ensures the semantics that you might be interested in. (The previous version is overall simpler, though.)
// A version

func Worker(id int, workChan <-chan WorkItem, reqChan chan<- int) {
    for {
        select {
        case item := <-workChan:
            doWork(item)
            fmt.Printf("Worker %d processes item #%v\n", id, item)
        case reqChan <- id:
            fmt.Printf("Worker %d thinks they requested more work\n", id)
        }
    }
}

func Dispatch(workChan chan<- WorkItem, reqChan <-chan int) {
    for {
        items := GetNextFromQueue(X)
        for _, item := range items {
            workChan <- item
            fmt.Printf("Dispatched item #%v\n", item)
        }

        id := <-reqChan
        fmt.Printf("Polling the queue in Dispatch() at the request of worker %d\n", id)
    }
}
(I’ve also uploaded to the Playground a [full working solution for (a)][3].)
[1]: https://stackoverflow.com/questions/18405023/how-would-you-define-a-pool-of-goroutines-to-be-executed-at-once-in-golang#18406762
[2]: http://play.golang.org/p/H8IkGbl07I
[3]: http://play.golang.org/p/QrEogt_Xcj