Why am I getting a deadlock when a function call that populates a channel is not embedded in a goroutine?

huangapple go评论123阅读模式
英文:

Why am I getting a deadlock when a function call that populates a channel is not embedded in a goroutine?

问题

我知道sync包及其waitgroup选项,但我不想在这个测试中使用它。我正在测试一种信号量。

所以我有以下代码:

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {

	fmt.Print("wassap")

	jobs := make(chan int)
	processStarted := make(chan struct{}, 1)
	processCompleted := make(chan struct{}, 1)

	createJobs(jobs)

	go func() {
		worker(jobs, processStarted, processCompleted)
	}()

	go func() {
		sync(processStarted, processCompleted)
	}()

	time.Sleep(3600 * time.Second)
	fmt.Print("\nend of main...")

	interrupt := make(chan os.Signal)
	<-interrupt

}

func createJobs(jobs chan<- int) {
	defer close(jobs)
	for i := 1; i < 20; i++ {
		jobs <- i
	}
}

func worker(jobs <-chan int, processStarted <-chan struct{}, processCompleted <-chan struct{}) {

	for {
		select {
		case i := <-jobs:
			fmt.Printf("\nFetching job #%d from channel", i)
			time.Sleep(2 * time.Second)
		case <-processStarted:
			fmt.Print("\nProcess Started. Waiting for it to be completed")
			<-processCompleted
			fmt.Print("\nProcess completed")
		}

	}
}

func sync(processStarted chan<- struct{}, processCompleted chan<- struct{}) {

	// acquire semaphore. Send signal to channel to indicate that it is busy
	processStarted <- struct{}{}

	for i := 1; i < 5; i++ {
		fmt.Printf("\nprocessing %d", i)
		time.Sleep(5 * time.Second)
	}

	// release semaphore
	processCompleted <- struct{}{}
}

我试图测试的内容非常简单:我有一个createJobs函数,其唯一目的是向通道中添加元素,这里是一个int通道。然后我有一个worker函数,它将从该通道中取出对象,并在取下一个元素之前休眠2秒。

现在,还有一个sync函数。这个函数的唯一目的是模拟在worker运行时启动的进程。如果这个进程是活动的,那么在sync结束时,jobs元素的处理应该停止,这就是为什么我有两个通道的原因,一个用于指示进程已启动,另一个用于指示进程已结束。

当运行我的代码时,我得到以下错误:

fatal error: all goroutines are asleep - deadlock!

如果我修改调用createJobs的方式,将其包装在一个goroutine中,像这样:

go func() {
	createJobs(jobs)
}()

那么我的代码就可以正常运行。

我只是想理解为什么会发生这种情况。我的意思是:main例程正在执行,然后它调用createJobs(没有包装),所以main例程应该被阻塞,直到此调用结束。一旦createJobs结束,就意味着通道中有元素。main继续执行并启动其他goroutine workersync来完成它们的工作。在main结束之前,我只是添加了一个睡眠器,以便给之前创建的goroutine足够的时间来完成。

我不是在寻求其他解决此问题的方法,我只想知道当createJobs在goroutine之外发生时发生了什么。

英文:

I'm aware of the sync package and its waitgroup options, I don't want to use it for this test. I'm testing a kind of semaphore.

So I've got:

