简单的负载均衡器无法正常工作。

huangapple go评论91阅读模式
英文:

Simple load balancer doesn't work properly

问题

我从2010年的Google IO中获取了负载均衡器的代码,并为Balancer添加了优先级队列和同步锁的实现。我故意将workFn函数的延迟设置得比requester的延迟更长,以便我可以看到挂起值如何增加。我在命令行中运行它,并注意到在所有工作程序启动后,程序停止,并且所有工作程序的挂起值为1,并且没有显示任何内容。我无法找出错误在哪里,有时completed只调用一次或两次。看起来<-b.done在select语句中没有被正确处理。

package main

import (
	"container/heap"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"time"
)

var nWorker int32 = 6

func main() {
	rchanel := make(chan Request)
	workers := Pool{
		{make(chan Request), 0, 0},
		{make(chan Request), 0, 1},
		{make(chan Request), 0, 2},
		{make(chan Request), 0, 3},
		{make(chan Request), 0, 4},
		{make(chan Request), 0, 5},
	}
	doneChan := make(chan *Worker)
	balancer := Balancer{workers, sync.Mutex{}, doneChan}
	for _, elem := range workers {
		go elem.work(doneChan)
	}
	go balancer.balance(rchanel)
	go requester(rchanel)

	var input string
	fmt.Scanln(&input)
}

type Request struct {
	fn func() int
	c  chan int
}

func requester(work chan Request) {
	c := make(chan int)
	for {
		time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4)
		work <- Request{workFn, c}
		go func() {
			result := <-c
			fmt.Fprintf(os.Stderr, "Done: %v \n", result)
		}()
	}
}

func workFn() int {
	val := rand.Int31n(nWorker)
	time.Sleep(time.Duration(val) * 2e8)
	return int(val)
}

type Worker struct {
	requests chan Request
	pending  int
	index    int
}

func (w *Worker) work(done chan *Worker) {
	for {
		req := <-w.requests
		req.c <- req.fn()
		done <- w
	}
}

type Pool []*Worker

func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}
func (p Pool) Len() int { return len(p) }
func (p *Pool) Push(x interface{}) {
	n := len(*p)
	worker := x.(*Worker)
	worker.index = n
	*p = append(*p, worker)
}
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	item := old[n-1]
	item.index = -1
	*p = old[0 : n-1]
	return item
}

type Balancer struct {
	pool Pool
	mu   sync.Mutex
	done chan *Worker
}

func (b *Balancer) dispatch(req Request) {
	b.mu.Lock()
	w := heap.Pop(&b.pool).(*Worker)
	w.requests <- req
	w.pending++
	heap.Push(&b.pool, w)
	b.mu.Unlock()
}
func (b *Balancer) completed(w *Worker) {
	b.mu.Lock()
	w.pending--
	heap.Remove(&b.pool, w.index)
	heap.Push(&b.pool, w)
	b.mu.Unlock()
}

func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case req := <-work:
			b.dispatch(req)
			b.printStatus()
		case w := <-b.done:
			b.completed(w)
			b.printStatus()
		}
	}
}

func (b *Balancer) printStatus() {
	fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
}
英文:

I took loadbalancer code from google io 2010 and added implementation for priority queue and sync locking for Balancer. I intentionally set workFn function delay greater than for requester so I can see how pending value will be increased. I ran it in cli and noticed that after all workers start, program stops with pending value 1 for all workers and shows nothing. Where is mistake I cannot figure out, some times completed called only once or two times. It looks like &lt;-b.done is not properly handled in select case.

