如何解决生产者和消费者代码中的死锁问题

huangapple go评论130阅读模式
英文:

How to address deadlock in producer and consumer code

问题

当我运行下面的程序时,出现了错误:

davecheney      tweets about golang
beertocode      does not tweet about golang
ironzeb         tweets about golang
beertocode      tweets about golang
vampirewalk666  tweets about golang
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000010260?)
        /usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0x100c000058058?)
        /usr/local/go/src/sync/waitgroup.go:136 +0x52
main.main()
        /home/joe/go/src/github.com/go-concurrency-exercises/1-producer-consumer/main.go:53 +0x14f

死锁是从哪里发生的,如何改进程序以避免死锁?

以下是你提供的程序的代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(stream Stream, tweetChan chan *Tweet) {
	for {
		tweet, err := stream.Next()
		if err == ErrEOF {
			close(tweetChan)
			return
		}
		tweetChan <- tweet
		//tweets = append(tweets, tweet)
	}
}

func consumer(tweetChan chan *Tweet) {
	for t := range tweetChan {
		if t.IsTalkingAboutGo() {
			fmt.Println(t.Username, "\ttweets about golang")
		} else {
			fmt.Println(t.Username, "\tdoes not tweet about golang")
		}
	}
}

func main() {
	start := time.Now()
	stream := GetMockStream()

	var wg sync.WaitGroup
	tweetChan := make(chan *Tweet)
	// Producer
	//tweets := producer(stream)
	wg.Add(2)
	go producer(stream, tweetChan)
	// Consumer
	//consumer(tweets)
	go consumer(tweetChan)

	wg.Wait()

	fmt.Printf("Process took %s\n", time.Since(start))
}

如果你需要查看mockstream.go,请参考以下链接:
https://github.com/loong/go-concurrency-exercises/tree/master/1-producer-consumer

根据你提供的信息,你的程序是通过修改main.go来实现原始程序的并发版本。

英文:

When I ran the program below, I got an error

davecheney      tweets about golang
beertocode      does not tweet about golang
ironzeb         tweets about golang
beertocode      tweets about golang
vampirewalk666  tweets about golang
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000010260?)
        /usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0x100c000058058?)
        /usr/local/go/src/sync/waitgroup.go:136 +0x52
main.main()
        /home/joe/go/src/github.com/go-concurrency-exercises/1-producer-consumer/main.go:53 +0x14f

Where is the deadlock coming from and how to improve the program to avoid that?

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func producer(stream Stream, tweetChan chan *Tweet) {
	for {
		tweet, err := stream.Next()
		if err == ErrEOF {
			close(tweetChan)
			return
		}
		tweetChan &lt;- tweet
		//tweets = append(tweets, tweet)
	}
}

func consumer(tweetChan chan *Tweet) {
	for t := range tweetChan {
		if t.IsTalkingAboutGo() {
			fmt.Println(t.Username, &quot;\ttweets about golang&quot;)
		} else {
			fmt.Println(t.Username, &quot;\tdoes not tweet about golang&quot;)
		}
	}
}

func main() {
	start := time.Now()
	stream := GetMockStream()

	var wg sync.WaitGroup
	tweetChan := make(chan *Tweet)
	// Producer
	//tweets := producer(stream)
	wg.Add(2)
	go producer(stream, tweetChan)
	// Consumer
	//consumer(tweets)
	go consumer(tweetChan)

	wg.Wait()

	fmt.Printf(&quot;Process took %s\n&quot;, time.Since(start))
}

If you need to see mockstream.go, refer to
https://github.com/loong/go-concurrency-exercises/tree/master/1-producer-consumer

My program is a concurrent version of the original program by modifying main.go

答案1

得分: 1

调用wg.Wait()会等待直到组的计数器为零,但是没有正在运行的goroutine来减少计数器。

修复方法是在goroutine函数返回之前调用wg.Done()

func producer(wg *sync.WaitGroup, stream Stream, tweetChan chan *Tweet) {
    defer wg.Done()
    for {
        tweet, err := stream.Next()
        if err == ErrEOF {
            close(tweetChan)
            return
        }
        tweetChan <- tweet
    }
}

func consumer(wg *sync.WaitGroup, tweetChan chan *Tweet) {
    defer wg.Done()
    for t := range tweetChan {
        if t.IsTalkingAboutGo() {
            fmt.Println(t.Username, "\ttweets about golang")
        } else {
            fmt.Println(t.Username, "\tdoes not tweet about golang")
        }
    }
}

func main() {
    start := time.Now()
    stream := GetMockStream()
    var wg sync.WaitGroup
    tweetChan := make(chan *Tweet)
    wg.Add(2)
    go producer(&wg, stream, tweetChan)
    go consumer(&wg, tweetChan)
    wg.Wait()
    fmt.Printf("Process took %s\n", time.Since(start))
}
英文:

The call to wg.Wait() is waiting until the group's counter is zero, but there are no running goroutines to decrement the counter.

Fix by calling wg.Done() before returning from the goroutine functions:

func producer(wg *sync.WaitGroup, stream Stream, tweetChan chan *Tweet) {
    defer wg.Done()
    for {
        tweet, err := stream.Next()
        if err == ErrEOF {
            close(tweetChan)
            return
        }
        tweetChan &lt;- tweet
    }
}

func consumer(wg *sync.WaitGroup, tweetChan chan *Tweet) {
    defer wg.Done()
    for t := range tweetChan {
        if t.IsTalkingAboutGo() {
            fmt.Println(t.Username, &quot;\ttweets about golang&quot;)
        } else {
            fmt.Println(t.Username, &quot;\tdoes not tweet about golang&quot;)
        }
    }
}

func main() {
    start := time.Now()
    stream := GetMockStream()
    var wg sync.WaitGroup
    tweetChan := make(chan *Tweet)
    wg.Add(2)
    go producer(&amp;wg, stream, tweetChan)
    go consumer(&amp;wg, tweetChan)
    wg.Wait()
    fmt.Printf(&quot;Process took %s\n&quot;, time.Since(start))
}

huangapple
  • 本文由 发表于 2022年9月19日 08:10:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/73767181.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定