How to run multiple goroutines and collect the results in the order they were started

Question

I have the following code, which has a two-level goroutine structure:

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

func main() {
	outchan := make(chan string)
	for i := 0; i < 10; i++ {
		go testfun(i, outchan)
	}
	for i := 0; i < 10; i++ {
		a := <-outchan
		fmt.Println(a)
	}
}

func testfun(i int, outchan chan<- string) {
	outchan2 := make(chan int)
	time.Sleep(time.Millisecond * time.Duration(int64(rand.Intn(10))))
	for j := 0; j < 10; j++ {
		go testfun2(j, outchan2)
	}
	tempStr := strconv.FormatInt(int64(i), 10) + " - "
	for j := 0; j < 10; j++ {
		tempStr = tempStr + strconv.FormatInt(int64(<-outchan2), 10)
	}
	outchan <- tempStr
}

func testfun2(j int, outchan2 chan<- int) {
	time.Sleep(time.Millisecond * time.Duration(int64(rand.Intn(10))))
	outchan2 <- j
}

The output I was expecting is:

0 - 0123456789
1 - 0123456789
2 - 0123456789
3 - 0123456789
4 - 0123456789
5 - 0123456789
6 - 0123456789
7 - 0123456789
8 - 0123456789
9 - 0123456789

But instead I got this:

7 - 7980345261
6 - 4035897621
3 - 9047526831
9 - 4032861975
8 - 9570831624
5 - 3798021546
1 - 0985362471
0 - 1849276035
2 - 9572806143
4 - 5768032419

Could anyone show me how to achieve the output I was expecting? I'm a newbie, so please forgive me if the solution is obvious. I've been looking for it for days.

Answer 1

Score: 6

To give you a better idea: the issue is that you're reading from a single channel, and the values pushed onto that channel arrive in an arbitrary order because of your time.Sleep calls. If you want to issue the time.Sleep calls concurrently to simulate concurrent long-running processes, have each goroutine write its result to its own channel.

This way you can iterate over an in-order list of the result channels, blocking until the next channel can be read from (the same idea as the output queue in this answer: https://stackoverflow.com/questions/3227042/maintaining-order-in-a-multi-threaded-pipeline#answer-3228846). Here's your reworked code, with some names changed to make things easier to track:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"
)

func main() {
    // One result channel per goroutine, stored in start order.
    var jobs []chan string
    for i := 0; i < 10; i++ {
        job := make(chan string)
        jobs = append(jobs, job)
        go testfun(i, job)
    }
    // Reading the channels in slice order yields the results in start order.
    for _, result := range jobs {
        fmt.Println(<-result)
    }
}

func testfun(i int, job chan<- string) {
    var innerJobs []chan int
    time.Sleep(time.Millisecond * time.Duration(int64(rand.Intn(10))))
    for j := 0; j < 10; j++ {
        innerJob := make(chan int)
        innerJobs = append(innerJobs, innerJob)
        go testfun2(j, innerJob)
    }
    tempStr := strconv.FormatInt(int64(i), 10) + " - "
    // Each receive blocks until that particular inner goroutine has sent its value.
    for _, result := range innerJobs {
        tempStr = tempStr + strconv.FormatInt(int64(<-result), 10)
    }
    job <- tempStr
}

func testfun2(j int, innerJob chan<- int) {
    time.Sleep(time.Millisecond * time.Duration(int64(rand.Intn(10))))
    innerJob <- j
}

Answer 2

Score: 3

A different, more efficient approach is to use a slice (or an array) together with a sync.WaitGroup, so that each goroutine writes its result into its own slot:

func main() {
	var wg sync.WaitGroup
	out := make([]string, 10)
	for i := 0; i < len(out); i++ {
		wg.Add(1)
		// Each goroutine gets a pointer to its own element, so no two goroutines share a slot.
		go testfun(i, &out[i], &wg)
	}
	// Wait for every goroutine before reading the slice.
	wg.Wait()
	for i := 0; i < len(out); i++ {
		a := out[i]
		fmt.Println(a)
	}
}

func testfun(i int, outVal *string, wg *sync.WaitGroup) {
	//........
	*outVal = tempStr
	wg.Done()
}

playground

Edit: updated the example for testfun2 as well; I forgot about that earlier.
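
Since the snippet above elides the body of testfun and does not show testfun2, here is one way the complete program might look. This is a sketch under stated assumptions, not the answerer's actual playground code: the run wrapper, the inner WaitGroup, and the pointer-based signature of testfun2 are all guesses made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

// run is a hypothetical wrapper around the answer's main loop so the
// ordered results can be returned as well as printed.
func run(n int) []string {
	var wg sync.WaitGroup
	out := make([]string, n)
	for i := range out {
		wg.Add(1)
		go testfun(i, &out[i], &wg)
	}
	wg.Wait() // every slot of out is filled once this returns
	return out
}

// testfun fills *outVal with "i - 0123456789". The body is an assumption,
// because the original answer elides it.
func testfun(i int, outVal *string, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)

	var inner sync.WaitGroup
	digits := make([]int, 10)
	for j := range digits {
		inner.Add(1)
		go testfun2(j, &digits[j], &inner)
	}
	inner.Wait()

	tempStr := strconv.Itoa(i) + " - "
	for _, d := range digits {
		tempStr += strconv.Itoa(d)
	}
	*outVal = tempStr
}

// testfun2 writes j into its own slot after a random delay (assumed signature).
func testfun2(j int, outVal *int, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
	*outVal = j
}

func main() {
	for _, line := range run(10) {
		fmt.Println(line)
	}
}
```

Each goroutine writes only to its own slice element, and the slice is read only after Wait returns, so no extra locking is needed. With n set to 10 this prints the ten lines the question expected, from "0 - 0123456789" through "9 - 0123456789".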

huangapple
  • This article was posted on September 30, 2015 at 08:40:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/32855960.html