使用goroutine处理值并将结果收集到切片中

huangapple go评论85阅读模式
英文:

Using goroutines to process values and gather results into a slice

问题

我最近在探索Go语言和goroutine,但是我对goroutine的工作原理感到困惑。

我尝试将之前编写的代码转换为使用goroutine的Go代码,但是遇到了fatal error: all goroutines are asleep - deadlock!的错误。

我想要做的是使用goroutine处理列表中的项目,然后将处理后的值收集到一个新的列表中。但是在"收集"部分遇到了问题。

代码:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// 从contents列表中读取
for i, line := range contents {
	wg.Add(1)
    // 使用goroutine处理每个项目,并将输出发送到sampleChan
	go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// 从sampleChan中读取并放入切片中
var sampleList []sample
for s := range sampleChan {
	sampleList = append(sampleList, s)
}
close(sampleChan)

从goroutine中收集结果的正确方法是什么?

我知道切片不是线程安全的,所以我不能让每个goroutine都向切片中追加数据。

英文:

I'm recently exploring Go and how goroutines work confuse me.

I tried to port code I had written before into Go using goroutines but got a fatal error: all goroutines are asleep - deadlock! error.

What I'm trying to do is use goroutines to process items in a list, then gather the processed values into a new list. But I'm having problems in the "gathering" part.

Code:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// Read from contents list
for i, line := range contents {
	wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
	go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
	sampleList = append(sampleList, s)
}
close(sampleChan)

What's the right way to gather results from goroutines?

I know slices are not threadsafe so I can't have each goroutine just append to the slice.

答案1

得分: 20

你的代码几乎正确。有几个问题:首先,你在收集结果之前等待所有的工作线程完成;其次,你的for循环在通道关闭后终止,但通道只有在for循环终止后才关闭。

你可以通过在工作线程完成后异步关闭通道来修复代码:

for i, line := range contents {
    wg.Add(1)
    // 使用goroutine处理每个项目,并将输出发送到sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

作为一种风格建议(遵循https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions),最好将newSample设计为一个简单的同步函数,不需要传入等待组和通道,只需生成其结果。然后,工作线程的代码将如下所示:

for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan <- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

这样做可以将并发原语放在一起,除了简化newSample并使其更容易测试外,还可以让你了解并发的情况,并直观地检查wg.Done()是否始终被调用。如果你想重构代码,例如使用固定数量的工作线程,那么你的更改将局限在一个地方。

英文:

Your code is almost correct. There's a couple of problems: first, you're waiting for all the workers to finish before collecting the results, and second your for loop terminates when the channel is closed, but the channel is closed only after the for loop terminates.

You can fix the code by asynchronously closing the channel when the workers are finished:

for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &amp;wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

As a note of style (and following https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions), it'd be preferable if newSample was a simple, synchronous function that didn't take the waitgroup and channel, and simply generated its result. Then the worker code would look like:

for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan &lt;- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

This keeps your concurrency primitives all together, which apart from simplifiying newSample and making it easier to test, it allows you to see what's going on with the concurrency, and visually check that wg.Done() is always called. And if you want to refactor the code to for example use a fixed number of workers, then your changes will all be local.

答案2

得分: 7

有两个问题:

  1. 使用非缓冲通道:非缓冲通道会阻塞接收者,直到通道上有可用数据,并阻塞发送者,直到有接收者可用。这导致了错误。
  2. 在使用 range 前没有关闭通道:由于你从未关闭 ch 通道,range 循环将永远不会结束。

你需要使用一个"带缓冲的"通道,并在使用 range 前"关闭"通道。

代码如下:

package main

import (
	"fmt"
	"sync"
)

func double(line int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	ch <- line * 2
}

func main() {
	contents := []int{1, 2, 3, 4, 5}
	sampleChan := make(chan int, len(contents))
	var wg sync.WaitGroup

	// 从 contents 列表中读取数据
	for _, line := range contents {
		wg.Add(1)
		go double(line, sampleChan, &wg)
	}

	wg.Wait()
	close(sampleChan)

	// 从 sampleChan 中读取数据并放入切片中
	var sampleList []int
	for s := range sampleChan {
		sampleList = append(sampleList, s)
	}

	fmt.Println(sampleList)
}

修改后的代码如下:

package main

import (
	"fmt"
	"sync"
)

func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
	defer wg.Done()
	defer close(sampleChan)

	var w sync.WaitGroup
	for _, line := range lines {
		w.Add(1)
		go double(&w, line, sampleChan)
	}
	w.Wait()
}

func double(wg *sync.WaitGroup, line int, ch chan int) {
	defer wg.Done()
	ch <- line * 2
}

func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
	defer wg.Done()
	for s := range channel {
		*sampleList = append(*sampleList, s)
	}
}

func main() {
	contents := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
	sampleChan := make(chan int, 1)
	var sampleList []int

	var wg sync.WaitGroup

	wg.Add(1)
	go doubleLines(contents, &wg, sampleChan)

	wg.Add(1)
	go collectResult(&wg, sampleChan, &sampleList)

	wg.Wait()
	fmt.Println(sampleList)
}

你可以在以下链接中查看代码:https://play.golang.org/p/VAe7Qll3iVM

英文:

There are two problems

  1. Using unbuffered channels: Unbuffered channels block receivers until data is available on the channel and senders until a receiver is available.That caused the error
  2. Not closing the channel before range: As you never close the ch channel, the range loop will never finish.

You have to use a buffered channel and close the channel before range

Code

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
func double(line int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch &lt;- line * 2
}
func main() {
contents := []int{1, 2, 3, 4, 5}
sampleChan := make(chan int,len(contents))
var wg sync.WaitGroup
// Read from contents list
for _, line := range contents {
wg.Add(1)
go double(line, sampleChan, &amp;wg)
}
wg.Wait()
close(sampleChan)
// Read from sampleChan and put into a slice
var sampleList []int
for s := range sampleChan {
sampleList = append(sampleList, s)
}
fmt.Println(sampleList)
}

Play link : https://play.golang.org/p/k03vt3hd3P

EDIT:
Another approach for better performance would be to run producer and consumer at concurrently

Modified code

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
defer wg.Done()
defer close(sampleChan)
var w sync.WaitGroup
for _, line := range lines {
w.Add(1)
go double(&amp;w, line, sampleChan)
}
w.Wait()
}
func double(wg *sync.WaitGroup, line int, ch chan int) {
defer wg.Done()
ch &lt;- line * 2
}
func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
defer wg.Done()
for s := range channel {
*sampleList = append(*sampleList, s)
}
}
func main() {
contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
sampleChan := make(chan int, 1)
var sampleList []int
var wg sync.WaitGroup
wg.Add(1)
go doubleLines(contents, &amp;wg, sampleChan)
wg.Add(1)
go collectResult(&amp;wg, sampleChan, &amp;sampleList)
wg.Wait()
fmt.Println(sampleList)
}

play link: https://play.golang.org/p/VAe7Qll3iVM

huangapple
  • 本文由 发表于 2017年9月2日 13:41:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/46010836.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定