package main
import (
&quot;fmt&quot;
&quot;os&quot;
&quot;time&quot;
)
func main() {
fmt.Print(&quot;wassap&quot;)
jobs := make(chan int)
processStarted := make(chan struct{}, 1)
processCompleted := make(chan struct{}, 1)
createJobs(jobs)
go func() {
worker(jobs, processStarted, processCompleted)
}()
go func() {
sync(processStarted, processCompleted)
}()
time.Sleep(3600 * time.Second)
fmt.Print(&quot;\nend of main...&quot;)
interrupt := make(chan os.Signal)
&lt;-interrupt
}
func createJobs(jobs chan&lt;- int) {
defer close(jobs)
for i := 1; i &lt; 20; i++ {
jobs &lt;- i
}
}
func worker(jobs &lt;-chan int, processStarted &lt;-chan struct{}, processCompleted &lt;-chan struct{}) {
for {
select {
case i := &lt;-jobs:
fmt.Printf(&quot;\nFetching job #%d from channel&quot;, i)
time.Sleep(2 * time.Second)
case &lt;-processStarted:
fmt.Print(&quot;\nProcess Started. Waiting for it to be completed&quot;)
&lt;-processCompleted
fmt.Print(&quot;\nProcess completed&quot;)
}
}
}
func sync(processStarted chan&lt;- struct{}, processCompleted chan&lt;- struct{}) {
// acquire semaphore. Send signal to channel to indicate that it is busy
processStarted &lt;- struct{}{}
for i := 1; i &lt; 5; i++ {
fmt.Printf(&quot;\nprocessing %d&quot;, i)
time.Sleep(5 * time.Second)
}
// release semaphore
processCompleted &lt;- struct{}{}
}

What I'm trying to test is fairly simple: I've got a createJobs function whose only purpose is to add elements to a channel, in this case an int channel. Then I've got a worker that is going to pull out objects from that channel and sleep for 2 seconds before pulling the next element.

Now, there's also a sync function. The sole purpose of this function is to simulate a process that was initiated while worker was running. If this process is active, then processing of jobs elements should be stopped while sync ends, reason why I've got two channels, one to indicate that the process started and one that the process ended.

When running my code I'm getting the following error:

> fatal error: all goroutines are asleep - deadlock!

If I modify the way createJobs is called by wrapping it out in a goroutine like this:

go func() {
createJobs(jobs)
}()

then my code runs correctly.

I just want to understand why this is happening. I mean: main routine is being executed, then it hits the call to createJobs (no wrap) so main routine is supposed to be blocked until this call ends. Once createJobs has ended, it means there are elements in the channel. main continues execution and spin up the other goroutines worker and sync to do their stuff. Before main ends, I'm simply adding a sleeper to give time to the previously created goroutines to finish.

I'm not asking to other solutions to this problem, I just want to know what's going on when createJobs occurs outside a goroutine.

答案1

得分: 3

你将jobs声明为一个无缓冲通道,然后尝试同步地向其中推送20个值。当你调用createJobs(jobs)时,这将阻塞你的主函数。

将第13行改为:

	jobs := make(chan int, 20)

...将解决死锁问题。


编辑 - 在评论中请求的澄清:

无缓冲通道没有容量,并且会阻塞生产者的执行,直到消费者接收到消息。

一个很好的无缓冲通道的类比是一个管道,在这个例子中,过程如下:

+------------------+     +------------+      +-------------+
| PRODUCER         |     | PIPE       |      | CONSUMER    |
|                  +---->|            +----->|             |
| createJobs(jobs) |     | 无缓冲通道 |      | worker(...) |
|                  |     |            |      |             |
+------------------+     +------------+      +-------------+

死锁发生的原因是createJobs(jobs)被同步调用,而此时还没有运行消费者。

当函数(PRODUCER)在goroutine中调用时,它是工作的,因为基本上通道的插入和读取是并行进行的吗?

是的。如果生产者是异步调用的,它不会阻塞main()函数,因此消费者也有机会被调用。在这种情况下,生产者会将所有任务一个接一个地推送到通道中,然后由工作线程一个接一个地消费它们。

英文:

You are declaring jobs as an unbuffered channel and then trying to push synchronously 20 values into it. This will block your main function when you call createJobs(jobs).

Changing line 13 to:

	jobs := make(chan int, 20)

...will solve the deadlock.


EDIT - clarifications requested in the comments:

Unbuffered channels have no capacity and block the execution of the producer until a consumer receives the message.

A pretty good analogy for an unbuffered channel is a pipe and in this example the process looks like this:

+------------------+     +------------+      +-------------+
| PRODUCER         |     | PIPE       |      | CONSUMER    |
|                  +----&gt;|            +-----&gt;|             |
| createJobs(jobs) |     | unbuffered |      | worker(...) |
|                  |     | channel    |      |             |
+------------------+     +------------+      +-------------+

The deadlock occurs because createJobs(jobs) is invoked synchronously, and there's no consumer yet running.
<br><br>

> It is working when the function(PRODUCER) is called within a goroutine because basically the insertion into the channel and the read from it are happening in parallel?

Yes. If the producer is invoked asynchronously, it won't block the main() function and therefore the consumer will get the chance to be invoked as well. In this case the producer will push all its tasks, one by one, as they are consumed by the worker, one by one.

答案2

得分: 0

之前的解决方案完全有效。但是你也可以使用以下代码块,如果你仍然想保持默认的通道大小:

	go func() {
		createJobs(jobs)
	}()
英文:

Previous solution totally working. But you can also use

	go func() {
createJobs(jobs)
}()

If you still want to keep default channel size.

huangapple
  • 本文由 发表于 2023年3月29日 06:28:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/75871589.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定