Simple load balancer doesn't work properly
Question
I took the load balancer code from the Google I/O 2010 talk and added a priority queue implementation and sync locking for the Balancer. I intentionally made the workFn delay longer than the requester delay so I could watch the pending values grow. When I run it from the command line, after all the workers start the program stalls with a pending value of 1 for every worker and prints nothing more. I cannot figure out where the mistake is; sometimes completed is called only once or twice. It looks like <-b.done is not being handled properly in the select statement.
package main

import (
    "container/heap"
    "fmt"
    "math/rand"
    "os"
    "sync"
    "time"
)

var nWorker int32 = 6

func main() {
    rchanel := make(chan Request)
    workers := Pool{
        {make(chan Request), 0, 0},
        {make(chan Request), 0, 1},
        {make(chan Request), 0, 2},
        {make(chan Request), 0, 3},
        {make(chan Request), 0, 4},
        {make(chan Request), 0, 5},
    }
    doneChan := make(chan *Worker)
    balancer := Balancer{workers, sync.Mutex{}, doneChan}
    for _, elem := range workers {
        go elem.work(doneChan)
    }
    go balancer.balance(rchanel)
    go requester(rchanel)

    var input string
    fmt.Scanln(&input)
}

type Request struct {
    fn func() int
    c  chan int
}

func requester(work chan Request) {
    c := make(chan int)
    for {
        time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4)
        work <- Request{workFn, c}
        go func() {
            result := <-c
            fmt.Fprintf(os.Stderr, "Done: %v \n", result)
        }()
    }
}

func workFn() int {
    val := rand.Int31n(nWorker)
    time.Sleep(time.Duration(val) * 2e8)
    return int(val)
}

type Worker struct {
    requests chan Request
    pending  int
    index    int
}

func (w *Worker) work(done chan *Worker) {
    for {
        req := <-w.requests
        req.c <- req.fn()
        done <- w
    }
}

type Pool []*Worker

func (p Pool) Less(i, j int) bool {
    return p[i].pending < p[j].pending
}

func (p Pool) Swap(i, j int) {
    p[i], p[j] = p[j], p[i]
    p[i].index = i
    p[j].index = j
}

func (p Pool) Len() int { return len(p) }

func (p *Pool) Push(x interface{}) {
    n := len(*p)
    worker := x.(*Worker)
    worker.index = n
    *p = append(*p, worker)
}

func (p *Pool) Pop() interface{} {
    old := *p
    n := len(old)
    item := old[n-1]
    item.index = -1
    *p = old[0 : n-1]
    return item
}

type Balancer struct {
    pool Pool
    mu   sync.Mutex
    done chan *Worker
}

func (b *Balancer) dispatch(req Request) {
    b.mu.Lock()
    w := heap.Pop(&b.pool).(*Worker)
    w.requests <- req
    w.pending++
    heap.Push(&b.pool, w)
    b.mu.Unlock()
}

func (b *Balancer) completed(w *Worker) {
    b.mu.Lock()
    w.pending--
    heap.Remove(&b.pool, w.index)
    heap.Push(&b.pool, w)
    b.mu.Unlock()
}

func (b *Balancer) balance(work chan Request) {
    for {
        select {
        case req := <-work:
            b.dispatch(req)
            b.printStatus()
        case w := <-b.done:
            b.completed(w)
            b.printStatus()
        }
    }
}

func (b *Balancer) printStatus() {
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
}
Answer 1
Score: 1
The issue is that the balance() goroutine eventually blocks in dispatch() on w.requests <- req at the same time that the specific Worker it popped is blocked in work() on done <- w. Each send is waiting for the other side to receive, so the goroutine running balance() deadlocks.
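To see the shape of that failure in isolation, here is a minimal sketch (my addition, not part of the original answer) of the same cycle: one goroutine blocked on a send that only the other can receive, while the other is blocked on a send that only the first can receive. Run on its own, the Go runtime detects it and aborts with "fatal error: all goroutines are asleep - deadlock!"; in the original program the detector stays quiet because main is blocked reading stdin in fmt.Scanln, so it simply hangs.

package main

import "fmt"

// Minimal reproduction of the send/send cycle: the "balancer" below blocks
// on requests <- i while the "worker" blocks on done <- r, and neither send
// can complete because both channels are unbuffered.
func main() {
    requests := make(chan int) // plays the role of w.requests
    done := make(chan int)     // plays the role of b.done

    go func() { // plays the role of Worker.work()
        for {
            r := <-requests
            done <- r // blocks forever: the loop below never reads done
        }
    }()

    for i := 0; ; i++ { // plays the role of balance() calling dispatch()
        requests <- i // the second send blocks forever
        fmt.Println("dispatched", i)
    }
}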
Here is the fix you need: balance() has to use goroutines internally. This resolves the problem because it no longer matters whether one of those routines blocks in dispatch() or completed(); the main balance() loop keeps selecting on its channels.

Note: this does not run on the playground because it runs forever.
func (b *Balancer) balance(work chan Request) {
    for {
        select {
        case req := <-work:
            go func() {
                b.dispatch(req)
                b.printStatus()
            }()
        case w := <-b.done:
            go func() {
                b.completed(w)
                b.printStatus()
            }()
        }
    }
}
Since the printStatus calls can now run concurrently, printStatus needs to take the mutex as well, or you will get random panics.
func (b *Balancer) printStatus() {
    b.mu.Lock()
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
    b.mu.Unlock()
}
Now if I could just figure out why the pending values keep increasing... As far as I can tell, Worker.work() should only ever allow pending to be 0 or 1, because the Worker has to wait on done <- w before it can get another Request from dispatch(). I believe this is the desired result though, isn't it?
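As a side note (my addition, not part of the original answer): the goroutine fix works by making the balancer's sends asynchronous; another common way to break a send/send cycle like this is to buffer the channels so a send no longer needs a simultaneously waiting receiver. A minimal sketch, assuming a buffer large enough for the in-flight work:

package main

import "fmt"

// Sketch: with buffered channels, both sides of the former cycle can park
// their values and move on instead of waiting for a rendezvous.
func main() {
    requests := make(chan int, 8) // capacity must cover the expected backlog
    done := make(chan int, 8)

    go func() { // "worker"
        for r := range requests {
            done <- r // no longer blocks while the done buffer has room
        }
    }()

    for i := 0; i < 8; i++ {
        requests <- i // would have deadlocked on i == 1 with unbuffered channels
    }
    close(requests)

    for i := 0; i < 8; i++ {
        fmt.Println("completed", <-done)
    }
}

The caveat is that a full buffer reintroduces exactly the same deadlock, which is why the answer's approach of keeping the select loop free and blocking only inside goroutines is the more robust fix.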