Go语言的select语句无法接收到发送的值。

huangapple go评论132阅读模式
英文:

Go Golang select statement cannot receive sent value

问题

我刚接触 Go,正在尝试实现这份幻灯片中展示的一个简单的负载均衡器:
http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-42

完整代码如下:

package main

import (
	"fmt"
	"time"
	"container/heap"
)

// Request is one unit of work handed to the balancer and then to a worker.
type Request struct {
	// fn is the operation to run; work calls it with the worker executing it.
	fn func(*Worker) int
	// c receives fn's return value once the worker has run it.
	c  chan int
}

// requester submits a single workFn request on work, waits for the result
// on a private reply channel, and passes that result to furtherProcess.
func requester(work chan<- Request) {
	reply := make(chan int)
	work <- Request{fn: workFn, c: reply}
	furtherProcess(<-reply)
}

// workFn stands in for real work: it sleeps for one second and then
// returns the index of the worker that ran it.
func workFn(w *Worker) int {
	time.Sleep(time.Second)
	return w.index
}

// furtherProcess consumes a finished result by printing it on its own line.
func furtherProcess(result int) {
	fmt.Printf("%d\n", result)
}

// Worker is one member of the pool; it runs requests in its work loop.
type Worker struct {
	// request is the channel the work loop reads requests from.
	request chan Request
	// pending counts requests dispatched to this worker and not yet reported
	// complete; Pool.Less orders workers by it.
	pending int
	// index is the number given to the worker when Run creates it. Pool.Swap
	// does not update it, so it is not the worker's current heap position.
	index   int
}

// work is the worker's main loop. For every request read from w.request it
// runs the request's function, sends the result on the request's reply
// channel, and then reports itself on done. It returns when w.request is
// closed.
//
// NOTE: the send on done blocks until some goroutine receives from it. If
// the balancer is at that moment blocked in dispatch sending a request to a
// busy worker, neither side can make progress.
func (w *Worker) work(done chan *Worker) {
	for {
		req, open := <-w.request
		if !open {
			return
		}
		result := req.fn(w)
		req.c <- result
		fmt.Println("sending to done:", done)
		done <- w
		fmt.Println("sended to done")
	}
}

// Pool is the set of workers; with its Len, Less, Swap, Push and Pop methods
// it satisfies heap.Interface, ordered by fewest pending requests.
type Pool []*Worker

// Balancer hands incoming requests to the least-loaded worker in pool.
type Balancer struct {
	// pool holds the workers, used as a min-heap on pending via container/heap.
	pool Pool
	// done is where workers announce that they finished a request.
	done chan *Worker
}

// balance is the balancer's event loop. It runs forever, either taking a
// new request from work and dispatching it, or taking a worker from b.done
// and recording the completion. When both channels are ready, select picks
// one of them pseudo-randomly.
func (b *Balancer) balance(work chan Request) {
	for {
		fmt.Println("selecting, done:", b.done)
		select {
		case finished := <-b.done:
			fmt.Println("completed")
			b.completed(finished)
		case incoming := <-work:
			b.dispatch(incoming)
		}
	}
}

// Len reports how many workers are in the pool (heap.Interface).
func (p Pool) Len() int { return len(p) }

// Less orders workers by load: the worker at i sorts before the worker at j
// when it has strictly fewer pending requests (heap.Interface).
func (p Pool) Less(i, j int) bool {
	left := p[i].pending
	right := p[j].pending
	return left < right
}

// Swap exchanges the workers at positions i and j (heap.Interface). It does
// not modify either worker's index field.
func (p Pool) Swap(i, j int) {
	held := p[i]
	p[i] = p[j]
	p[j] = held
}

// Push appends x, which must be a *Worker, to the end of the pool
// (heap.Interface). It panics if x has any other dynamic type.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	*p = append(*p, w)
}

// Pop removes the last worker from the pool and returns it
// (heap.Interface). It panics if the pool is empty.
func (p *Pool) Pop() interface{} {
	last := len(*p) - 1
	w := (*p)[last]
	*p = (*p)[:last]
	return w
}

