Closure inconsistency between parallel and sequential execution
Question
I have attempted to write a generic function that can execute functions in parallel or sequentially. While testing it, I have found some very unexpected behavior regarding closures. In the code below, I define a list of functions that accept no parameters and return an error. The functions also use a for loop variable in a closure but I'm using the trick of defining a new variable within the loop in an attempt to avoid capture.
I'm expecting that I can call these functions sequentially or concurrently with the same effect but I'm seeing different results. It's as if the closure variable is being captured but only when run concurrently.
As far as I can tell, this is not the usual case of capturing a loop variable. As I mentioned, I'm defining a new variable within the loop. Also, I'm not running the closure function within the loop. I'm generating a list of functions within the loop but I'm executing the functions after the loop.
I'm using go version go1.8.3 linux/amd64.
package closure_test

import (
	"sync"
	"testing"
)

// MergeErrors merges multiple channels of errors.
// Based on https://blog.golang.org/pipelines.
func MergeErrors(cs ...<-chan error) <-chan error {
	var wg sync.WaitGroup
	out := make(chan error)
	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan error) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// WaitForPipeline waits for results from all error channels.
// It returns early on the first error.
func WaitForPipeline(errs ...<-chan error) error {
	errc := MergeErrors(errs...)
	for err := range errc {
		if err != nil {
			return err
		}
	}
	return nil
}

func RunInParallel(funcs ...func() error) error {
	var errcList [](<-chan error)
	for _, f := range funcs {
		errc := make(chan error, 1)
		errcList = append(errcList, errc)
		go func() {
			err := f()
			if err != nil {
				errc <- err
			}
			close(errc)
		}()
	}
	return WaitForPipeline(errcList...)
}

func RunSequentially(funcs ...func() error) error {
	for _, f := range funcs {
		err := f()
		if err != nil {
			return err
		}
	}
	return nil
}

func validateOutputChannel(t *testing.T, out chan int, n int) {
	m := map[int]bool{}
	for i := 0; i < n; i++ {
		m[<-out] = true
	}
	if len(m) != n {
		t.Errorf("Output channel has %v unique items; wanted %v", len(m), n)
	}
}

// This fails because j is being captured.
func TestClosure1sp(t *testing.T) {
	n := 4
	out := make(chan int, n*2)
	var funcs [](func() error)
	for i := 0; i < n; i++ {
		j := i // define a new variable that has scope only inside the current loop iteration
		t.Logf("outer i=%v, j=%v", i, j)
		f := func() error {
			t.Logf("inner i=%v, j=%v", i, j)
			out <- j
			return nil
		}
		funcs = append(funcs, f)
	}
	t.Logf("Running funcs sequentially")
	if err := RunSequentially(funcs...); err != nil {
		t.Fatal(err)
	}
	validateOutputChannel(t, out, n)
	t.Logf("Running funcs in parallel")
	if err := RunInParallel(funcs...); err != nil {
		t.Fatal(err)
	}
	close(out)
	validateOutputChannel(t, out, n)
}
Below is the output from the test function above.
closure_test.go:91: outer i=0, j=0
closure_test.go:91: outer i=1, j=1
closure_test.go:91: outer i=2, j=2
closure_test.go:91: outer i=3, j=3
closure_test.go:99: Running funcs sequentially
closure_test.go:93: inner i=4, j=0
closure_test.go:93: inner i=4, j=1
closure_test.go:93: inner i=4, j=2
closure_test.go:93: inner i=4, j=3
closure_test.go:104: Running funcs in parallel
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:80: Output channel has 1 unique items; wanted 4
Any ideas? Is this a bug in Go?
Answer 1
Score: 4
I believe your problem lies in your RunInParallel func.
func RunInParallel(funcs ...func() error) error {
	var errcList [](<-chan error)
	for _, f := range funcs {
		errc := make(chan error, 1)
		errcList = append(errcList, errc)
		go func() {
			// This line probably isn't being reached until your range
			// loop has completed, meaning f is the last func by the time
			// each goroutine starts. If you capture f
			// in another variable inside the range, you won't have this issue.
			err := f()
			if err != nil {
				errc <- err
			}
			close(errc)
		}()
	}
	return WaitForPipeline(errcList...)
}
You could also pass f as a parameter to your anonymous function to avoid this issue.
for _, f := range funcs {
	errc := make(chan error, 1)
	errcList = append(errcList, errc)
	go func(g func() error) {
		err := g()
		if err != nil {
			errc <- err
		}
		close(errc)
	}(f)
}
Here is a live example in the playground.
Answer 2
Score: 4
Always run your tests with -race. In your case, you forgot to recreate f on each iteration in RunInParallel:
func RunInParallel(funcs ...func() error) error {
	var errcList [](<-chan error)
	for _, f := range funcs {
		f := f // << HERE
		errc := make(chan error, 1)
		errcList = append(errcList, errc)
		go func() {
			err := f()
			if err != nil {
				errc <- err
			}
			close(errc)
		}()
	}
	return WaitForPipeline(errcList...)
}
As a result, you always launched the last f instead of each one.