Go 通道无限循环

huangapple go评论88阅读模式
英文:

Go channel infinite loop

问题

我正在尝试使用通道来捕获一组 goroutine 中的错误,但是通道进入了一个无限循环,开始消耗 CPU。

func UnzipFile(f *bytes.Buffer, location string) error {
    zipReader, err := zip.NewReader(bytes.NewReader(f.Bytes()), int64(f.Len()))

    if err != nil {
        return err
    }

    if err := os.MkdirAll(location, os.ModePerm); err != nil {
        return err
    }

    errorChannel := make(chan error)
    errorList := []error{}

    go errorChannelWatch(errorChannel, errorList)

    fileWaitGroup := &sync.WaitGroup{}

    for _, file := range zipReader.File {
        fileWaitGroup.Add(1)
        go writeZipFileToLocal(file, location, errorChannel, fileWaitGroup)
    }

    fileWaitGroup.Wait()

    close(errorChannel)

    log.Println(errorList)

    return nil
}

func errorChannelWatch(ch chan error, list []error) {
    for {
        select {
        case err := <-ch:

            list = append(list, err)
        }
    }
}

func writeZipFileToLocal(file *zip.File, location string, ch chan error, wg *sync.WaitGroup) {
    defer wg.Done()

    zipFilehandle, err := file.Open()

    if err != nil {
        ch <- err
        return
    }

    defer zipFilehandle.Close()

    if file.FileInfo().IsDir() {
        if err := os.MkdirAll(filepath.Join(location, file.Name), os.ModePerm); err != nil {
            ch <- err
        }
        return
    }

    localFileHandle, err := os.OpenFile(filepath.Join(location, file.Name), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, file.Mode())

    if err != nil {
        ch <- err
        return
    }

    defer localFileHandle.Close()

    if _, err := io.Copy(localFileHandle, zipFilehandle); err != nil {
        ch <- err
        return
    }

    ch <- fmt.Errorf("Test error")
}

所以,我正在循环一个文件切片,并将它们写入磁盘,当出现错误时,我会将错误报告给 errorChannel,以便将该错误保存到切片中。

我使用 sync.WaitGroup 来等待所有的 goroutine 完成,当它们完成时,我想打印 errorList 并检查执行过程中是否有任何错误。

无论如何,该列表始终为空,即使我在 writeZipFileToLocal 的末尾添加了 ch <- fmt.Errorf("test"),通道仍然挂起。

我不确定我漏掉了什么。

英文:

I am trying to catch errors from a group of goroutines using a channel, but the channel enters an infinite loop, starts consuming CPU.

func UnzipFile(f *bytes.Buffer, location string) error {
zipReader, err := zip.NewReader(bytes.NewReader(f.Bytes()), int64(f.Len()))
if err != nil {
return err
}
if err := os.MkdirAll(location, os.ModePerm); err != nil {
return err
}
errorChannel := make(chan error)
errorList := []error{}
go errorChannelWatch(errorChannel, errorList)
fileWaitGroup := &amp;sync.WaitGroup{}
for _, file := range zipReader.File {
fileWaitGroup.Add(1)
go writeZipFileToLocal(file, location, errorChannel, fileWaitGroup)
}
fileWaitGroup.Wait()
close(errorChannel)
log.Println(errorList)
return nil
}
func errorChannelWatch(ch chan error, list []error) {
for {
select {
case err := &lt;- ch:
list = append(list, err)
}
}
}
func writeZipFileToLocal(file *zip.File, location string, ch chan error, wg *sync.WaitGroup) {
defer wg.Done()
zipFilehandle, err := file.Open()
if err != nil {
ch &lt;- err
return
}
defer zipFilehandle.Close()
if file.FileInfo().IsDir() {
if err := os.MkdirAll(filepath.Join(location, file.Name), os.ModePerm); err != nil {
ch &lt;- err
}
return
}
localFileHandle, err := os.OpenFile(filepath.Join(location, file.Name), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, file.Mode())
if err != nil {
ch &lt;- err
return
}
defer localFileHandle.Close()
if _, err := io.Copy(localFileHandle, zipFilehandle); err != nil {
ch &lt;- err
return
}
ch &lt;- fmt.Errorf(&quot;Test error&quot;)
}

So I am looping a slice of files and writing them to my disk, when there is an error I report back to the errorChannel to save that error into a slice.

I use a sync.WaitGroup to wait for all goroutines and when they are done I want to print errorList and check if there was any error during the execution.

