在使用WaitGroups和带缓冲通道的Go代码中,死锁的原因是什么?

huangapple go评论94阅读模式
英文:

What is the cause of the deadlock in my Go code using WaitGroups and Buffered Channels?

问题

WaitGroups、缓冲通道和死锁

我有一段代码导致了死锁,但我不确定原因。我尝试在几个不同的地方使用互斥锁,关闭通道在和外部的独立的go例程中,但结果仍然相同。

我试图通过一个通道(inputChan)发送数据,然后从另一个通道(outputChan)读取数据。

package main

import (
	"fmt"
	"sync"
)

func listStuff(wg *sync.WaitGroup, workerID int, inputChan chan int, outputChan chan int) {
	defer wg.Done()

	for i := range inputChan {
		fmt.Println("sending ", i)
		outputChan <- i
	}
}

func List(workers int) ([]int, error) {
	_output := make([]int, 0)

	inputChan := make(chan int, 1000)
	outputChan := make(chan int, 1000)

	var wg sync.WaitGroup
	wg.Add(workers)

	fmt.Printf("+++ Spinning up %v workers\n", workers)
	for i := 0; i < workers; i++ {
		go listStuff(&wg, i, inputChan, outputChan)
	}

	for i := 0; i < 3000; i++ {
		inputChan <- i
	}

	done := make(chan struct{})
	go func() {
		close(done)
		close(inputChan)
		close(outputChan)
		wg.Wait()
	}()

	for o := range outputChan {
		fmt.Println("reading from channel...")
		_output = append(_output, o)
	}

	<-done
	fmt.Printf("+++ output len: %v\n", len(_output))
	return _output, nil
}

func main() {
	List(5)
}

请帮我翻译以上代码。

英文:

WaitGroups, Buffered Channels, and Deadlocks

I have this bit of code which results in a deadlock and I'm not certain why. I have tried using mutex locking in a few different places, closing channels in and outside of separate go routines, but the result is still the same.

I'm trying to send data through one channel (inputChan), and then read it from another (outputChan)

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func listStuff(wg *sync.WaitGroup, workerID int, inputChan chan int, outputChan chan int) {
	defer wg.Done()

	for i := range inputChan {
		fmt.Println(&quot;sending &quot;, i)
		outputChan &lt;- i
	}
}

func List(workers int) ([]int, error) {
	_output := make([]int, 0)

	inputChan := make(chan int, 1000)
	outputChan := make(chan int, 1000)

	var wg sync.WaitGroup
	wg.Add(workers)

	fmt.Printf(&quot;+++ Spinning up %v workers\n&quot;, workers)
	for i := 0; i &lt; workers; i++ {
		go listStuff(&amp;wg, i, inputChan, outputChan)
	}

	for i := 0; i &lt; 3000; i++ {
		inputChan &lt;- i
	}

	done := make(chan struct{})
	go func() {
		close(done)
		close(inputChan)
		close(outputChan)
		wg.Wait()
	}()

	for o := range outputChan {
		fmt.Println(&quot;reading from channel...&quot;)
		_output = append(_output, o)
	}

	&lt;-done
	fmt.Printf(&quot;+++ output len: %v\n&quot;, len(_output))
	return _output, nil
}

func main() {
	List(5)
}

答案1

得分: 1

在你的主函数中,代码是按顺序执行的,首先尝试将3k个值写入inputChan,然后再从outputChan读取值。

你的代码在第一步上出现了阻塞:

  • 在成功发送3k个值到inputChan之前,outputChan中没有任何值被消耗,因此工作线程在第一个1k值之后会被卡在outputChan <- i处。
  • 一旦工作线程停止从inputChan中消耗值,main函数将在约2k个值之后被卡在inputChan <- i处。

