all goroutines are asleep – deadlock, on a buffered channel, do not understand why

huangapple go评论83阅读模式
英文:

all goroutines are asleep - deadlock, on a buffered channel, do not understand why

问题

我只想创建一定数量的goroutine,比如5个,但是我可以接收可变数量的任务。

这是我尝试实现的代码,下面是测试代码。

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

func doWork(size int, capacity int) int {
	start := time.Now()
	jobs := make(chan *Job, capacity)
	results := make(chan *Job, capacity)
	sem := make(chan struct{}, capacity)
	go chanWorker(jobs, results, sem)
	for i := 0; i < size; i++ {
		jobs <- &Job{id: i}
	}
	close(jobs)
	successCount := 0
	for i := 0; i < size; i++ {
		item := <-results
		if item.result {
			successCount++
		}
		fmt.Printf("Job %d completed %v\n", item.id, item.result)
	}
	close(results)
	close(sem)
	fmt.Printf("Time taken to execute %d jobs with %d capacity = %v\n", size, capacity, time.Since(start))
	return successCount
}

func chanWorker(jobs <-chan *Job, results chan<- *Job, sem chan struct{}) {

	for item := range jobs {
		it := item
		sem <- struct{}{}
		fmt.Printf("Job %d started\n", it.id)
		go func() {
			timeOutCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
			defer cancel()
			time.Sleep(time.Duration(it.id) * 100 * time.Millisecond)
			select {
			case <-timeOutCtx.Done():
				fmt.Printf("Job %d timed out\n", it.id)
				it.result = false
				results <- it
				<-sem
				return
			default:
				fmt.Printf("Total number of routines %d\n", runtime.NumGoroutine())
				it.result = true
				results <- it
				<-sem
			}
		}()
	}
}


这是对它的测试

package main

import (
	"testing"
)

func Test_doWork(t *testing.T) {
	type args struct {
		size     int
		capacity int
	}
	tests := []struct {
		name string
		args args
		want int
	}{
		{
			name: "jobs 10 capacity 5",
			args: args{
				size:     10,
				capacity: 5,
			},
			want: 3,
		},
		{
			name: "jobs 100 capacity 5",
			args: args{
				size:     100,
				capacity: 5,
			},
			want: 3,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := doWork(tt.args.size, tt.args.capacity); got < tt.want {
				t.Errorf("doWork() = %v, want %v", got, tt.want)
			}
		})
	}
}

测试jobs 10 capacity 5是可以工作的,但是jobs 100 capacity 5失败了。

如果我将容量设置为50,对于100个任务,它可以工作,但是对于30个任务就不行了,我无法理解这种行为。

这是我对通道的理解,并期望它能正常工作。

带缓冲的通道在满时会阻塞,直到有一些可用的容量。我期望一旦jobs通道满了,它就会阻塞,直到chanWorker释放一些容量。chanWorker本身接收一个容量,并使用空结构体来确保不会创建超过5个worker。

为什么我会收到错误消息fatal error: all goroutines are asleep - deadlock!

英文:

I want to only create certain number of go routines, say 5, but I can receive variable number of jobs.

Here is my code that attempts to do it and the test is below it.

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;runtime&quot;
	&quot;time&quot;
)

func doWork(size int, capacity int) int {
	start := time.Now()
	jobs := make(chan *Job, capacity)
	results := make(chan *Job, capacity)
	sem := make(chan struct{}, capacity)
	go chanWorker(jobs, results, sem)
	for i := 0; i &lt; size; i++ {
		jobs &lt;- &amp;Job{id: i}
	}
	close(jobs)
	successCount := 0
	for i := 0; i &lt; size; i++ {
		item := &lt;-results
		if item.result {
			successCount++
		}
		fmt.Printf(&quot;Job %d completed %v\n&quot;, item.id, item.result)
	}
	close(results)
	close(sem)
	fmt.Printf(&quot;Time taken to execute %d jobs with %d capacity = %v\n&quot;, size, capacity, time.Since(start))
	return successCount
}

func chanWorker(jobs &lt;-chan *Job, results chan&lt;- *Job, sem chan struct{}) {

	for item := range jobs {
		it := item
		sem &lt;- struct{}{}
		fmt.Printf(&quot;Job %d started\n&quot;, it.id)
		go func() {
			timeOutCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
			defer cancel()
			time.Sleep(time.Duration(it.id) * 100 * time.Millisecond)
			select {
			case &lt;-timeOutCtx.Done():
				fmt.Printf(&quot;Job %d timed out\n&quot;, it.id)
				it.result = false
				results &lt;- it
				&lt;-sem
				return
			default:
				fmt.Printf(&quot;Total number of routines %d\n&quot;, runtime.NumGoroutine())
				it.result = true
				results &lt;- it
				&lt;-sem
			}
		}()
	}
}


The test for this

package main

import (
	&quot;testing&quot;
)

func Test_doWork(t *testing.T) {
	type args struct {
		size     int
		capacity int
	}
	tests := []struct {
		name string
		args args
		want int
	}{
		{
			name: &quot;jobs 10 capacity 5&quot;,
			args: args{
				size:     10,
				capacity: 5,
			},
			want: 3,
		},
		{
			name: &quot;jobs 100 capacity 5&quot;,
			args: args{
				size:     100,
				capacity: 5,
			},
			want: 3,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := doWork(tt.args.size, tt.args.capacity); got &lt; tt.want {
				t.Errorf(&quot;doWork() = %v, want %v&quot;, got, tt.want)
			}
		})
	}
}

The test jobs 10 capacity 5 works, but jobs 100 capacity 5 fails.

If I make capacity 50 for 100 jobs, it works, doesn't work for 30 and cannot understand the behavior.

Here is how I understood of channels and expected it tow work.

A buffered channel will block if its full, until there is some free capacity available. I expected that as soon as the jobs channel is full, it will block until chanWorker frees some of it. The chanWorker itself receives a capacity and makes use of empty struct to ensure no more than 5 worker is created.

Why should I ever get an error
fatal error: all goroutines are asleep - deadlock! ?

答案1

得分: 1

因为主 goroutine 在将所有作业发送到 jobs 之前不会从 results 接收值,所以工作线程在向 results 发送时被阻塞。主 goroutine 在发送到 jobs 时被阻塞,因为工作线程被阻塞。死锁!

通过使用 goroutine 来提供工作来修复。

go func() {
    for i := 0; i < size; i++ {
        jobs <- &Job{id: i}
    }
    close(jobs)
}()

点击此处 查看示例代码。

英文:

Because the main goroutine does not receive values from results until all jobs are sent to jobs, the workers block on send to results. The main goroutine blocks sending to jobs because the works are blocked. Deadlock!

Fix by using goroutine to feed the work.

go func() {
for i := 0; i &lt; size; i++ {
jobs &lt;- &amp;Job{id: i}
}
close(jobs)
}()

https://go.dev/play/p/WlbPRfxngRV

huangapple
  • 本文由 发表于 2023年4月16日 14:37:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/76026243.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定