How to sync with pending results in channel?

Question

I have a worker pool which offers a synchronous interface for pulling out results:

func (p *Pool) Get() *Result {
    for {
        select {
        // if there are results in channel return them
        case r := <-p.results:
            return r
        // else check if there is any work pending we must wait for
        // if not return nil to indicate that all work was done
        default:
            if p.active < 1 {
                return nil
            }
        }
    }
}

The idea is that Get will return the next worker result or nil if all work is done.

The problem with this implementation is that I have to track all active work manually with the p.active counter. This feels wrong, since in theory that information is already contained in the length of the p.results channel.

What is an idiomatic approach to return nothing if a buffer is empty?
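The premise that the channel's length already holds this information can be checked with a small, self-contained sketch (the lenSnapshots helper is hypothetical, written only for illustration). Go's built-in len does work on channels, but it only counts values already sitting in the buffer, so it reads 0 both while a worker is still busy and after all work has finished:

```go
package main

import "fmt"

// lenSnapshots reads len on a buffered results channel twice: once while a
// worker is still busy, and once after its only result has been received.
func lenSnapshots() (whileBusy, afterDone int) {
	results := make(chan int, 10)
	release := make(chan struct{})

	go func() {
		<-release     // the worker stays "busy" until released
		results <- 42 // then delivers its single result
	}()

	whileBusy = len(results) // 0: the worker has not sent anything yet
	close(release)
	<-results                // wait for the result and consume it
	afterDone = len(results) // 0 again: all work done, buffer drained
	return
}

func main() {
	busy, done := lenSnapshots()
	fmt.Println("len while a worker is busy:", busy) // prints 0
	fmt.Println("len after all work is done:", done) // prints 0
}
```

Because the two situations look identical through len, some separate record of in-flight work (a counter, a WaitGroup, or a closed channel) is still needed.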

Answer 1

Score: 1

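Go does have len for channels, but len(ch) only reports how many values are currently buffered, not how many workers are still running, so it cannot tell "nothing ready yet" apart from "all work done". If you don't know the number of workers, your approach is as good as it gets.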

However, the counter needs some kind of synchronization. Here's an extremely simple approach using sync/atomic:

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

type Result struct {
	I int
}
type Pool struct {
	res chan *Result
	c   int32 // jobs started whose result Get has not received yet
}

func New() *Pool {
	return &Pool{
		res: make(chan *Result),
	}
}

// Put simulates a worker: it takes a little while, then sends its result.
func (p *Pool) Put(r *Result) {
	time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
	p.res <- r
}

func (p *Pool) Get() (r *Result) {
	for {
		select {
		case r = <-p.res:
			atomic.AddInt32(&p.c, -1)
			return
		default:
			if atomic.LoadInt32(&p.c) == 0 {
				return
			}
			runtime.Gosched() // nothing ready yet: yield instead of spinning hard
		}
	}
}
func main() {
	p := New()
	for i := 0; i < 50; i++ {
		// Count the job before its goroutine starts. Incrementing inside Put
		// would let Get see 0 and return nil before any worker had run.
		atomic.AddInt32(&p.c, 1)
		go p.Put(&Result{i})
	}
	for {
		r := p.Get()
		if r == nil {
			return
		}
		fmt.Println("r.I", r.I)
	}
}

Edit:

For completeness' sake, here's another version using a WaitGroup. It's arguably overkill, since WaitGroup uses atomic counters internally anyway, and unlike the counter version it needs to know the number of results n up front.

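package main

import (
	"fmt"
	"sync"
	"time"
)

type Result struct{ I int }

type Pool struct {
	res chan *Result
	wg  sync.WaitGroup
}

// New creates a pool that expects exactly n results.
func New(n int) (p *Pool) {
	p = &Pool{
		res: make(chan *Result, n),
	}
	p.wg.Add(n)
	go func() {
		// once Get has received all n results, close the channel so the next Get returns nil
		p.wg.Wait()
		close(p.res)
	}()
	return
}

// Put simulates a worker: it takes a little while, then sends its result.
func (p *Pool) Put(r *Result) {
	time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
	p.res <- r
}

// Get blocks until the next result arrives, and returns nil once the channel is closed.
func (p *Pool) Get() *Result {
	r, ok := <-p.res
	if !ok {
		return nil
	}
	p.wg.Done() // count this result as consumed
	return r
}

func main() {
	const n = 50 // this version needs the number of results up front
	p := New(n)
	for i := 0; i < n; i++ {
		go p.Put(&Result{i})
	}
	for {
		r := p.Get()
		if r == nil {
			return
		}
		fmt.Println("r.I", r.I)
	}
}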
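A variation on the WaitGroup version, sketched with hypothetical NewPool, Submit and Seal names: if each job is registered with wg.Add(1) before its goroutine starts, and the worker itself calls wg.Done() after sending, the number of results no longer has to be known in advance, and Get can simply block on the channel until it is closed. This is the common "close the channel once all producers are done" pattern, swapped in here for illustration rather than taken from either answer:

```go
package main

import (
	"fmt"
	"sync"
)

type Result struct{ I int }

// Pool is a hypothetical pool whose workers call wg.Done themselves; once
// they have all finished, the results channel is closed.
type Pool struct {
	res chan *Result
	wg  sync.WaitGroup
}

func NewPool() *Pool {
	return &Pool{res: make(chan *Result)}
}

// Submit counts the job before starting its goroutine, so n never has to be
// known up front. Every Submit must happen before Seal, and job must not
// return nil (nil is how Get signals "all done").
func (p *Pool) Submit(job func() *Result) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done() // runs after the result has been handed over
		p.res <- job()
	}()
}

// Seal closes the results channel once every submitted job has finished.
func (p *Pool) Seal() {
	go func() {
		p.wg.Wait()
		close(p.res)
	}()
}

// Get blocks until a result is available; it returns nil once the channel
// is closed, so there is no polling and no busy loop.
func (p *Pool) Get() *Result {
	r, ok := <-p.res
	if !ok {
		return nil
	}
	return r
}

func main() {
	p := NewPool()
	for i := 1; i <= 4; i++ {
		i := i // per-iteration copy (needed before Go 1.22)
		p.Submit(func() *Result { return &Result{I: i * 10} })
	}
	p.Seal()
	total := 0
	for r := p.Get(); r != nil; r = p.Get() {
		total += r.I
	}
	fmt.Println("total:", total) // prints "total: 100"
}
```

Since the channel is unbuffered, each worker's Done runs only after Get has taken its result, so the channel is never closed while a send is still pending.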

Answer 2

Score: 0

You could use a WaitGroup plus a goroutine that blocks on it and sets a flag once it's done.

Say you have a WaitGroup and call wg.Add(1) on it for each item you enqueue. Each time you receive from the channel, or alternatively each time a worker finishes (pick one, not both), call wg.Done() to decrement its counter.

Then start a goroutine that waits for everything to finish and simply sets a flag. Start it only after all the wg.Add calls have been made; otherwise Wait can return while the counter is still zero:

go func() {
    wg.Wait()
    p.done = true // must be made thread-safe, e.g. with sync/atomic or a mutex
}()

And in your default case, you simply check the done flag:

default:
    if p.done { // read the flag with the same synchronization used to set it
        return nil
    }

See the docs on WaitGroup for more details. The example there is a bit similar to your case. http://golang.org/pkg/sync/#WaitGroup.Wait
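The pieces of this answer can be combined into a runnable sketch. It assumes Go 1.19+ for sync/atomic's atomic.Bool (used to make the flag thread-safe), calls wg.Done() when a result is received (the first of the two options above), and uses hypothetical NewPool, Submit and Seal names; Seal starts the watcher goroutine and must be called after the last Submit:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type Result struct{ I int }

// Pool is a hypothetical pool following the flag idea: a WaitGroup counts
// results not yet received, and a watcher goroutine sets done at zero.
type Pool struct {
	res  chan *Result
	wg   sync.WaitGroup
	done atomic.Bool // atomic.Bool needs Go 1.19+
}

func NewPool() *Pool {
	return &Pool{res: make(chan *Result)}
}

// Submit counts one job, then runs it in its own goroutine. Every Submit
// must happen before Seal, and job must not return nil.
func (p *Pool) Submit(job func() *Result) {
	p.wg.Add(1)
	go func() { p.res <- job() }()
}

// Seal starts the watcher that flips the done flag once every
// submitted result has been received by Get.
func (p *Pool) Seal() {
	go func() {
		p.wg.Wait()
		p.done.Store(true)
	}()
}

// Get returns the next result, or nil once all results have been received.
func (p *Pool) Get() *Result {
	for {
		select {
		case r := <-p.res:
			p.wg.Done() // count the result as consumed on receipt
			return r
		default:
			if p.done.Load() {
				return nil
			}
			runtime.Gosched() // nothing ready yet: yield, then poll again
		}
	}
}

func main() {
	p := NewPool()
	for i := 0; i < 5; i++ {
		i := i // per-iteration copy (needed before Go 1.22)
		p.Submit(func() *Result { return &Result{I: i * i} })
	}
	p.Seal()
	sum := 0
	for r := p.Get(); r != nil; r = p.Get() {
		sum += r.I
	}
	fmt.Println("sum:", sum) // prints "sum: 30"
}
```

Counting on receipt matters here: the flag can only become true after Get has taken every result, so a result can never be left behind when Get returns nil. Get still polls, though, so it keeps the busy-wait shape of the original question.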

huangapple
  • Posted on July 13, 2014 at 21:45:46
  • When reposting, please keep the link to this article: https://go.coder-hub.com/24723327.html