寻找 Golang 并发模式。

huangapple go评论88阅读模式
英文:

looking for golang concurrency pattern

问题

这是我正在尝试解决的问题:

package main

import "fmt"

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
   for d := range work_in_chan {
        fmt.Println("A ", d)
        work_out_chan <- d
   }
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
   for d := range work_in_chan {
        fmt.Println("B ", d)
        work_out_chan <- d
   }
}

func account(account_chan <-chan int, final_chan chan<- int) {

    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {

        //TODO - dumb implementation starts here
        wa_in <- d
        <-wa_out

        wb_in <- d
        <-wb_out
        //TODO - dumb implementation ends here

        final_chan <- d
    }
}

func main() {

    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)

    go account(account_chan, final_chan)

    account_chan <- 1
    account_chan <- 2
    account_chan <- 3

    fmt.Println(<-final_chan)
    fmt.Println(<-final_chan)
    fmt.Println(<-final_chan)
}

account协程在account_chan上接收传入的数据,对数据执行一些操作,然后将数据发送到final_chan。workerA和workerB执行account的工作(顺序不重要),在将数据发送到final_chan之前,它们都必须完成对数据的处理。
有一些要求:

  • workerA和workerB是单个协程。
  • 在任何时候都应该有固定数量的协程(因此不能为每个新数据项添加新的协程)。

我贴出的实现是愚蠢的,因为现在workerA和workerB从未同时执行(尽管它们可以和应该同时执行,因为它们彼此完全独立)。那么我可以使用哪种并发模式来解决这个问题?

英文:

Here is the problem I am trying to solve:

package main
import &quot;fmt&quot;
func workerA(work_in_chan &lt;-chan int,work_out_chan chan&lt;- int){
for d := range work_in_chan {
fmt.Println(&quot;A &quot;,d)
work_out_chan &lt;- d
}
}
func workerB(work_in_chan &lt;-chan int,work_out_chan chan&lt;- int){
for d := range work_in_chan {
fmt.Println(&quot;B &quot;,d)
work_out_chan &lt;- d
}
}
func account(account_chan &lt;-chan int,final_chan chan&lt;- int){
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in,wa_out)
go workerB(wb_in,wb_out)
for d := range account_chan {
//TODO - dumb implementation starts here
wa_in &lt;- d
&lt;-wa_out
wb_in &lt;- d
&lt;-wb_out
//TODO - dumb implementation ends here
final_chan &lt;- d
}
}
func main() {
account_chan := make(chan int, 100)
final_chan := make(chan int, 100)
go account(account_chan,final_chan)
account_chan &lt;- 1
account_chan &lt;- 2
account_chan &lt;- 3
fmt.Println(&lt;-final_chan)
fmt.Println(&lt;-final_chan)
fmt.Println(&lt;-final_chan)
}

The account goroutine receives incoming data on account_chan, executes some work on the data, and once complete sends the data to final_chan. The account work is done by workerA and workerB (order is not important),both must complete on the data before account sends it to final_data.
There are a few requirements:

  • workerA and workerB are single goroutines
  • there should be a constant amount of goroutines at any one time (so no adding new goroutines for each new data item).

My pasted implementation is dumb since now workerA and workerB are never executing concurrently (as they could & should since they are completely independent of each other). So which concurrency pattern can I use to solve this problem?

答案1

得分: 1

你将输入传递给工作线程,然后阻塞直到分别获得它们的结果。

// 给工作线程 A 分配任务
wa_in <- d
// 等待工作线程 A 完成
<-wa_out
// 给工作线程 B 分配任务
wb_in <- d
// 等待工作线程 B 完成
<-wb_out

相反,使用**select语句**同时等待两个通道的结果:

func account(account_chan <-chan int, final_chan chan<- int){
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
wa_in <- d
wb_in <- d
for i := 0; i < 2; i++ {
select {
case <-wa_out:
case <-wb_out:
}
}
final_chan <- d
}
}

http://play.golang.org/p/U0fk1yiqWL

现在,两个工作线程将并发运行,但程序仍然保证等待所有工作线程完成。

还可以参考并发模式的 Go 文档

英文:

You pass the input for the workers and then block until you get their result separately.

// Give worker A work
wa_in &lt;- d
// Wait until worker A finished
&lt;-wa_out
// Give worker B work
wb_in &lt;- d
// Wait until worker B finished
&lt;-wb_out

Instead, use the select statement to wait for a result on one of two channels symultaneously:

func account(account_chan &lt;-chan int,final_chan chan&lt;- int){
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in,wa_out)
go workerB(wb_in,wb_out)
for d := range account_chan {
wa_in &lt;- d
wb_in &lt;- d
for i := 0 ; i &lt; 2; i++ {
select {
case &lt;-wa_out:
case &lt;-wb_out:
}
}
final_chan &lt;- d
}
}

http://play.golang.org/p/U0fk1yiqWL

Now, the two workers will run concurrently but the program is still guaranteed to wait for all the workers to finish.

Also see the concurrency patterns go doc.

答案2

得分: 1

根据您提供的限制,可以做的事情不多。只需重新排序通道操作以允许并发可能是您要寻找的全部内容。

for d := range account_chan {
    wa_in <- d
    wb_in <- d

    <-wa_out
    <-wb_out

    final_chan <- d
}

play.golang.org/p/4d8hKyHTWq

第一次看到这个模式时,我担心“但如果 B 先完成怎么办”。事实证明,顺序并不重要,因为两者都需要接收。

关于风格的一点说明:
提供的代码片段似乎有太多的通道和 goroutine。但这可能是因为这是一个更复杂的问题,被简化为了关键部分。实际上可能存在一个问题,即来自工作协程的输出通道。在示例中没有使用它们的输出,我无法看到它们在完整的代码清单中如何使用。要么值被复制,那么输出通道就不需要(sync.WaitGroup会更好),要么它们在工作协程之间共享是不安全的。

英文:

With the restrictions you've provided there isn't much that can be done. Simply reordering the channel operation to allows concurrency might be all you're looking for.

for d := range account_chan {
wa_in &lt;- d
wb_in &lt;- d
&lt;-wa_out
&lt;-wb_out
final_chan &lt;- d
}

play.golang.org/p/4d8hKyHTWq
The first time I saw this pattern, I worried "but what if B gets done first". It turns out the order doesn't really matter as both need to recv'd from.


An aside on style:
The provided snippet smells like it has too many channels and goroutines. But that may because this is a more complicated problem distilled down to a the essential parts. One thing that may actually be a problem is the out channel from the workers. Their output isn't used in the example and I can't see how it could be in a full listing. Either the values are copied in which case the out channel isn't needed (a sync.WaitGroup would be better) or they're not safe to share between the workers.

huangapple
  • 本文由 发表于 2013年8月29日 23:29:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/18515098.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定