How to sync goroutines in pipeline
Question
I need help understanding why the following code does not work. I am building a pipeline and trying to add a step that synchronizes values from two source channels. My source/producer code looks something like the code below (in my real code I read the text from a file). The sources are sorted, but values are not guaranteed to be in both sources.
func Source() <-chan int {
	out := make(chan int, 5)
	go func() {
		// The real code reads from a file and closes its reader here as well.
		defer close(out)
		out <- 1
		out <- 2
		out <- 3
		out <- 4
		out <- 5
		out <- 7
	}()
	return out
}
The synchronization code looks like this:
func Sync(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		av, ak := <-a
		bv, bk := <-b
		for ak || bk {
			if !ak || av < bv {
				out <- bv
				bv, bk = <-b
				continue
			}
			if !bk || bv > av {
				out <- av
				av, ak = <-a
				continue
			}
			out <- av
			av, ak = <-a
			bv, bk = <-b
		}
		close(out)
	}()
	return out
}
My program looks something like this:
func main() {
	os := Source()
	ns := Source()
	for val := range Sync(ns, os) {
		fmt.Printf("[SYNCED] %v \n", val)
	}
}
The expected behaviour is that both sources buffer values into their channels, and my sync step first reads a value from the first source, then one from the second. It compares them, and if they are equal it moves on to the next value in both channels. If they differ, it sends out the value that is behind, replaces it with the next value from the same channel, and makes the same comparison again.
What happens is that the sync code seems to run several times for the same values, and I get output like [SYNCED] 1 several times. Why?
Please help me get this fixed!
Answer 1
Score: 2
Regarding http://play.golang.org/p/uhd3EWrwEo and http://play.golang.org/p/Dqq7-cPaFq:
Actually, the ints version fails on a similar test case as well. These inputs:
os := Source([]int{1, 2, 3})
ns := Source([]int{1, 3, 4})
put the ints version into an infinite loop.
This happens because the check !aok || avalue > bvalue does not take into account what happens when aok is true (a still has elements) and bok is false (b has no more elements). A receive from a closed channel returns the zero value, so avalue > "" (or avalue > 0 for positive ints) is always true. The code then keeps trying to take another item from b, which is already exhausted, and loops forever. Fixed code: http://play.golang.org/p/vYhuOZxRMl