The list is always empty, even if I add ch &lt;- fmt.Errorf(&quot;test&quot;) at the end of writeZipFileToLocal and the channel always hangs up.

I am not sure what I am missing here.

答案1

得分: 8

1. 关于第一个问题,无限循环:

引用自golang语言规范

> 对于已关闭的通道的接收操作总是可以立即进行,之后会产生元素类型的零值,而之前发送的值已经被接收。

因此,在这个函数中:

func errorChannelWatch(ch chan error, list []error) {
for {
select {
case err := &lt;- ch:
list = append(list, err)
}
}
}

在ch被关闭后,这将变成一个无限循环,向list中添加nil值。

请尝试使用以下代码替代:

func errorChannelWatch(ch chan error, list []error) {
for err := range ch {
list = append(list, err)
}
}

2. 关于第二个问题,为什么你在错误列表中看不到任何内容:

问题出在这个调用上:

errorChannel := make(chan error)
errorList := []error{}
go errorChannelWatch(errorChannel, errorList)

在这里,你将errorList作为值传递给了errorChannelWatch函数。因此,切片errorList不会被函数修改。被修改的是底层数组,只要append调用不需要分配新的数组。

为了解决这个问题,可以将切片指针传递给errorChannelWatch,或者将其重写为闭包的调用,捕获errorList

对于第一个提议的解决方案,将errorChannelWatch改为:

func errorChannelWatch(ch chan error, list *[]error) {
for err := range ch {
*list = append(*list, err)
}
}    

并将调用改为:

errorChannel := make(chan error)
errorList := []error{}
go errorChannelWatch(errorChannel, &errorList)

对于第二个提议的解决方案,只需将调用改为:

   errorChannel := make(chan error)
errorList := []error{}
go func() {
for err := range errorChannel {
errorList = append(errorList, err)
}
} () 

3. 一个小建议:

有人可能会认为这里存在一个同步问题:

fileWaitGroup.Wait()
close(errorChannel)
log.Println(errorList)

你如何确保在调用close之后,errorList不会被修改?有人可能会认为,你无法知道goroutine errorChannelWatch还有多少值需要处理。

在我看来,你的同步是正确的,因为你在向错误通道发送后执行了wg.Done(),所以当fileWaitGroup.Wait()返回时,所有的错误值都会被发送。

但是,如果以后有人给错误通道添加缓冲区或修改代码,情况可能会发生变化。

因此,我建议至少在注释中解释一下同步情况。

英文:

1. For the first point, the infinite loop:

Citing from golang language spec:

> A receive operation on a closed channel can always proceed
> immediately, yielding the element type's zero value after any
> previously sent values have been received.

So in this function

func errorChannelWatch(ch chan error, list []error) {
for {
select {
case err := &lt;- ch:
list = append(list, err)
}
}
}

after ch gets closed this turns into an infinite loop adding nil values to list.

Try this instead:

func errorChannelWatch(ch chan error, list []error) {
for err := range ch {
list = append(list, err)
}
}

2. For the second point, why you don't see anything in your error list:

The problem is this call:

errorChannel := make(chan error)
errorList := []error{}
go errorChannelWatch(errorChannel, errorList)

Here you hand errorChannelWatch the errorList as a value. So the slice errorList will not be changed by the function. What is changed, is the underlying array, as long as the append calls don't need to allocate a new one.

To remedy the situation, either hand a slice pointer to errorChannelWatch or rewrite it as a call to a closure, capturing
errorList.

For the first proposed solution, change errorChannelWatch to

func errorChannelWatch(ch chan error, list *[]error) {
for err := range ch {
*list = append(*list, err)
}
}    

and the call to

errorChannel := make(chan error)
errorList := []error{}
go errorChannelWatch(errorChannel, &amp;errorList)

For the second proposed solution, just change the call to

   errorChannel := make(chan error)
errorList := []error{}
go func() {
for err := range errorChannel {
errorList = append(errorList, err)
}
} () 

3. A minor remark:

One could think, that there is a synchronisation problem here:

fileWaitGroup.Wait()
close(errorChannel)
log.Println(errorList)

How can you be sure, that errorList isn't modified, after the call to close? One could reason, that you can't know, how many values the goroutine errorChannelWatch still has to process.

Your synchronisation seems correct to me, as you do the wg.Done()
after the send to the error channel and so all error values will
be sent, when fileWaitGroup.Wait() returns.

But that can change, if someone later adds a buffering to the error
channel or alters the code.

So I would advise to at least explain the synchronisation in a comment.

huangapple
  • 本文由 发表于 2017年8月21日 03:29:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/45786042.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定