英文:
Deadlock using channels as queues
问题
我正在学习Go语言,并尝试实现一个作业队列。
我想要做的是:
主goroutine通过一个通道将行传递给多个解析器工作线程(将行解析为结构体),每个解析器将结构体发送到另一个工作线程(goroutine)的结构体通道中进行处理(发送到数据库等)。
代码如下:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// 启动行解析工作线程并发送到jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// 处理来自jobProcessQ的myStruct
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // 避免最后一个字符'\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
这是一个简化的示例:https://play.golang.org/p/yz84g6CJraa
工作线程的代码如下:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // 只是将CSV解析为结构体...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // 模拟长时间操作...
done <- a
}
}
我知道问题与"done"通道有关,因为如果我不使用它,就不会出错,但我无法弄清楚如何修复它。
英文:
I'm learning Go and I am trying to implement a job queue.
What I'm trying to do is:
Have the main goroutine feed lines through a channel for multiple parser workers (that parse a line to s struct), and have each parser send the struct to a channel of structs that other workers (goroutines) will process (send to database, etc).
The code looks like this:
lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0
countSend := 0
for {
line, err := reader.ReadString('\n')
if err != nil && err != io.EOF {
log.Fatal(err)
}
if err == io.EOF {
break
}
lineCount++
if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1] // Avoid last char '\n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.\n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)
Here's a simplified playground: https://play.golang.org/p/yz84g6CJraa
the workers look like this:
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}
I know the problem is related to the "done" channel because if I don't use it, there's no error, but I can't figure out how to fix it.
答案1
得分: 3
你不会从doneQ
开始阅读,直到你完成将所有行发送到lineParseQ
,这比缓冲区空间中的行数更多。因此,一旦doneQ
缓冲区已满,发送操作将被阻塞,开始填充lineParseQ
缓冲区,一旦它也满了,就会发生死锁。将发送到lineParseQ
的循环、从doneQ
读取的循环或两者都移动到单独的goroutine中,例如:
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
这仍然会在最后发生死锁,因为你在同一个goroutine中对通道进行了range
操作,并在其后进行了close
操作;由于range
会一直持续到通道关闭,而close
在range
完成之后进行,所以仍然会发生死锁。你需要将close
放在适当的位置;即在发送例程中,或者在监视给定通道的多个发送者的WaitGroup
上阻塞。
// 启动行解析工作器并发送到jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// 从jobProcessQ处理myStruct
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
完整的工作示例在这里:https://play.golang.org/p/XsnewSZeb2X
英文:
You don't start reading from doneQ
until you've finished sending all the lines to lineParseQ
, which is more lines than there is buffer space. So once the doneQ
buffer is full, that send blocks, which starts filling the lineParseQ
buffer, and once that's full, it deadlocks. Move either the loop sending to lineParseQ
, the loop reading from doneQ
, or both, to separate goroutine(s), e.g.:
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
This will still deadlock at the end, because you've got a range
over a channel and the close
after it in the same goroutine; since range
continues until the channel is closed, and the close comes after the range
finishes, you still have a deadlock. You need to put the closes in appropriate places; that being, either in the sending routine, or blocked on a WaitGroup
monitoring the sending routines if there are multiple senders for a given channel.
// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}
Full working example here: https://play.golang.org/p/XsnewSZeb2X
答案2
得分: 2
协调使用sync.WaitGroup
将管道分解为多个阶段。当你知道管道的某个部分已经完成(并且没有人向特定的通道写入数据)时,关闭该通道以指示所有“工作者”退出,例如:
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() 表示上述操作已完成
缓冲通道在处理突发工作负载时很方便,但有时它们被用来避免设计不良导致的死锁。如果你想避免在goroutine中运行管道的某些部分,可以缓冲一些通道(通常与工作者数量匹配),以避免主goroutine中的阻塞。
如果你有依赖关系的部分需要读取和写入,并且想避免死锁,请确保它们位于单独的goroutine中。将管道的所有部分都放在自己的goroutine中甚至可以消除对缓冲通道的需求:
// 将所有通道操作放入单独的goroutine中
// 可以消除对缓冲通道的需求
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
当然,这是一种权衡 - 一个goroutine大约需要2K的资源,而缓冲通道则要少得多。与大多数设计一样,它取决于如何使用。
此外,不要被臭名昭著的Go for循环陷阱所困扰,使用闭包赋值来避免这个问题:
for i := 1; i <= 5; i++ {
i := i // 新的i(不是上面的i)
go func() {
myfunc(i) // 否则所有的goroutine很可能都会得到'5'
}()
}
最后,在退出之前确保等待所有结果被处理。一个常见的错误是从基于通道的函数中返回并认为所有结果都已经被处理。在服务中,这最终是正确的。但在独立的可执行文件中,处理循环可能仍在处理结果。
go func() {
wgW.Wait() // 等待工作者goroutine完成
close(doneQ) // 现在可以安全地关闭结果通道
}()
// 确保在所有结果被处理之前不返回
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
通过在主goroutine中处理结果,我们确保不会过早地返回而没有处理完所有内容。
将所有内容整合在一起:
https://play.golang.org/p/MjLpQ5xglP3
英文:
Coordinate the pipeline with sync.WaitGroup
breaking each piece into stages. When you know one piece of the pipeline is complete (and no one is writing to a particular channel), close the channel to instruct all "workers" to exit e.g.
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed
Buffered channels are handy to handle burst workloads, but sometimes they are used to avoid deadlocks in poor designs. If you want to avoid running certain parts of your pipeline in a goroutine you can buffer some channels (matching the number of workers typically) to avoid a blockage in your main goroutine.
If you have dependent pieces that read & write and want to avoid deadlock - ensure they are in separate goroutines. Having all parts of the pipeline it its own goroutine will even remove the need for buffered channels:
// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)
Its a tradeoff of course - a goroutine costs about 2K in resources - versus a buffered channel which is much less. As with most designs it depends on how it is used.
Also don't get caught by the notorious Go for-loop gotcha, so use a closure assignment to avoid this:
for i := 1; i <= 5; i++ {
i := i // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}
Finally ensure you wait for all results to be processed before exiting.
It's a common mistake to return from a channel based function and believe all results have been processed. In a service this will eventually be true. But in a standalone executable the processing loop may still be working on results.
go func() {
wgW.Wait() // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.\n", a)
}
by processing the results in the main goroutine, we ensure we don't return prematurely without having processed everything.
Pulling it all together:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论