英文:
Channel deadlock on workerpool
问题
我正在玩弄通道,通过创建一个包含1000个工作线程的工作池。目前我遇到了以下错误:
fatal error: all goroutines are asleep - deadlock!
这是我的代码:
package main
import "fmt"
import "time"
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
for i:=0;i<len(results);i++ {
<-results
}
}
为什么会发生这种情况?我对Go语言还不太熟悉,希望能理解这个问题。
英文:
I am playing around with channels by making a workerpool of a 1000 workers. Currently I am getting the following error:
fatal error: all goroutines are asleep - deadlock!
Here is my code:
package main
import "fmt"
import "time"
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
for i:=0;i<len(results);i++ {
<-results
}
}
Why is this happening? I am still new to go and I am hoping to make sense of this.
答案1
得分: 3
问题在于你的通道正在填满。main()
函数在读取任何结果之前,尝试将所有作业放入jobs
通道中。但是,在任何写入通道之前,results
通道只能容纳100个结果,否则写入通道的操作将被阻塞,因此所有的工作线程最终都会被阻塞,等待该通道中的空间——然而这个空间永远不会出现,因为main()
函数尚未开始从results
通道中读取数据。
要快速修复这个问题,你可以将jobs
的大小调整为足够大,以容纳所有的作业,这样main()
函数就可以继续进行读取阶段;或者你可以将results
的大小调整为足够大,以容纳所有的结果,这样工作线程就可以输出它们的结果而不会被阻塞。
一个更好的方法是创建另一个goroutine来填充jobs
队列,这样main()
函数就可以直接开始读取结果:
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}
for i := 1; i < 1000000; i++ {
<-results
}
}
请注意,我不得不将最后的for
循环更改为固定的迭代次数,否则它可能在所有结果被读取之前终止。
英文:
The problem is that your channels are filling up. The main()
routine tries to put all jobs into the jobs
channel before reading any results. But the results
channel only has space for 100 results before any write to the channel will block, so all the workers will eventually block waiting for space in this channel – space that will never come, because main()
has not started reading from results
yet.
To quickly fix this, you can either make jobs
big enough to hold all jobs, so the main()
function can continue to the reading phase; or you can make results
big enough to hold all results, so the workers can output their results without blocking.
A nicer approach is to make another goroutine to fill up the jobs
queue, so main()
can go straight to reading results:
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}
for i := 1; i < 1000000; i++ {
<-results
}
}
Note that I had to change the final for
loop to a fixed number of iterations, otherwise it might terminate before all the results have been read.
答案2
得分: 3
虽然Thomas的答案基本正确,但我发表了我认为更好的Go版本,而且还可以与无缓冲通道一起使用:
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
// 你可以在这里使用一次调用来初始化WaitGroup的计数,但这样容易出错
// 如果你更改了循环的大小,可能会忘记更改WG的计数。所以在循环中调用wg.Add
// wg.Add(1000)
for w := 1; w <= 1000; w++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(w, jobs, results)
}()
}
go func() {
for j := 1; j < 2000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}()
// 在这个goroutine中,我们等待所有“生产者”例程完成
// 然后关闭结果通道,以停止消费者循环
go func() {
wg.Wait()
close(results)
}()
for i := range results {
fmt.Print(i, " ")
}
fmt.Println("==========DONE==============")
}
希望对你有帮助!
英文:
While Thomas' answer is basically correct, I post my version which is IMO better Go and also works with unbuffered channels:
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
// you could init the WaitGroup's count here with one call but this is error
// prone - if you change the loop's size you could forget to change the
// WG's count. So call wg.Add in loop
//wg.Add(1000)
for w := 1; w <= 1000; w++ {
wg.Add(1)
go func() {
defer wg.Done()
worker(w, jobs, results)
}()
}
go func() {
for j := 1; j < 2000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}()
// in this gorutine we wait until all "producer" routines are done
// then close the results channel so that the consumer loop stops
go func() {
wg.Wait()
close(results)
}()
for i := range results {
fmt.Print(i, " ")
}
fmt.Println("==========DONE==============")
}
答案3
得分: 1
以下代码:
for j := 1; j < 1000000; j++ {
jobs <- j
}
应该在一个单独的 goroutine 中运行,因为所有的工作线程都会阻塞等待主 goroutine 从结果通道接收数据,而主 goroutine 则会陷入循环中。
英文:
The following code:
for j := 1; j < 1000000; j++ {
jobs <- j
}
should run in a separate goroutine, since all the workers will block waiting for the main gorourine to receive on the results channel, while the main goroutine is stuck in the loop.
答案4
得分: 0
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Millisecond * time.Duration(10))
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
wg.Add(1000)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results, wg)
}
go func() {
wg.Wait()
close(results)
}()
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
}()
sum := 0
for v := range results {
sum += v
}
fmt.Println("==========CLOSED==============")
fmt.Println("sum", sum)
}
英文:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Millisecond * time.Duration(10))
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
wg.Add(1000)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results, wg)
}
go func() {
wg.Wait()
close(results)
}()
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
}()
sum := 0
for v := range results {
sum += v
}
fmt.Println("==========CLOSED==============")
fmt.Println("sum", sum)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论