英文:
Deadlock when trying to code a pool of worker methods
问题
在下面的代码中,我不明白为什么"Worker"方法似乎会退出,而不是从输入通道"in"中获取值并进行处理。
我原以为它们只会在消耗完输入通道"in"中的所有输入并进行处理后才会返回。
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
i int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
for item := range in {
item *= item // 返回输入值的平方
fmt.Printf("=> %d: %d\n", id, item)
out <- Result{item, id}
}
wg.Done()
fmt.Printf("%d exiting ", id)
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id++ {
fmt.Printf("Starting : %d\n", id)
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // 等待所有工作线程完成任务
close(out) // 当所有任务完成时关闭输出通道
}
const (
NW = 4
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 100; i++ {
in <- i
}
close(in)
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out : %d: %d", item.i, item.val)
}
}
输出结果为
Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
英文:
In the code hereunder, I don't understand why the "Worker" methods seem to exit instead of pulling values from the input channel "in" and processing them.
I had assumed they would only return after having consumed all input from the input channel "in" and processing them
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
i int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
for item := range in {
item *= item // returns the square of the input value
fmt.Printf("=> %d: %d\n", id, item)
out <- Result{item, id}
}
wg.Done()
fmt.Printf("%d exiting ", id)
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id++ {
fmt.Printf("Starting : %d\n", id)
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 4
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 100; i++ {
in <- i
}
close(in)
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out : %d: %d", item.i, item.val)
}
}
The output is
Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
答案1
得分: 3
致命错误:所有的goroutine都处于休眠状态 - 死锁!
完整的错误信息显示了每个goroutine所“卡住”的位置。如果你在playground中运行这个代码,它甚至会显示行号。这让我很容易诊断。
你的Run_parallel
在main
的goroutine中运行,所以在main
可以从out
中读取之前,Run_parallel
必须返回。在Run_parallel
可以返回之前,它必须执行wg.Wait()
。但在工作线程调用wg.Done()
之前,它们必须写入out
。这就导致了死锁。
一个简单的解决方案是:在自己的Goroutine中并发地运行Run_parallel
。
go Run_parallel(NW, in, out, Worker)
现在,main
遍历out
,等待out
的关闭信号表示完成。Run_parallel
使用wg.Wait()
等待工作线程,而工作线程将遍历in
。所有的工作都将完成,并且程序直到所有工作都完成后才会结束。(https://go.dev/play/p/oMrgH2U09tQ)
英文:
> fatal error: all goroutines are asleep - deadlock!
The full error shows where each goroutine is "stuck". If you run this in the playground, it will even show you the line number. That made it easy for me to diagnose.
Your Run_parallel
runs in the main
groutine, so before main
can read from out
, Run_parallel
must return. Before Run_parallel
can return, it must wg.Wait()
. But before the workers call wg.Done()
, they must write to out
. That's what causes a deadlock.
One solution is simple: just run Run_parallel
concurrently in its own Goroutine.
go Run_parallel(NW, in, out, Worker)
Now, main
ranges over out
, waiting on out
s closure to signal completion. Run_parallel
waits for the workers with wg.Wait()
, and the workers will range over in
. All the work will get done, and the program won't end until it's all done. (https://go.dev/play/p/oMrgH2U09tQ)
答案2
得分: -1
解决方案:
Run_parallel必须在自己的goroutine中运行:
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // 返回输入值的两倍(对数据的虚假处理)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id++ {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // 等待所有工作线程完成任务
close(out) // 当所有任务完成时关闭输出通道
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 10; i++ {
in <- i
}
close(in)
}()
go Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}
希望对你有帮助!
英文:
Solution :
Run_parallel has to run in it’s own goroutine:
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id++ {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 10; i++ {
in <- i
}
close(in)
}()
go Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}
答案3
得分: -1
解决方案的另一种表述:
在这种替代的表述中,不需要将Run_parallel作为goroutine启动(它会触发自己的goroutine)。
我更喜欢第二种解决方案,因为它自动化了Run_parallel()必须与主函数并行运行的事实。出于同样的原因,它更安全,更不容易出错(不需要记住使用go关键字运行Run_parallel)。
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // 返回输入值的两倍(对数据的虚假处理)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
go func() {
wg := sync.WaitGroup{}
defer close(out) // 在所有任务完成后关闭输出通道
for id := 0; id < n_workers; id++ {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // 等待所有工作线程完成任务,并触发延迟的close(out)
}()
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
defer close(in)
for i := 0; i < 10; i++ {
in <- i
}
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}
英文:
Alternative formulation of the solution:
In that alternative formulation , it is not necessary to start Run_parallel as a goroutine (it triggers its own goroutine).
I prefer that second solution, because it automates the fact that Run_parallel() has to run parallel to the main function. Also, for the same reason it's safer, less error-prone (no need to remember to run Run_parallel with the go keyword).
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
go func() {
wg := sync.WaitGroup{}
defer close(out) // close the output channel when all tasks are completed
for id := 0; id < n_workers; id++ {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
}()
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
defer close(in)
for i := 0; i < 10; i++ {
in <- i
}
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论