英文:
all goroutines are asleep - deadlock, on a buffered channel, do not understand why
问题
我只想创建一定数量的goroutine,比如5个,但是我可以接收可变数量的任务。
这是我尝试实现的代码,下面是测试代码。
package main
import (
"context"
"fmt"
"runtime"
"time"
)
func doWork(size int, capacity int) int {
start := time.Now()
jobs := make(chan *Job, capacity)
results := make(chan *Job, capacity)
sem := make(chan struct{}, capacity)
go chanWorker(jobs, results, sem)
for i := 0; i < size; i++ {
jobs <- &Job{id: i}
}
close(jobs)
successCount := 0
for i := 0; i < size; i++ {
item := <-results
if item.result {
successCount++
}
fmt.Printf("Job %d completed %v\n", item.id, item.result)
}
close(results)
close(sem)
fmt.Printf("Time taken to execute %d jobs with %d capacity = %v\n", size, capacity, time.Since(start))
return successCount
}
func chanWorker(jobs <-chan *Job, results chan<- *Job, sem chan struct{}) {
for item := range jobs {
it := item
sem <- struct{}{}
fmt.Printf("Job %d started\n", it.id)
go func() {
timeOutCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
time.Sleep(time.Duration(it.id) * 100 * time.Millisecond)
select {
case <-timeOutCtx.Done():
fmt.Printf("Job %d timed out\n", it.id)
it.result = false
results <- it
<-sem
return
default:
fmt.Printf("Total number of routines %d\n", runtime.NumGoroutine())
it.result = true
results <- it
<-sem
}
}()
}
}
这是对它的测试
package main
import (
"testing"
)
func Test_doWork(t *testing.T) {
type args struct {
size int
capacity int
}
tests := []struct {
name string
args args
want int
}{
{
name: "jobs 10 capacity 5",
args: args{
size: 10,
capacity: 5,
},
want: 3,
},
{
name: "jobs 100 capacity 5",
args: args{
size: 100,
capacity: 5,
},
want: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := doWork(tt.args.size, tt.args.capacity); got < tt.want {
t.Errorf("doWork() = %v, want %v", got, tt.want)
}
})
}
}
测试jobs 10 capacity 5
是可以工作的,但是jobs 100 capacity 5
失败了。
如果我将容量设置为50,对于100个任务,它可以工作,但是对于30个任务就不行了,我无法理解这种行为。
这是我对通道的理解,并期望它能正常工作。
带缓冲的通道在满时会阻塞,直到有一些可用的容量。我期望一旦jobs通道满了,它就会阻塞,直到chanWorker释放一些容量。chanWorker本身接收一个容量,并使用空结构体来确保不会创建超过5个worker。
为什么我会收到错误消息fatal error: all goroutines are asleep - deadlock!
?
英文:
I want to only create certain number of go routines, say 5, but I can receive variable number of jobs.
Here is my code that attempts to do it and the test is below it.
package main
import (
"context"
"fmt"
"runtime"
"time"
)
func doWork(size int, capacity int) int {
start := time.Now()
jobs := make(chan *Job, capacity)
results := make(chan *Job, capacity)
sem := make(chan struct{}, capacity)
go chanWorker(jobs, results, sem)
for i := 0; i < size; i++ {
jobs <- &Job{id: i}
}
close(jobs)
successCount := 0
for i := 0; i < size; i++ {
item := <-results
if item.result {
successCount++
}
fmt.Printf("Job %d completed %v\n", item.id, item.result)
}
close(results)
close(sem)
fmt.Printf("Time taken to execute %d jobs with %d capacity = %v\n", size, capacity, time.Since(start))
return successCount
}
func chanWorker(jobs <-chan *Job, results chan<- *Job, sem chan struct{}) {
for item := range jobs {
it := item
sem <- struct{}{}
fmt.Printf("Job %d started\n", it.id)
go func() {
timeOutCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
time.Sleep(time.Duration(it.id) * 100 * time.Millisecond)
select {
case <-timeOutCtx.Done():
fmt.Printf("Job %d timed out\n", it.id)
it.result = false
results <- it
<-sem
return
default:
fmt.Printf("Total number of routines %d\n", runtime.NumGoroutine())
it.result = true
results <- it
<-sem
}
}()
}
}
The test for this
package main
import (
"testing"
)
func Test_doWork(t *testing.T) {
type args struct {
size int
capacity int
}
tests := []struct {
name string
args args
want int
}{
{
name: "jobs 10 capacity 5",
args: args{
size: 10,
capacity: 5,
},
want: 3,
},
{
name: "jobs 100 capacity 5",
args: args{
size: 100,
capacity: 5,
},
want: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := doWork(tt.args.size, tt.args.capacity); got < tt.want {
t.Errorf("doWork() = %v, want %v", got, tt.want)
}
})
}
}
The test jobs 10 capacity 5
works, but jobs 100 capacity 5
fails.
If I make capacity 50 for 100 jobs, it works, doesn't work for 30 and cannot understand the behavior.
Here is how I understood of channels and expected it tow work.
A buffered channel will block if its full, until there is some free capacity available. I expected that as soon as the jobs channel is full, it will block until chanWorker frees some of it. The chanWorker itself receives a capacity and makes use of empty struct to ensure no more than 5 worker is created.
Why should I ever get an error
fatal error: all goroutines are asleep - deadlock!
?
答案1
得分: 1
因为主 goroutine 在将所有作业发送到 jobs
之前不会从 results
接收值,所以工作线程在向 results
发送时被阻塞。主 goroutine 在发送到 jobs
时被阻塞,因为工作线程被阻塞。死锁!
通过使用 goroutine 来提供工作来修复。
go func() {
for i := 0; i < size; i++ {
jobs <- &Job{id: i}
}
close(jobs)
}()
点击此处 查看示例代码。
英文:
Because the main goroutine does not receive values from results
until all jobs are sent to jobs
, the workers block on send to results
. The main goroutine blocks sending to jobs
because the works are blocked. Deadlock!
Fix by using goroutine to feed the work.
go func() {
for i := 0; i < size; i++ {
jobs <- &Job{id: i}
}
close(jobs)
}()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论