package main
import (
&quot;container/heap&quot;
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;os&quot;
&quot;sync&quot;
&quot;time&quot;
)
var nWorker int32 = 6
func main() {
rchanel := make(chan Request)
workers := Pool{
{make(chan Request), 0, 0},
{make(chan Request), 0, 1},
{make(chan Request), 0, 2},
{make(chan Request), 0, 3},
{make(chan Request), 0, 4},
{make(chan Request), 0, 5},
}
doneChan := make(chan *Worker)
balancer := Balancer{workers, sync.Mutex{}, doneChan}
for _, elem := range workers {
go elem.work(doneChan)
}
go balancer.balance(rchanel)
go requester(rchanel)
var input string
fmt.Scanln(&amp;input)
}
type Request struct {
fn func() int
c  chan int
}
func requester(work chan Request) {
c := make(chan int)
for {
time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4)
work &lt;- Request{workFn, c}
go func() {
result := &lt;-c
fmt.Fprintf(os.Stderr, &quot;Done: %v \n&quot;, result)
}()
}
}
func workFn() int {
val := rand.Int31n(nWorker)
time.Sleep(time.Duration(val) * 2e8)
return int(val)
}
type Worker struct {
requests chan Request
pending  int
index    int
}
func (w *Worker) work(done chan *Worker) {
for {
req := &lt;-w.requests
req.c &lt;- req.fn()
done &lt;- w
}
}
type Pool []*Worker
func (p Pool) Less(i, j int) bool {
return p[i].pending &lt; p[j].pending
}
func (p Pool) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
p[i].index = i
p[j].index = j
}
func (p Pool) Len() int { return len(p) }
func (p *Pool) Push(x interface{}) {
n := len(*p)
worker := x.(*Worker)
worker.index = n
*p = append(*p, worker)
}
func (p *Pool) Pop() interface{} {
old := *p
n := len(old)
item := old[n-1]
item.index = -1
*p = old[0 : n-1]
return item
}
type Balancer struct {
pool Pool
mu   sync.Mutex
done chan *Worker
}
func (b *Balancer) dispatch(req Request) {
b.mu.Lock()
w := heap.Pop(&amp;b.pool).(*Worker)
w.requests &lt;- req
w.pending++
heap.Push(&amp;b.pool, w)
b.mu.Unlock()
}
func (b *Balancer) completed(w *Worker) {
b.mu.Lock()
w.pending--
heap.Remove(&amp;b.pool, w.index)
heap.Push(&amp;b.pool, w)
b.mu.Unlock()
}
func (b *Balancer) balance(work chan Request) {
for {
select {
case req := &lt;-work:
b.dispatch(req)
b.printStatus()
case w := &lt;-b.done:
b.completed(w)
b.printStatus()
}
}
}
func (b *Balancer) printStatus() {
fmt.Fprintf(os.Stderr, &quot;Status: %v %v %v %v %v %v\n&quot;, b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
}

答案1

得分: 1

问题在于balance() goroutine 最终在 dispatch() 中的 w.requests <- req 处被阻塞,同时特定的 Workerwork() 中的 done <- w 处被阻塞,导致运行 balance() 的 goroutine 产生死锁。

以下是你需要的修复方法。balance() 需要在内部使用 goroutine。这将修复问题,因为现在无论例程在 dispatch() 还是 completed() 中被阻塞,balance() 的主例程都将继续从 channel 中进行选择。

注意:这在 playground 上无法运行,因为它会一直运行下去。

func (b *Balancer) balance(work chan Request) {
    for {
        select {
        case req := <-work:
            go func() {
                b.dispatch(req)
                b.printStatus()
            }()
        case w := <-b.done:
            go func() {
                b.completed(w)
                b.printStatus()
            }()
        }
    }
}

现在,printStatus 调用可以并发进行,它也需要使用 mutex,否则会出现随机的 panic

func (b *Balancer) printStatus() {
    b.mu.Lock()
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
    b.mu.Unlock()
}

现在,如果我能弄清楚为什么 pending 值一直在增加就好了... 据我所知,Worker.work() 应该只允许 pending01,因为 Worker 必须在从 dispatch() 获取另一个 Request 之前等待 done <- w。我相信这是期望的结果,不是吗?

英文:

The issue is that the balance() goroutine eventually gets blocked in dispatch() on w.requests &lt;- req at the same time that specific Worker is blocking in work() on done &lt;- w, producing a deadlock for the goroutine running balance().

Here is the fix you need. balance() needs to utilize goroutines internally. This will fix the issue because now it doesn't matter if the routine blocks in dispatch() or completed(), the main routine for balance() will continue selecting from the channels.

NOTE: This does not work on the playground because it goes on forever.

func (b *Balancer) balance(work chan Request) {
for {
select {
case req := &lt;-work:
go func() {
b.dispatch(req)
b.printStatus()
}()
case w := &lt;-b.done:
go func() {
b.completed(w)
b.printStatus()
}()
}
}
}

Now that printStatus calls can be done concurrently, it needs to make use of the mutex as well, or you will get random panics.

func (b *Balancer) printStatus() {
b.mu.Lock()
fmt.Fprintf(os.Stderr, &quot;Status: %v %v %v %v %v %v\n&quot;, b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending)
b.mu.Unlock()
}

Now if I could just figure out why the pending values just keep increasing... As far as I can tell, Worker.work() should only ever allow pending to be 0 or 1 because the Worker has to wait on done &lt;- w before it can get another Request from dispatch(). I believe this is the desired result though isn't it?

huangapple
  • 本文由 发表于 2017年8月25日 20:52:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/45881894.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定