修复这个问题的一种方法是让生产者(inputChan <- i)和最终消费者(for o := range outputChan {)在不同的goroutine中运行。

你可以将这两个角色中的一个保留在主goroutine中,并为另一个角色创建一个新的goroutine。例如:

go func(inputChan chan<- int){
    for i := 0; i < 3000; i++ {
        inputChan <- i
    }
    close(inputChan)
}(inputChan)

done := make(chan struct{})
go func() {
    close(done)
    // close(inputChan) // 我选择在上面关闭了inputChan,不要重复关闭
    close(outputChan)
    wg.Wait()
}()

...

https://go.dev/play/p/doBgfkAbyaO

额外注意:在发出信号done周围的操作顺序很重要;只有在wg.Done()指示所有工作线程都完成之后,才应关闭通道doneoutputChan

    // 最好在控制输入完成的代码旁边关闭inputChan。
    close(inputChan)
    // 如果你有多个生产者写入同一个通道,你可能需要添加一个单独的等待组来处理关闭操作,就像你为工作线程所做的那样。

    go func() {
        wg.Wait()
        // 在工作线程完成之后,下面两个操作必须发生
        close(done)
        close(outputChan)
    }()
英文:

The code in your main function is sequential and first tries to write 3k values into inputChan then will read values from outputChan.

Your code blocks on the first of those steps:

  • nothing drains from outputChan before 3k values are succesfully sent to inputChan, so the workers end up stuck on outputChan &lt;- i after the first 1k value
  • once the workers stop draining from inputChan, main will get stuck on inputChan &lt;- i after ~2k values

One way to fix this can be to have the producer (inputChan &lt;- i) and the end consumer (for o := range outputChan {) run in separate goroutines.

You can keep one of these actors in the main goroutine, and spin a new one for the other. For example :

go func(inputChan chan&lt;- int){
    for i := 0; i &lt; 3000; i++ {
        inputChan &lt;- i
    }
    close(inputChan)
}(inputChan)

done := make(chan struct{})
go func() {
    close(done)
    // close(inputChan) // I chose to close inputChan above, don&#39;t close it twice
    close(outputChan)
    wg.Wait()
}()

...

https://go.dev/play/p/doBgfkAbyaO

one extra note: the order of actions around signaling done is important ; channels done and outputChan should only be closed after wg.Done() indicates that all workers are finished

    // it is best to close inputChan next to the code that controls
    // when its input is complete.
    close(inputChan)
    // If you had several producers writing to the same channel, you
    // would probably have to add a separate waitgroup to handle closing,
    // much like you did for your workers

    go func() {
        wg.Wait()
        // the two following actions must happen *after* workers have
        // completed
        close(done)
        close(outputChan)
    }()

答案2

得分: 0

package main

import (
	"fmt"
	"sync"
)

func listStuff(wg *sync.WaitGroup, workerID int, inputChan chan int, outputChan chan int) {
	defer wg.Done()

	for i := range inputChan {
		fmt.Println("发送", i)
		outputChan <- i
	}
}

func List(workers int) ([]int, error) {
	_output := make([]int, 0)

	inputChan := make(chan int, 1000)
	outputChan := make(chan int, 1000)

	var wg sync.WaitGroup
	wg.Add(workers)

	fmt.Printf("+++ 启动 %v 个工作协程\n", workers)
	for i := 0; i < workers; i++ {
		go listStuff(&wg, i, inputChan, outputChan)
	}

	done := make(chan struct{})

	// 边发送边接收
	go func() {
		for o := range outputChan {
			fmt.Println("从通道中读取...")
			_output = append(_output, o)
		}
		close(done)
	}()

	for i := 0; i < 3000; i++ {
		inputChan <- i
	}

	fmt.Println("**************")

	go func() {
		close(inputChan)
		wg.Wait()
		// 发送完成后关闭通道
		close(outputChan)
	}()

	<-done
	fmt.Printf("+++ 输出长度: %v\n", len(_output))
	return _output, nil
}

func main() {
	List(5)
}
英文:
package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func listStuff(wg *sync.WaitGroup, workerID int, inputChan chan int, outputChan chan int) {
	defer wg.Done()

	for i := range inputChan {
		fmt.Println(&quot;sending &quot;, i)
		outputChan &lt;- i
	}
}

func List(workers int) ([]int, error) {
	_output := make([]int, 0)

	inputChan := make(chan int, 1000)
	outputChan := make(chan int, 1000)

	var wg sync.WaitGroup
	wg.Add(workers)

	fmt.Printf(&quot;+++ Spinning up %v workers\n&quot;, workers)
	for i := 0; i &lt; workers; i++ {
		go listStuff(&amp;wg, i, inputChan, outputChan)
	}

    done := make(chan struct{})

    //read as the same time as send
	go func() {
		for o := range outputChan {
			fmt.Println(&quot;reading from channel...&quot;)
			_output = append(_output, o)
		}
        close(done)
	}()

	for i := 0; i &lt; 3000; i++ {
		inputChan &lt;- i
	}

	fmt.Println(&quot;**************&quot;)

	go func() {

		close(inputChan)
		wg.Wait()
       //close after send finished
		close(outputChan)
	}()

	&lt;-done
	fmt.Printf(&quot;+++ output len: %v\n&quot;, len(_output))
	return _output, nil
}

func main() {
	List(5)
}

huangapple
  • 本文由 发表于 2023年6月2日 12:22:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/76387092.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定