如何在管道中同步goroutine

huangapple go评论104阅读模式
英文:

How to sync goroutines in pipeline

问题

我需要帮助理解为什么以下代码不起作用。我正在构建一个流水线,并尝试添加一个步骤,用于同步两个源通道的值。我的源/生产者代码如下所示(在我的实际代码中,我从文件中读取文本)。这些源已经排序,但不能保证两个源中都有值。

func Source() <-chan int {
    out := make(chan int, 5)

    go func() {
        defer close(out)

        out <- 1
        out <- 2
        out <- 3
        out <- 4
        out <- 5
        out <- 7
    }()

    return out
}

同步代码如下所示:

func Sync(a, b <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        av, ak := <-a
        bv, bk := <-b

        for ak || bk {

            if !ak || av < bv {
                out <- bv

                bv, bk = <-b
                continue
            }

            if !bk || bv > av {
                out <- av

                av, ak = <-a
                continue
            }

            out <- av

            av, ak = <-a
            bv, bk = <-b
        }

        close(out)
    }()

    return out
}

我的程序大致如下:

func main() {
    os := Source()
    ns := Source()

    for val := range Sync(ns, os) {
        fmt.Printf("[SYNCED] %v \n", val)
    }
}

期望的行为是,两个源将值缓冲到通道中,我的同步函数首先从第一个源读取值,然后从第二个源读取值。比较它们,如果相等,则继续读取下一个值。如果不相等,我们将发送落后的值,并用新值替换它,然后再次进行相同的比较。

发生的情况是,同步代码似乎对值运行了多次,我会多次得到类似 "[SYNCED] 1" 的输出。为什么会这样?

请帮助我解决这个问题!

英文:

I would need help to understand why the following code does not work. I am building a pipeline and trying to have a step that synchronize values from two source channels. My source/producer code looks something like below (in my real code i read the text from a file). The sources are sorted but are values are not guaranteed to be in both sources.

func Source() &lt;-chan int{
	out := make(chan int, 5)

	go func() {
		defer reader.Close()

		out &lt;- 1
		out &lt;- 2
		out &lt;- 3
		out &lt;- 4
		out &lt;- 5
		out &lt;- 7

		close(out)
	}()

	return out
}

and the synchronization code looks like this:

func Sync(a, b &lt;-chan int) &lt;-chan int {
	out := make(chan int)

	go func() {
		av, ak:= &lt;-a
		bv, bk:= &lt;-b

		for ak || bk {

			if !ak || av &lt; bv {
				out &lt;- bv

				bv, bk = &lt;-b
				continue
			}

			if !bk|| bv &gt; av {
				out &lt;- av

				av, ak = &lt;-a
				continue
			}

			out &lt;- av

			av, ak = &lt;-a
			bv, bk = &lt;-b
		}

		close(out)
	}()

	return out
}

and my program looks something like this:

func main() {
	os := Source()
	ns := Source()

	for val := range Sync(ns, os) {
		fmt.Printf(&quot;[SYNCED] %v \n&quot;, val)
	}
}

The expected behaviour is that my both sources buffer values into the channel and my sync first reads value from the first source. Then from the second. Compare them and if they are equal continues to the next in the both channels. If the differ we will send out the value that is behind and replace it with a new one and make the same comparison again.

What happends is that it looks like the sync code is run several times for the values and I will get things like [SYNCED] 1 several times. Why?

Please help me get this fixed!

答案1

得分: 2

关于 http://play.golang.org/p/uhd3EWrwEohttp://play.golang.org/p/Dqq7-cPaFq -

实际上,整数的代码在类似的测试用例下也会失败:

os := Source([]int{1, 2, 3})
ns := Source([]int{1, 3, 4})

会使整数版本进入无限循环。

这是因为在检查 !aok || avalue > bvalue 时,它没有考虑到如果 aok 为真(a 中仍然有元素)且 bok 为假(b 中没有更多元素),那么 avalue > "" 总是为真。因此它尝试从 b 中取出另一个项(其中为空),然后进入无限循环。修复后的代码:http://play.golang.org/p/vYhuOZxRMl

英文:

Regarding http://play.golang.org/p/uhd3EWrwEo and http://play.golang.org/p/Dqq7-cPaFq -

Actually, the code for ints will fail with the similar test case as well:

os := Source([]int{1, 2, 3})
ns := Source([]int{1, 3, 4})

puts the ints version to infinite loop.

This happens because when !aok || avalue &gt; bvalue is checked, it does not take into account that if aok is true (some elements still are in a) and bok is false (no more elements in b), then avalue &gt; &quot;&quot; is always true. So it tries to take another item from b (which is empty) and goes to infinite loop. Fixed code: http://play.golang.org/p/vYhuOZxRMl

huangapple
  • 本文由 发表于 2015年7月26日 07:07:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/31631974.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定