英文:
golang used waitGroup and still ended up with a deadlock error
问题
我正在尝试按照《Go并发编程》一书中的示例实现桥接模式。
func bridge_impl() {
    done := make(chan interface{})
    defer close(done)
    var wg sync.WaitGroup
    bridge := func(
        done <-chan interface{},
        chanStream <-chan <-chan interface{},
    ) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            wg.Add(1)
            defer close(valStream)
            for {
                var stream <-chan interface{}
                select {
                case maybeStream, ok := <-chanStream:
                    fmt.Println("works")
                    if ok == false {
                        return
                    }
                    stream = maybeStream
                case <-done:
                    return
                }
                for val := range stream {
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
    genVals := func() <-chan <-chan interface{} {
        chanStream := make(chan (<-chan interface{}))
        go func() {
            wg.Add(1)
            defer close(chanStream)
            for i := 0; i < 10; i++ {
                stream := make(chan interface{})
                stream <- i
                close(stream)
                chanStream <- stream
            }
        }()
        return chanStream
    }
    for v := range bridge(done, genVals()) {
        fmt.Printf("%v ", v)
    }
    wg.Wait()
}
然而,我遇到了一个死锁错误all goroutines are asleep - deadlock!。起初,我认为即使在书中的示例中没有实现,我也应该添加一个WaitGroup,但结果仍然出现了相同的错误。
英文:
I'm trying to implement the bridge pattern following Go Concurrency book
func bridge_impl() {
done := make(chan interface{})
defer close(done)
var wg sync.WaitGroup
bridge := func(
done <-chan interface{},
chanStream <-chan <-chan interface{},
) <-chan interface{} {
valStream := make(chan interface{})
go func() {
wg.Add(1)
defer close(valStream)
for {
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
fmt.Println("works")
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range stream {
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
wg.Add(1)
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan interface{})
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
for v := range bridge(done, genVals()) {
fmt.Printf("%v ", v)
}
wg.Wait()
}
However I'm receiving a deadlock errorall goroutines are asleep - deadlock! at first I thought I should add a waitgroup even though it wasn't implemented in the book example but I ended up with the same error
答案1
得分: 2
有两个主要问题。
第一个问题:
for i := 0; i < 10; i++ {
stream := make(chan interface{})
stream <- i
close(stream)
chanStream <- stream
}
在创建后向无缓冲通道写入数据,但没有 goroutine 读取。请使用带缓冲的通道或另一个 goroutine。
stream := make(chan interface{}, 1) // 缓冲大小为 1,以避免 `stream <- i` 阻塞
第二个问题:
在使用 wg.Add(1) 时没有使用 wg.Done()。你可以在两种情况下都使用 defer。
wg.Add(1)
defer wg.Done()
英文:
There are two main issues.
Working example
First issue:
for i := 0; i < 10; i++ {
stream := make(chan interface{})
stream <- i
close(stream)
chanStream <- stream
}
writing to unbuffered channel after creation with no goroutine reading. Use buffered channel or another goroutine.
stream := make(chan interface{}, 1) // buffer size 1 to not block `stream <- i`
Second issue:
Using wg.Add(1) without wg.Done().
You can use defer in both cases.
wg.Add(1)
defer wg.Done()
答案2
得分: 2
根据我理解,你根本不需要使用WaitGroup,只需要重新排列genVals函数循环中的语句顺序即可:
for i := 0; i < 10; i++ {
	stream := make(chan interface{})
	chanStream <- stream
	stream <- i
	close(stream)
}
你可以在以下链接中查看代码示例:https://go.dev/play/p/7D9OzrsvZyi
英文:
From what I understand you do not need a WaitGroup at all, you just need to re-order the statements in the genVals function's loop:
for i := 0; i < 10; i++ {
	stream := make(chan interface{})
	chanStream <- stream
	stream <- i
	close(stream)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论