英文:
Go Golang select statement cannot receive sended value
问题
我是你的中文翻译助手,以下是你提供的代码的翻译:
package main
import (
"fmt"
"time"
"container/heap"
)
type Request struct {
fn func(*Worker) int
c chan int
}
func requester(work chan<- Request) {
c := make(chan int)
work <- Request{workFn, c}
result := <-c
furtherProcess(result)
}
func workFn(w *Worker) int {
time.Sleep(1000 * time.Millisecond)
return w.index
}
func furtherProcess(result int) {
fmt.Println(result)
}
type Worker struct {
request chan Request
pending int
index int
}
func (w *Worker) work(done chan *Worker) {
for req := range w.request {
req.c <- req.fn(w)
fmt.Println("sending to done:", done)
done <- w
fmt.Println("sended to done")
}
}
type Pool []*Worker
type Balancer struct {
pool Pool
done chan *Worker
}
func (b *Balancer) balance(work chan Request) {
for {
fmt.Println("selecting, done:", b.done)
select {
case req := <-work:
b.dispatch(req)
case w := <-b.done:
fmt.Println("completed")
b.completed(w)
}
}
}
func (p Pool) Len() int {
return len(p)
}
func (p Pool) Less(i, j int) bool {
return p[i].pending < p[j].pending
}
func (p Pool) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
func (p *Pool) Push(x interface{}) {
*p = append(*p, x.(*Worker))
}
func (p *Pool) Pop() interface{} {
old := *p
n := len(old)
x := old[n-1]
*p = old[0 : n-1]
return x
}
func (b *Balancer) dispatch(req Request) {
w := heap.Pop(&b.pool).(*Worker)
w.request <- req
w.pending++
heap.Push(&b.pool, w)
fmt.Println("dispatched to worker", w.index)
}
func (b *Balancer) completed(w *Worker) {
w.pending--
heap.Remove(&b.pool, w.index)
heap.Push(&b.pool, w)
}
func Run() {
NumWorkers := 4
req := make(chan Request)
done := make(chan *Worker)
b := Balancer{make([]*Worker, NumWorkers), done}
for i := 0; i < NumWorkers; i++ {
w := Worker{make(chan Request), 0, i}
b.pool[i] = &w
go w.work(done)
}
go b.balance(req)
for i := 0; i < NumWorkers*4; i++ {
go requester(req)
}
time.Sleep(200000 * time.Millisecond)
}
func main() {
Run()
}
当你运行这段代码时,你得到了以下输出:
selecting, done: 0xc0820082a0
dispatched to worker 0
selecting, done: 0xc0820082a0
dispatched to worker 3
selecting, done: 0xc0820082a0
dispatched to worker 2
selecting, done: 0xc0820082a0
dispatched to worker 1
selecting, done: 0xc0820082a0
sending to done: 0xc0820082a0
sending to done: 0xc0820082a0
3
sending to done: 0xc0820082a0
2
1
0
sending to done: 0xc0820082a0
正如你所看到的,它在同一个管道上进行选择和发送(done: 0xc0820082a0),但是选择语句没有接收到发送的值,并且一直阻塞。这是怎么发生的?上面的代码有什么问题?谢谢!
英文:
I'm new to Go and trying to implement a simple load balancer as seen in this slides:
http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-42
The complete code:
package main
import (
"fmt"
"time"
"container/heap"
)
type Request struct {
fn func(*Worker) int
c chan int
}
func requester(work chan <-Request) {
c := make(chan int)
work <- Request{workFn, c}
result := <-c
furtherProcess(result)
}
func workFn(w *Worker) int {
time.Sleep(1000 * time.Millisecond)
return w.index
}
func furtherProcess(result int) {
fmt.Println(result)
}
type Worker struct {
request chan Request
pending int
index int
}
func (w *Worker) work(done chan *Worker) {
for req := range w.request {
req.c <- req.fn(w)
fmt.Println("sending to done:", done)
done <- w
fmt.Println("sended to done")
}
}
type Pool []*Worker
type Balancer struct {
pool Pool
done chan *Worker
}
func (b *Balancer) balance(work chan Request) {
for {
fmt.Println("selecting, done:", b.done)
select {
case req := <-work:
b.dispatch(req)
case w := <-b.done:
fmt.Println("completed")
b.completed(w)
}
}
}
func (p Pool) Len() int {
return len(p)
}
func (p Pool) Less(i, j int) bool {
return p[i].pending < p[j].pending
}
func (p Pool) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
func (p *Pool) Push(x interface{}) {
*p = append(*p, x.(*Worker))
}
func (p *Pool) Pop() interface{} {
old := *p
n := len(old)
x := old[n - 1]
*p = old[0 : n - 1]
return x
}
func (b *Balancer) dispatch(req Request) {
w := heap.Pop(&b.pool).(*Worker)
w.request <- req
w.pending++
heap.Push(&b.pool, w)
fmt.Println("dispatched to worker", w.index)
}
func (b *Balancer) completed(w *Worker) {
w.pending--
heap.Remove(&b.pool, w.index)
heap.Push(&b.pool, w)
}
func Run() {
NumWorkers := 4
req := make(chan Request)
done := make(chan *Worker)
b := Balancer{make([]*Worker, NumWorkers), done}
for i := 0; i < NumWorkers; i++ {
w := Worker{make(chan Request), 0, i}
b.pool[i] = &w
go w.work(done)
}
go b.balance(req)
for i := 0; i < NumWorkers * 4; i++ {
go requester(req)
}
time.Sleep(200000 * time.Millisecond)
}
func main() {
Run()
}
When I ran it, I got following outputs:
selecting, done: 0xc0820082a0
dispatched to worker 0
selecting, done: 0xc0820082a0
dispatched to worker 3
selecting, done: 0xc0820082a0
dispatched to worker 2
selecting, done: 0xc0820082a0
dispatched to worker 1
selecting, done: 0xc0820082a0
sending to done: 0xc0820082a0
sending to done: 0xc0820082a0
3
sending to done: 0xc0820082a0
2
1
0
sending to done: 0xc0820082a0
As you can see, it was selecting on and sending to the same pipe (done: 0xc0820082a0), but the select didn't receive the sended value and was blocking forever. How could this happen? What's the problem with the above code? Thanks!
答案1
得分: 0
使用kill -ABRT <PID>
命令,你可以看到所有的工作线程都在done <- w
这一行被阻塞,而负载均衡器则在w.request <- req
这一行被阻塞,导致了死锁(工作线程无法继续执行直到负载均衡器接收到它们的“完成”信号,而负载均衡器无法继续执行直到选定的工作线程接收到请求)。
如果你将done <- w
替换为go func() { done <- w }()
,你会发现你的程序可以处理16个请求而不会挂起。
另外注意:你可以考虑使用sync.WaitGroup
来替代time.Sleep(200000 * time.Millisecond)
。
英文:
Using kill -ABRT <PID>
you can see that all your Workers are blocked on done <- w
while your Balancer is blocked on w.request <- req
, creating a deadlock (workers can't go further until the balancer receives their "done" signals, and the balancer can't go further until the selected worker takes the request).
If you replace done <- w
by go func() { done <- w }()
, you can see that your program will process the 16 requests without hanging.
Side note: instead of time.Sleep(200000 * time.Millisecond)
, look into sync.WaitGroup
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论