// dispatch takes the least-loaded worker off the heap, sends it req,
// increments its pending count, and puts it back on the heap.
//
// NOTE: the send on the worker's request channel blocks until that worker
// receives it; the balancer handles nothing else (including b.done) while
// it waits here.
func (b *Balancer) dispatch(req Request) {
	target := heap.Pop(&b.pool).(*Worker)
	target.request <- req
	target.pending++
	heap.Push(&b.pool, target)
	fmt.Println("dispatched to worker", target.index)
}

// completed records that w finished one request and restores heap order.
//
// Worker.index is the number assigned at creation and is not kept in sync
// with the worker's position in the pool (Pool.Swap does not touch it), so
// it cannot be used as a heap index: doing so may remove a different worker
// and leave w in the pool twice. The worker is therefore located by pointer
// and its position repaired with heap.Fix. The scan is linear in the pool
// size.
func (b *Balancer) completed(w *Worker) {
	w.pending--
	for i, candidate := range b.pool {
		if candidate == w {
			heap.Fix(&b.pool, i)
			return
		}
	}
	// w is not currently in the pool; add it so it can be dispatched to again.
	heap.Push(&b.pool, w)
}

// Run wires the demo together: it creates four workers (each with its own
// unbuffered request channel) sharing one unbuffered done channel, starts
// each worker's loop and the balancer loop in their own goroutines, launches
// sixteen requester goroutines, and then sleeps for 200 seconds so the
// goroutines have time to run before the function returns.
func Run() {
	const numWorkers = 4
	requests := make(chan Request)
	done := make(chan *Worker)
	b := Balancer{pool: make(Pool, numWorkers), done: done}
	for i := range b.pool {
		w := &Worker{request: make(chan Request), index: i}
		b.pool[i] = w
		go w.work(done)
	}
	go b.balance(requests)
	for n := numWorkers * 4; n > 0; n-- {
		go requester(requests)
	}
	time.Sleep(200 * time.Second)
}

// main is the program entry point; it delegates everything to Run.
func main() { Run() }

当我运行这段代码时,得到了以下输出:

selecting, done: 0xc0820082a0
dispatched to worker 0
selecting, done: 0xc0820082a0
dispatched to worker 3
selecting, done: 0xc0820082a0
dispatched to worker 2
selecting, done: 0xc0820082a0
dispatched to worker 1
selecting, done: 0xc0820082a0
sending to done: 0xc0820082a0
sending to done: 0xc0820082a0
3
sending to done: 0xc0820082a0
2
1
0
sending to done: 0xc0820082a0

正如你所看到的,它在同一个通道(done: 0xc0820082a0)上进行 select 和发送,但 select 语句没有接收到已发送的值,而是一直阻塞。这是怎么发生的?上面的代码有什么问题?谢谢!

英文:

I'm new to Go and trying to implement a simple load balancer as seen in this slides:
http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-42

The complete code:

