英文:
How to sync with pending results in channel?
问题
我有一个工作池,提供一个同步接口来获取结果:
func (p *Pool) Get() *Result {
for {
select {
// 如果通道中有结果,就返回它们
case r := <-p.results:
return r
// 否则检查是否有待处理的工作
// 如果没有,返回nil表示所有工作都已完成
default:
if p.active < 1 {
return nil
}
}
}
}
这个实现的思路是,Get
函数会返回下一个工作结果,如果所有工作都已完成,则返回nil
。
现在这个实现的问题是,我需要手动跟踪所有活动工作的数量,使用p.active
计数器。从理论上讲,这个信息已经存在于p.results
通道的长度中。
在缓冲区为空时,返回空值的惯用方法是什么?
英文:
I have a worker pool which offers a sync interface to pull out results:
func (p *Pool) Get() *Result {
for {
select {
// if there are results in channel return them
case r := <-p.results:
return r
// else check if there is any work pending we must wait for
// if not return nil to indicate that all work was done
default:
if p.active < 1 {
return nil
}
}
}
}
The idea is that Get will return the next worker result or nil
if all work is done.
The problem with this implementation now is that I need to manually keep track of all active work with the p.active
counter. This feels somehow wrong as theoretically the information sits already in the length of the p.results
channel.
What is an idiomatic approach to return nothing if a buffer is empty?
答案1
得分: 1
很遗憾,没有len(chan)
这个函数,如果你不知道工作线程的数量,你的方法已经很好了。
然而,你需要一种对计数器进行同步的方法,下面是一个非常简单的方法:
type Result struct {
I int
}
type Pool struct {
res chan *Result
c int32
}
func New() *Pool {
return &Pool{
res: make(chan *Result),
}
}
func (p *Pool) Put(r *Result) {
atomic.AddInt32(&p.c, 1)
time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
p.res <- r
}
func (p *Pool) Get() (r *Result) {
for {
select {
case r = <-p.res:
atomic.AddInt32(&p.c, -1)
return
default:
if atomic.LoadInt32(&p.c) == 0 {
return
}
}
}
}
func main() {
runtime.GOMAXPROCS(8)
p := New()
for i := 0; i < 50; i++ {
go p.Put(&Result{i})
}
time.Sleep(10 * time.Microsecond)
for {
r := p.Get()
if r == nil {
return
}
fmt.Println("r.I", r.I)
}
}
为了完整起见,这里还有另一个使用WaitGroup的示例,但是这也是一种过度设计,因为内部的WaitGroup
也使用了原子计数器。
type Pool struct {
res chan *Result
wg sync.WaitGroup
}
func New(n int) (p *Pool) {
p = &Pool{
res: make(chan *Result, n),
}
p.wg.Add(n)
go func() {
p.wg.Wait()
close(p.res)
}()
return
}
func (p *Pool) Get() *Result {
for {
r, ok := <-p.res
if !ok {
return nil
}
p.wg.Done()
return r
}
}
//func Put is the same as above and the test code is the same.
英文:
Sadly there's no len(chan)
, your approach is as good as it gets if you don't know the number of workers.
However you need some kind of synchronization for the counter, here's an extremely simple approach:
type Result struct {
I int
}
type Pool struct {
res chan *Result
c int32
}
func New() *Pool {
return &Pool{
res: make(chan *Result),
}
}
func (p *Pool) Put(r *Result) {
atomic.AddInt32(&p.c, 1)
time.Sleep(time.Duration(100+r.I%1000) * time.Microsecond)
p.res <- r
}
func (p *Pool) Get() (r *Result) {
for {
select {
case r = <-p.res:
atomic.AddInt32(&p.c, -1)
return
default:
if atomic.LoadInt32(&p.c) == 0 {
return
}
}
}
}
func main() {
runtime.GOMAXPROCS(8)
p := New()
for i := 0; i < 50; i++ {
go p.Put(&Result{i})
}
time.Sleep(10 * time.Microsecond)
for {
r := p.Get()
if r == nil {
return
}
fmt.Println("r.I", r.I)
}
}
//edit
For completeness sake, here's another example using WaitGroup, but again this is an overkill since internally WG uses atomic counters anyway.
type Pool struct {
res chan *Result
wg sync.WaitGroup
}
func New(n int) (p *Pool) {
p = &Pool{
res: make(chan *Result, n),
}
p.wg.Add(n)
go func() {
p.wg.Wait()
close(p.res)
}()
return
}
func (p *Pool) Get() *Result {
for {
r, ok := <-p.res
if !ok {
return nil
}
p.wg.Done()
return r
}
}
//func Put is the same as above and the test code is the same.
答案2
得分: 0
你可以使用WaitGroup
和一个在其上阻塞的goroutine,并在完成时更改一些标志。
假设你有一个WaitGroup
,对于每个入队的项目,你都会在其上调用wg.Add(1)
。每次从通道接收到消息,或者每次一个工作线程完成时,都可以调用wg.Done()
来减少计数器。
然后,你可以创建一个等待所有任务完成的goroutine,并设置一个标志:
go func() {
wg.Wait()
p.done = true // 当然要确保线程安全
}
在你的默认情况下,你只需检查完成标志:
default:
if p.done {
return nil
}
}
详细信息请参阅WaitGroup
的文档。那里的示例与你的情况有些相似。http://golang.org/pkg/sync/#WaitGroup.Wait
英文:
You could use a WaitGroup
, and a gorouotine that blocks on it, and changes some flag when it's done.
So say you have a waitGroup, and for each item you enqueue you call wg.Add(1)
on it. Every time you receive on the channel, or every time a worker finishes, it can call wg.Done()
to decrease its counter.
Then you have a goroutine that waits for evertyhing to finish, and just sets a flag:
go func() {
wg.Wait()
p.done = true //do this thread safe of course
}
And in your default case, you simply check the done flag
default:
if p.done {
return nil
}
See the docs on WaitGroup
for more details. The example there is a bit similar to your case. http://golang.org/pkg/sync/#WaitGroup.Wait
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论