Using goroutines to process values and gather results into a slice
Question
I've recently been exploring Go, and how goroutines work confuses me.
I tried to port code I had written before into Go using goroutines, but got the error "fatal error: all goroutines are asleep - deadlock!".
What I'm trying to do is use goroutines to process items in a list, then gather the processed values into a new list. But I'm having problems in the "gathering" part.
Code:
sampleChan := make(chan sample)
var wg sync.WaitGroup
// Read from contents list
for i, line := range contents {
	wg.Add(1)
	// Process each item with a goroutine and send output to sampleChan
	go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()
// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
	sampleList = append(sampleList, s)
}
close(sampleChan)
What's the right way to gather results from goroutines?
I know slices are not thread-safe, so I can't have each goroutine just append to the slice.
Answer 1
Score: 20
Your code is almost correct. There are a couple of problems. First, you're waiting for all the workers to finish before collecting the results. Because the channel is unbuffered, each worker blocks on its send until something receives, so wg.Wait() never returns. Second, your for loop terminates when the channel is closed, but the channel is closed only after the for loop terminates.
You can fix the code by asynchronously closing the channel when the workers are finished:
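for _, line := range contents {
	wg.Add(1)
	// Process each item with a goroutine and send output to sampleChan
	go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
// Close the channel only after every worker has called wg.Done();
// doing it in its own goroutine lets the loop below receive meanwhile.
go func() {
	wg.Wait()
	close(sampleChan)
}()
for s := range sampleChan {
	// ... collect s, e.g. append it to sampleList as in the question
}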
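Here is a self-contained sketch of this fix that can be run as-is. It assumes a hypothetical sample type, a simplified hypothetical newSample (the question's *timePtr argument is dropped, and the "work" is just upper-casing the line), and a hypothetical collect helper. The shape matches the question: each worker sends on the channel and calls wg.Done(), and a separate goroutine closes the channel after wg.Wait().

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// sample is a hypothetical stand-in for the question's result type.
type sample struct {
	name string
	reps int
}

// newSample is a hypothetical worker with the question's shape: it computes
// one result, sends it on out, and marks itself done on wg.
func newSample(line string, replicate int, out chan<- sample, wg *sync.WaitGroup) {
	defer wg.Done()
	out <- sample{name: strings.ToUpper(line), reps: replicate}
}

// collect (hypothetical) starts one goroutine per line and drains the
// channel into a slice while the workers are still running.
func collect(contents []string, replicate int) []sample {
	sampleChan := make(chan sample) // unbuffered is fine: we receive concurrently
	var wg sync.WaitGroup
	for _, line := range contents {
		wg.Add(1)
		go newSample(line, replicate, sampleChan, &wg)
	}
	// Close only after all sends are done, so the range loop below can end.
	go func() {
		wg.Wait()
		close(sampleChan)
	}()
	var sampleList []sample
	for s := range sampleChan {
		sampleList = append(sampleList, s)
	}
	return sampleList
}

func main() {
	got := collect([]string{"b", "a", "c"}, 3)
	// Arrival order depends on scheduling; sort for a stable printout.
	sort.Slice(got, func(i, j int) bool { return got[i].name < got[j].name })
	fmt.Println(got) // prints [{A 3} {B 3} {C 3}]
}
```

The unbuffered channel no longer deadlocks because the receiving loop runs at the same time as the senders.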
As a note of style (and following https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions), it would be preferable if newSample were a simple, synchronous function that didn't take the WaitGroup and channel and simply returned its result. Then the worker code would look like:
for _, line := range contents {
	wg.Add(1)
	go func(line string) {
		defer wg.Done()
		sampleChan <- newSample(line, *replicatePtr, *timePtr)
	}(line)
}
This keeps your concurrency primitives all together. Apart from simplifying newSample and making it easier to test, it lets you see what's going on with the concurrency and visually check that wg.Done() is always called. And if you want to refactor the code, for example to use a fixed number of workers, your changes will all be local.
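To illustrate that last point, here is a rough sketch of a fixed-size worker pool built around a synchronous newSample. The sample type, newSample's body, process, and numWorkers are all hypothetical. The point is that only process changes, while newSample stays a plain function.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// sample and newSample are hypothetical stand-ins; newSample is synchronous
// and knows nothing about channels or WaitGroups.
type sample struct {
	line  string
	value int
}

func newSample(line string, replicate int) sample {
	return sample{line: line, value: len(line) * replicate}
}

// process fans the lines out to a fixed pool of numWorkers goroutines.
// All of the concurrency lives here.
func process(contents []string, replicate, numWorkers int) []sample {
	jobs := make(chan string)
	results := make(chan sample)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range jobs { // ends when jobs is closed
				results <- newSample(line, replicate)
			}
		}()
	}
	// Feed the jobs, then close the channel so the workers' loops finish.
	go func() {
		for _, line := range contents {
			jobs <- line
		}
		close(jobs)
	}()
	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()
	var sampleList []sample
	for s := range results {
		sampleList = append(sampleList, s)
	}
	return sampleList
}

func main() {
	out := process([]string{"a", "bb", "ccc", "dddd"}, 2, 2)
	sort.Slice(out, func(i, j int) bool { return out[i].line < out[j].line })
	fmt.Println(out) // prints [{a 2} {bb 4} {ccc 6} {dddd 8}]
}
```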
Answer 2
Score: 7
There are two problems:
- Using an unbuffered channel: an unbuffered channel blocks receivers until data is available on the channel, and blocks senders until a receiver is available. Since nothing receives until wg.Wait() returns, every worker is stuck on its send. That caused the error.
- Not closing the channel before range: the channel is only closed after the range loop, so the range loop never finishes.
You have to use a buffered channel and close the channel before the range loop.
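The following small sketch shows the two channel properties this answer relies on. drainBuffered is a hypothetical helper. With enough buffer capacity, every send completes even though nothing is receiving yet. A range loop over a closed channel still receives the buffered values before it stops.

```go
package main

import "fmt"

// drainBuffered is a hypothetical helper: it fills a buffered channel with no
// receiver running, closes it, and then reads everything back with range.
func drainBuffered(values []int) []int {
	ch := make(chan int, len(values)) // capacity for every value
	for _, v := range values {
		ch <- v // does not block: the buffer still has room
	}
	close(ch) // closing does not discard values already buffered
	var out []int
	for v := range ch { // ends once the buffer is empty
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drainBuffered([]int{1, 2, 3})) // prints [1 2 3]
}
```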
Code
package main

import (
	"fmt"
	"sync"
)

func double(line int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	ch <- line * 2
}

func main() {
	contents := []int{1, 2, 3, 4, 5}
	sampleChan := make(chan int, len(contents))
	var wg sync.WaitGroup
	// Read from contents list
	for _, line := range contents {
		wg.Add(1)
		go double(line, sampleChan, &wg)
	}
	wg.Wait()
	close(sampleChan)
	// Read from sampleChan and put into a slice
	var sampleList []int
	for s := range sampleChan {
		sampleList = append(sampleList, s)
	}
	fmt.Println(sampleList)
}
Play link: https://play.golang.org/p/k03vt3hd3P
EDIT:
Another approach, for better performance, is to run the producer and the consumer concurrently.
Modified code:
package main

import (
	"fmt"
	"sync"
)

func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
	defer wg.Done()
	defer close(sampleChan)
	var w sync.WaitGroup
	for _, line := range lines {
		w.Add(1)
		go double(&w, line, sampleChan)
	}
	w.Wait()
}

func double(wg *sync.WaitGroup, line int, ch chan int) {
	defer wg.Done()
	ch <- line * 2
}

func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
	defer wg.Done()
	for s := range channel {
		*sampleList = append(*sampleList, s)
	}
}

func main() {
	contents := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
	sampleChan := make(chan int, 1)
	var sampleList []int
	var wg sync.WaitGroup
	wg.Add(1)
	go doubleLines(contents, &wg, sampleChan)
	wg.Add(1)
	go collectResult(&wg, sampleChan, &sampleList)
	wg.Wait()
	fmt.Println(sampleList)
}
Play link: https://play.golang.org/p/VAe7Qll3iVM
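Both programs above gather results in whatever order the goroutines happen to finish, so the printed slice is not guaranteed to follow the order of contents. If order matters, one alternative (not taken from either answer) is to preallocate the result slice and have each goroutine write only its own index. Writing distinct elements of a slice from different goroutines is not a data race, and wg.Wait() guarantees those writes are visible once it returns. doubleAll is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// doubleAll doubles every element concurrently and keeps the input order.
// Each goroutine writes only results[i], so no two goroutines touch the same
// element and no mutex or channel is needed.
func doubleAll(contents []int) []int {
	results := make([]int, len(contents))
	var wg sync.WaitGroup
	for i, line := range contents {
		wg.Add(1)
		go func(i, line int) {
			defer wg.Done()
			results[i] = line * 2
		}(i, line)
	}
	wg.Wait() // all writes to results happen before Wait returns
	return results
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3, 4, 5})) // prints [2 4 6 8 10]
}
```

This also addresses the question's concern about slices. Appending is unsafe because it may reallocate and write the shared slice header. Assigning to separate, preallocated elements does neither.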