package main
import (
	"fmt"
	"time"
	"container/heap"
)
// Request is one unit of work handed to the balancer and then to a worker.
type Request struct {
// fn is the operation to run; work calls it with the worker executing it.
fn func(*Worker) int
// c receives fn's return value once the worker has run it.
c  chan int
}
func requester(work chan &lt;-Request) {
c := make(chan int)
work &lt;- Request{workFn, c}
result := &lt;-c
furtherProcess(result)
}
// workFn stands in for real work: it sleeps for one second and then
// returns the index of the worker that ran it.
func workFn(w *Worker) int {
	time.Sleep(time.Second)
	return w.index
}
// furtherProcess consumes a finished result by printing it on its own line.
func furtherProcess(result int) {
	fmt.Printf("%d\n", result)
}
// Worker is one member of the pool; it runs requests in its work loop.
type Worker struct {
// request is the channel the work loop reads requests from.
request chan Request
// pending counts requests dispatched to this worker and not yet reported
// complete; Pool.Less orders workers by it.
pending int
// index is the number given to the worker when Run creates it. Pool.Swap
// does not update it, so it is not the worker's current heap position.
index   int
}
func (w *Worker) work(done chan *Worker) {
for req := range w.request {
req.c &lt;- req.fn(w)
fmt.Println(&quot;sending to done:&quot;, done)
done &lt;- w
fmt.Println(&quot;sended to done&quot;)
}
}
// Pool is the set of workers; with its Len, Less, Swap, Push and Pop methods
// it satisfies heap.Interface, ordered by fewest pending requests.
type Pool []*Worker
// Balancer hands incoming requests to the least-loaded worker in pool.
type Balancer struct {
// pool holds the workers, used as a min-heap on pending via container/heap.
pool Pool
// done is where workers announce that they finished a request.
done chan *Worker
}
func (b *Balancer) balance(work chan Request) {
for {
fmt.Println(&quot;selecting, done:&quot;, b.done)
select {
case req := &lt;-work:
b.dispatch(req)
case w := &lt;-b.done:
fmt.Println(&quot;completed&quot;)
b.completed(w)
}
}
}
// Len reports how many workers are in the pool (heap.Interface).
func (p Pool) Len() int { return len(p) }
// Less orders workers by load: the worker at i sorts before the worker at j
// when it has strictly fewer pending requests (heap.Interface).
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}
// Swap exchanges the workers at positions i and j (heap.Interface). It does
// not modify either worker's index field.
func (p Pool) Swap(i, j int) {
	held := p[i]
	p[i] = p[j]
	p[j] = held
}
// Push appends x, which must be a *Worker, to the end of the pool
// (heap.Interface). It panics if x has any other dynamic type.
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	*p = append(*p, w)
}
// Pop removes the last worker from the pool and returns it
// (heap.Interface). It panics if the pool is empty.
func (p *Pool) Pop() interface{} {
	last := len(*p) - 1
	w := (*p)[last]
	*p = (*p)[:last]
	return w
}
func (b *Balancer) dispatch(req Request) {
w := heap.Pop(&amp;b.pool).(*Worker)
w.request &lt;- req
w.pending++
heap.Push(&amp;b.pool, w)
fmt.Println(&quot;dispatched to worker&quot;, w.index)
}
func (b *Balancer) completed(w *Worker) {
w.pending--
heap.Remove(&amp;b.pool, w.index)
heap.Push(&amp;b.pool, w)
}
func Run() {
NumWorkers := 4
req := make(chan Request)
done := make(chan *Worker)
b := Balancer{make([]*Worker, NumWorkers), done}
for i := 0; i &lt; NumWorkers; i++ {
w := Worker{make(chan Request), 0, i}
b.pool[i] = &amp;w
go w.work(done)
}
go b.balance(req)
for i := 0; i &lt; NumWorkers * 4; i++ {
go requester(req)
}
time.Sleep(200000 * time.Millisecond)
}
// main is the program entry point; it delegates everything to Run.
func main() { Run() }

When I ran it, I got the following output:

selecting, done: 0xc0820082a0
dispatched to worker 0
selecting, done: 0xc0820082a0
dispatched to worker 3
selecting, done: 0xc0820082a0
dispatched to worker 2
selecting, done: 0xc0820082a0
dispatched to worker 1
selecting, done: 0xc0820082a0
sending to done: 0xc0820082a0
sending to done: 0xc0820082a0
3
sending to done: 0xc0820082a0
2
1
0
sending to done: 0xc0820082a0

As you can see, it was selecting on and sending to the same channel (done: 0xc0820082a0), but the select didn't receive the sent value and blocked forever. How could this happen? What's the problem with the above code? Thanks!

答案1

得分: 0

使用kill -ABRT <PID>命令,你可以看到所有的工作线程都在done <- w这一行被阻塞,而负载均衡器则在w.request <- req这一行被阻塞,导致了死锁(工作线程无法继续执行直到负载均衡器接收到它们的“完成”信号,而负载均衡器无法继续执行直到选定的工作线程接收到请求)。

如果你将done <- w替换为go func() { done <- w }(),你会发现你的程序可以处理16个请求而不会挂起。

另外注意:你可以考虑使用sync.WaitGroup来替代time.Sleep(200000 * time.Millisecond)

英文:

Using kill -ABRT <PID> you can see that all your Workers are blocked on done <- w while your Balancer is blocked on w.request <- req, creating a deadlock (workers can't go further until the balancer receives their "done" signals, and the balancer can't go further until the selected worker takes the request).

If you replace done <- w with go func() { done <- w }(), you can see that your program will process the 16 requests without hanging.

Side note: instead of time.Sleep(200000 * time.Millisecond), look into sync.WaitGroup

huangapple
  • 本文由 发表于 2015年10月25日 15:17:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/33327201.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定