Idiomatic variable-size worker pool in Go
Question
I'm trying to implement a pool of workers in Go. The go-wiki (and Effective Go in the Channels section) features excellent examples of bounding resource use. Simply make a channel with a buffer that's as large as the worker pool. Then fill that channel with workers, and send them back into the channel when they're done. Receiving from the channel blocks until a worker is available. So the channel and a loop is the entire implementation -- very cool!
Alternatively one could block on sending into the channel, but same idea.
My question is about changing the size of the worker pool while it's running. I don't believe there's a way to change the size of a channel. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for "golang semaphore").
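For reference, a minimal sketch of that fixed-size pattern might look like the following (the Worker type, poolSize, and the job count are placeholders for this example, not code taken from the wiki):

package main

import (
    "fmt"
    "sync"
)

type Worker struct{ id int }

func main() {
    // The buffered channel is the pool: pre-fill it with workers, receive one
    // to run a job (blocking while none are free), and send it back when done.
    const poolSize = 3
    pool := make(chan *Worker, poolSize)
    for i := 0; i < poolSize; i++ {
        pool <- &Worker{id: i}
    }

    var wg sync.WaitGroup
    for job := 0; job < 10; job++ {
        w := <-pool // blocks until a worker is available
        wg.Add(1)
        go func(job int, w *Worker) {
            defer wg.Done()
            fmt.Printf("worker %d handling job %d\n", w.id, job)
            pool <- w // hand the worker back when finished
        }(job, w)
    }
    wg.Wait()
}

The alternative mentioned above just flips the direction: keep the channel empty, send a token before starting a job (blocking when the buffer is full), and receive the token back afterwards.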
Answer 1
Score: 25
I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and using a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:
package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks; the workers should quit afterwards
    close(tasks)
    // use "close(quit)" if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}
It might be a good idea to create a separate WorkerPool type with some convenient methods. Also, instead of type Task string, it is quite common to use a struct that also contains a done channel, which is used to signal that the task has been executed successfully.
Edit: I've played around a bit more and came up with the following: http://play.golang.org/p/VlEirPRk8V. It's basically the same example, with a nicer API.
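To illustrate both suggestions, here is a rough, self-contained sketch of what such a type could look like. The names (WorkerPool, Grow, Shrink, Submit, Close) and the Task struct with a Done channel are assumptions made for this sketch; they are not the API used in the playground example above.

package main

import (
    "fmt"
    "sync"
)

// Task carries a payload and a Done channel that is closed once the
// task has been processed, so the submitter can wait for completion.
type Task struct {
    Name string
    Done chan struct{}
}

// WorkerPool distributes tasks to a resizable set of worker goroutines.
type WorkerPool struct {
    tasks chan Task
    quit  chan struct{}
    wg    sync.WaitGroup
}

func NewWorkerPool(workers, queueSize int) *WorkerPool {
    p := &WorkerPool{
        tasks: make(chan Task, queueSize),
        quit:  make(chan struct{}),
    }
    p.Grow(workers)
    return p
}

// Grow adds n workers to the pool.
func (p *WorkerPool) Grow(n int) {
    for i := 0; i < n; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for {
                select {
                case task, ok := <-p.tasks:
                    if !ok {
                        return
                    }
                    fmt.Println("processing task", task.Name)
                    close(task.Done) // signal completion to the submitter
                case <-p.quit:
                    return
                }
            }
        }()
    }
}

// Shrink asks n workers to exit.
func (p *WorkerPool) Shrink(n int) {
    for i := 0; i < n; i++ {
        p.quit <- struct{}{}
    }
}

// Submit enqueues a task and returns it so the caller can wait on Done.
func (p *WorkerPool) Submit(name string) Task {
    t := Task{Name: name, Done: make(chan struct{})}
    p.tasks <- t
    return t
}

// Close stops accepting tasks and waits for all workers to exit.
func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

func main() {
    pool := NewWorkerPool(5, 128)
    t := pool.Submit("foo")
    <-t.Done // wait for this particular task to finish
    pool.Shrink(2)
    pool.Grow(3)
    pool.Submit("bar")
    pool.Close()
}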
Answer 2
Score: 3
A simple change I can think of is to have a channel that controls how big the semaphore is.
The relevant part is the select statement: if there is more work in the queue, process it with the current semaphore; if there is a request to change the size of the semaphore, replace it and continue processing the request queue with the new semaphore. Note that the old one is going to be garbage collected.
package main

import (
    "fmt"
    "time"
)

type Request struct{ num int }

var quit chan struct{} = make(chan struct{})

func Serve(queue chan *Request, resize chan int, semsize int) {
    // Create the semaphore once; replace it only when a resize is requested.
    sem := make(chan struct{}, semsize)
    for {
        select {
        case semsize = <-resize:
            sem = make(chan struct{}, semsize)
            fmt.Println("changing semaphore size to", semsize)
        case req := <-queue:
            sem <- struct{}{}   // Block until there's capacity to process a request.
            go handle(req, sem) // Don't wait for handle to finish.
        case <-quit:
            return
        }
    }
}

func process(r *Request) {
    fmt.Println("Handled Request", r.num)
}

func handle(r *Request, sem chan struct{}) {
    process(r) // May take a long time & use a lot of memory or CPU.
    <-sem      // Done; enable next request to run.
}

func main() {
    workq := make(chan *Request, 1)
    ctrlq := make(chan int)

    go func() {
        for i := 0; i < 20; i += 1 {
            <-time.After(100 * time.Millisecond)
            workq <- &Request{i}
        }
        <-time.After(500 * time.Millisecond)
        quit <- struct{}{}
    }()

    go func() {
        <-time.After(500 * time.Millisecond)
        ctrlq <- 10
    }()

    Serve(workq, ctrlq, 1)
}
You can view the code here: http://play.golang.org/p/AHOLlAv2LH