如何从goroutine的通道中连续接收数据

huangapple go评论82阅读模式
英文:

How to continuously receive data from channel of goroutine

问题

我是Golang的初学者。我做了一个关于Go通道的练习。在主goroutine中,我打开并读取一个文件的数据,然后将数据通过通道传递给第二个goroutine,以保存到另一个文件中。

我的代码如下:

func main() {
    f, err := os.OpenFile("test.go", os.O_RDONLY, 0600)
    ch := make(chan []byte)
    buf := make([]byte, 10)
    bytes_len, err := f.Read(buf)
    fmt.Println("ReadLen:", bytes_len)
    if err != nil {
        fmt.Println("Error:", err)
        return
    }
    go WriteFile(ch)
    for {
        ch <- buf
        bytes_len, err = f.Read(buf)
        if err != nil {
            fmt.Println("error=", err)
            break
        }
        if bytes_len < 10 {
            ch <- buf[:bytes_len]
            fmt.Println("Finished!")
            break
        }
    }
    time.Sleep(1e9)
    f.Close()
}

func WriteFile(ch <-chan []byte) {
    fmt.Println("* begin!")
    f, err := os.OpenFile("/home/GoProgram/test/test.file", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
    if err != nil {
        fmt.Println("* Error:", err)
        return
    }
    // Method 2: use "for {if}", this will get messed text in target file, not identical with the source file.
    for {
        if bytes, ok := <-ch; ok {
            f.Write(bytes)
            fmt.Println("* buff=", string(bytes))
            bytes = nil
            ok = false
        } else {
            fmt.Println("** End ", string(bytes), "  ", ok)
            break
        }
    }
    f.Close()
}

我的问题是为什么方法2和方法3会在目标文件中得到混乱的文本?我该如何修复它?

英文:

I am a beginner of the Golang. I had made a practice about Go channel. Which I open and read data from a file in the main goroutine, then pass the data to the second goroutine to save to another file with channel.
My code is as flows

  func main() {
f, err := os.OpenFile(&quot;test.go&quot;, os.O_RDONLY, 0600)
ch := make(chan []byte)
buf := make([]byte, 10)
bytes_len, err := f.Read(buf)
fmt.Println(&quot;ReadLen:&quot;, bytes_len)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
return
}
go WriteFile(ch)
for {
ch&lt;-buf
bytes_len, err = f.Read(buf)
if err != nil {
fmt.Println(&quot;error=&quot;, err)
break
}
if bytes_len &lt; 10 {
ch&lt;-buf[:bytes_len]
fmt.Println(&quot;Finished!&quot;)
break
}
}
time.Sleep(1e9)
f.Close()
}
func WriteFile(ch &lt;-chan []byte) {
fmt.Println(&quot;* begin!&quot;)
f, err := os.OpenFile(&quot;/home/GoProgram/test/test.file&quot;,  os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
if err != nil {
fmt.Println(&quot;* Error:&quot;, err)
return
}
/* Method1:  use the &quot;select&quot; will write to target file OK, but it is too slow!!!
for {
select {
case bytes, ok:= &lt;-ch:
if ok {
f.Write(bytes)
} else {
fmt.Println(&quot;* file closed!&quot;)
break
}
default:
fmt.Println(&quot;* waiting data!&quot;)
}
} \*/
// Method 2: use &quot;for {if}&quot;, this will get messed text in target file, not identical with the source file.
for {
if bytes, ok := &lt;-ch; ok {
f.Write(bytes)
fmt.Println(&quot;* buff=&quot;, string(bytes))
bytes = nil
ok = false
} else {
fmt.Println(&quot;** End &quot;, string(bytes), &quot;  &quot;, ok)
break
}
}
/* Method 3: use &quot;for range&quot;, this will get messed text like in method2
for data:= range ch {
f.Write(data)
//fmt.Println(&quot;* Data:&quot;, string(data))
}
\*/
f.Close()
}

My question is why the Method2 and Method3 will get messed text in the target file? How can I fix it?

答案1

得分: 4

Method2和Method3的文本混乱是因为读取器和写入器共享的缓冲区存在竞争。

以下是上述程序可能的语句执行顺序:

 R: bytes_len, err = f.Read(buf)  
R: ch<-buf[:bytes_len]
W: bytes, ok := <-ch; ok
R: bytes_len, err = f.Read(buf)  // 这会覆盖缓冲区
W: f.Write(bytes)                // 从第二次读取写入数据

使用竞争检测器运行程序。它会为您标记出问题。

解决问题的一种方法是复制数据。例如,从读取的字节创建一个字符串,并将字符串发送到通道。

另一种选择是使用io.Pipe连接goroutine。一个goroutine从源读取并写入管道。另一个goroutine从管道读取并写入目标。管道会处理同步问题。

英文:

Method2 and Method3 get messed up text because there's a race on the buffer shared by the reader and writer.

Here's a possible sequence of statement execution for the program above:

 R: bytes_len, err = f.Read(buf)  
R: ch&lt;-buf[:bytes_len]
W: bytes, ok := &lt;-ch; ok
R: bytes_len, err = f.Read(buf)  // this writes over buffer
W: f.Write(bytes)                // writes data from second read

Run your program with the race dectector. It will flag the issues for you.

One way to fix the problem is to copy the data. For example, create a string from the bytes read and send the string to the channel.

Another option is to connect the goroutines with an io.Pipe. One goroutine reads from the source and writes to the pipe. The other goroutine reads from the pipe and writes to the destination. The pipe takes care of the synchronization issues.

答案2

得分: 1

为了使用你在注释中提到的Method2Method3中的for循环来获取代码片段,你需要使用一个带有缓冲的通道。

导致目标文件中文本混乱的原因是func main中的循环没有与WriteFile中监听通道的循环同步的机制。

另一方面,向带有缓冲的通道发送数据只有在缓冲区满时才会阻塞。接收数据只有在缓冲区为空时才会阻塞。因此,通过初始化一个长度为1的缓冲通道,你可以使用Method1和/或Method2。唯一需要记住的是在完成后关闭通道。

func main() {
    f, _ := os.OpenFile("test.txt", os.O_RDONLY, 0600)
    defer f.Close()
    ch := make(chan []byte, 1) // 使用make的第二个参数来指定缓冲区长度为1
    buf := make([]byte, 10)
    go WriteFile(ch)
    for {
        ch <- buf
        byteLen, err := f.Read(buf)
        if err != nil {
            break
        }
        if byteLen < 10 {
            ch <- buf[:byteLen]
            break
        }
    }
    close(ch) // 在完成后关闭通道
}

func WriteFile(ch <-chan []byte) {
    f, err := os.OpenFile("othertest.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
    defer f.Close()
    if err != nil {
        fmt.Println("* Error:", err)
        return
    }

    // Method 3: 使用"for range"
    for data := range ch {
        f.Write(data)
    }
}
英文:

In order to get the code snippets using the for loops in what you have put in comments as Method2 and Method3, you will need to use a buffered channel.

The reason the text gets messed up in the target file is the loop in func main has no mechanism to synchronize in lock step with the loops listening on the channel in WriteFile.

Sends to a buffered channel, on the other hand, block only when the buffer is full. Receives block when the buffer is empty. So by initializing a channel with a buffer length of one, your can use Method1 and/or Method2. All that's left is remembering to close the channel when you are done.

func main() {
f, _ := os.OpenFile(&quot;test.txt&quot;, os.O_RDONLY, 0600)
defer f.Close()
ch := make(chan []byte, 1) // use second argument to make to give buffer length 1
buf := make([]byte, 10)
go WriteFile(ch)
for {
ch &lt;- buf
byteLen, err := f.Read(buf)
if err != nil {
break
}
if byteLen &lt; 10 {
ch &lt;- buf[:byteLen]
break
}
}
close(ch) //close the channel when you done
}
func WriteFile(ch &lt;-chan []byte) {
f, err := os.OpenFile(&quot;othertest.txt&quot;, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
defer f.Close()
if err != nil {
fmt.Println(&quot;* Error:&quot;, err)
return
}
//Method 3: use &quot;for range&quot;
for data := range ch {
f.Write(data)
}
}

huangapple
  • 本文由 发表于 2015年12月11日 10:54:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/34215373.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定