Deadlock when trying to code a pool of worker methods

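Question

In the code below, I don't understand why the "Worker" methods seem to exit instead of pulling values from the input channel "in" and processing them.

I had assumed they would return only after consuming and processing all input from the input channel "in".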

package main

import (
	"fmt"
	"sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
	i   int
	val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
	for item := range in {
		item *= item // square the input value
		fmt.Printf("=> %d: %d\n", id, item)
		out <- Result{item, id}
	}
	wg.Done()
	fmt.Printf("%d exiting ", id)
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
	wg := sync.WaitGroup{}
	for id := 0; id < n_workers; id++ {
		fmt.Printf("Starting : %d\n", id)
		wg.Add(1)
		go Worker(in, out, id, &wg)
	}
	wg.Wait()  // wait for all workers to complete their tasks
	close(out) // close the output channel when all tasks are completed
}

const (
	NW = 4
)

func main() {
	in := make(chan int)
	out := make(chan Result)

	go func() {
		for i := 0; i < 100; i++ {
			in <- i
		}
		close(in)
	}()
	Run_parallel(NW, in, out, Worker)

	for item := range out {
		fmt.Printf("From out : %d: %d", item.i, item.val)
	}
}

The output is

Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!

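Answer 1

Score: 3

> fatal error: all goroutines are asleep - deadlock!

The full error shows where each goroutine is "stuck". If you run this in the playground, it will even show you the line number. That made it easy for me to diagnose.

Your Run_parallel runs in the main goroutine, so before main can read from out, Run_parallel must return. Before Run_parallel can return, it must get past wg.Wait(). But before the workers can call wg.Done(), they must write to out, and because out is unbuffered, each write blocks until something reads it. Nothing is reading yet, so every worker stalls after its first item. That is the deadlock.

One solution is simple: just run Run_parallel concurrently in its own goroutine.

	go Run_parallel(NW, in, out, Worker)

Now main ranges over out, waiting for out to be closed to signal completion. Run_parallel waits for the workers with wg.Wait(), and the workers range over in. All the work gets done, and the program won't end until it is all done. (https://go.dev/play/p/oMrgH2U09tQ)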
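
The dependency chain described above can be reproduced in a smaller, testable form. The following is a minimal sketch, not the poster's code: squareAll is a hypothetical helper introduced only for illustration. It keeps the same three roles (a feeder, a set of workers, and a waiter that closes out), with the wait moved off the goroutine that reads the results.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll is a hypothetical helper (not part of the original post).
// It squares 0..n-1 using the given number of workers and returns the
// results in ascending order.
func squareAll(n, workers int) []int {
	in := make(chan int)
	out := make(chan int) // unbuffered: every send waits for a receiver

	// Feed the inputs, then close in so the workers' range loops end.
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- v * v
			}
		}()
	}

	// Calling wg.Wait() directly here would deadlock, because nothing
	// is receiving from out yet. Wait in a separate goroutine instead.
	go func() {
		wg.Wait()
		close(out)
	}()

	// The calling goroutine is now free to drain out until it is closed.
	var results []int
	for r := range out {
		results = append(results, r)
	}
	sort.Ints(results) // workers finish in no particular order
	return results
}

func main() {
	fmt.Println(squareAll(5, 3)) // prints [0 1 4 9 16]
}
```

The results are sorted only to make the output deterministic; the order in which workers deliver them is not guaranteed.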

Answer 2

Score: -1

Solution:

Run_parallel has to run in its own goroutine:

package main

import (
	"fmt"
	"sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
	id  int
	val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for item := range in {
		item *= 2 // double the input value (placeholder processing)
		out <- Result{id, item}
	}
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
	wg := sync.WaitGroup{}
	for id := 0; id < n_workers; id++ {
		wg.Add(1)
		go Worker(in, out, id, &wg)
	}
	wg.Wait()  // wait for all workers to complete their tasks
	close(out) // close the output channel when all tasks are completed
}

const (
	NW = 8
)

func main() {

	in := make(chan int)
	out := make(chan Result)

	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()

	go Run_parallel(NW, in, out, Worker)

	for item := range out {
		fmt.Printf("From out [%d]: %d\n", item.id, item.val)
	}

	println("- - - All done - - -")

}
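
For comparison, here is a different way to break the same cycle. It is not proposed anywhere in this thread and is only a sketch under one assumption: the number of results is known in advance. doubleAllBuffered is a hypothetical helper. It keeps the blocking wg.Wait() in the calling goroutine, as the question's code does, but gives out enough buffer space for every result, so no worker ever blocks on a send.

```go
package main

import (
	"fmt"
	"sync"
)

// doubleAllBuffered is a hypothetical helper (not part of the original
// post). It doubles 0..n-1 with the given number of workers and returns
// the sum of the doubled values.
func doubleAllBuffered(n, workers int) int {
	in := make(chan int)
	out := make(chan int, n) // assumption: one buffer slot per expected result

	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- v * 2 // never blocks: the buffer holds all n results
			}
		}()
	}

	wg.Wait()  // safe here only because out can hold every result
	close(out) // buffered values remain readable after close

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(doubleAllBuffered(10, 8)) // prints 90
}
```

The trade-off is that all results sit in memory before the first one is read, so the goroutine-based fix is usually the better fit when the input size is large or unknown.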

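Answer 3

Score: -1

Alternative formulation of the solution:

In this alternative formulation, it is not necessary to start Run_parallel as a goroutine (it launches its own goroutine).
I prefer this second solution because Run_parallel() then runs concurrently with the main function automatically. For the same reason it is safer and less error-prone (no need to remember to call Run_parallel with the go keyword).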

package main

import (
	"fmt"
	"sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
	id  int
	val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for item := range in {
		item *= 2 // double the input value (placeholder processing)
		out <- Result{id, item}
	}
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
	go func() {
		wg := sync.WaitGroup{}
		defer close(out) // close the output channel once all tasks are completed
		for id := 0; id < n_workers; id++ {
			wg.Add(1)
			go Worker(in, out, id, &wg)
		}
		wg.Wait() // wait for all workers to finish; the deferred close(out) runs when this goroutine returns
	}()
}

const (
	NW = 8
)

func main() {

	in := make(chan int)
	out := make(chan Result)

	go func() {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- i
		}
	}()

	Run_parallel(NW, in, out, Worker)

	for item := range out {
		fmt.Printf("From out [%d]: %d\n", item.id, item.val)
	}

	println("- - - All done - - -")
}
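
Taking the idea above one step further, the function can also create the output channel itself and hand back only the receiving end. This is a sketch of a common Go idiom, not code from the thread: startWorkers is a hypothetical name, and the directional channel types (<-chan) are an addition that lets the compiler prevent the caller from sending on or closing out.

```go
package main

import (
	"fmt"
	"sync"
)

type Result struct {
	id  int
	val int
}

// startWorkers is a hypothetical variant of Run_parallel (not part of
// the original post). It owns the output channel, starts the workers,
// and returns immediately with the receive-only end of that channel.
func startWorkers(nWorkers int, in <-chan int) <-chan Result {
	out := make(chan Result)

	var wg sync.WaitGroup
	for id := 0; id < nWorkers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for item := range in {
				out <- Result{id, item * 2} // same placeholder processing as above
			}
		}(id)
	}

	// Close out once every worker has returned, without blocking the caller.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < 10; i++ {
			in <- i
		}
	}()

	sum := 0
	for r := range startWorkers(4, in) {
		sum += r.val
	}
	fmt.Println("sum of doubled values:", sum) // prints sum of doubled values: 90
}
```

Because the channel is created and closed in the same function, the caller cannot forget either step, which is the same safety argument made for this answer's version.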

huangapple
  • Posted on December 2, 2021 at 22:54:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70201670.html