英文:
Long running copy to bytes.Buffer
问题
我有一个长时间运行的io.Reader
,每隔几秒返回一些数据(永远不会返回EOF),还有一个goroutine,它将该reader的数据复制到一个bytes.Buffer
中(也永远不会终止)。代码大致如下:
var src io.Reader
var buf bytes.Buffer
func main() {
go io.Copy(&buf, src)
// 做其他事情。定期从buffer中读取数据。
}
我不明白的是,当我尝试从该buffer中读取数据时,我看到了奇怪的结果。无论我是调用buf.Bytes()
还是ioutil.ReadAll(&buf)
,我都只能看到最初写入buffer的那些字节一遍又一遍地重复出现。
我的问题是,我做错了什么?我可以这样使用bytes.Buffer
吗(即io.Copy
到它并定期读取)?
英文:
I have a long lived io.Reader
which returns some data every few seconds (never EOF), and a goroutine which does an io.Copy
from that reader to a bytes.Buffer
(also never terminates). Something like so:
var src io.Reader
var buf bytes.Buffer
func main() {
go io.Copy(&buf, src)
// Do stuff. Read from the buffer periodically.
}
What I don't understand is that I see strange results when I try to read from that buffer. It doesn't matter whether I call buf.Bytes()
or ioutil.ReadAll(&buf)
or anything, I just see the first bytes written to the buffer over and over again.
https://play.golang.org/p/yn0JPrvohV
My question is, what am I doing wrong? Can I use bytes.Buffer
in this way (io.Copy
to it and read periodically)?
答案1
得分: 3
你无法将bytes.Buffer
中的写操作与io.Copy
中发生的写操作同步。即使你将bytes.Buffer
包装在一个结构体中以锁定Read/Write方法,在Copy等待写入时,ReadAll在读取时被阻塞,会导致死锁。你需要手动进行复制,并正确序列化所有访问,或者使用io.Pipe
分离读取和写入。
如果你使用FIFO(io.Pipe
)来同步读取和写入,就不需要额外的锁或通道来追踪第一个io.Reader
。下面是一个示例的read
函数,它在缓冲区满时打印输出,或者在上次打印语句后等待一段时间:
func read(r io.Reader) {
buf := make([]byte, 1024)
pos := 0
lastPrint := time.Now()
for {
n, err := r.Read(buf[pos:])
if n > 0 {
pos += n
}
if pos >= len(buf) || time.Since(lastPrint) > 125*time.Millisecond && pos > 0 {
fmt.Println("read:", buf[:pos])
lastPrint = time.Now()
pos = 0
}
if err != nil {
fmt.Println(err)
break
}
}
if pos > 0 {
fmt.Println("read:", buf[:pos])
}
}
func main() {
pr, pw := io.Pipe()
go func() {
io.Copy(pw, &trickle{})
pw.Close()
}()
read(pr)
}
你可以在这里查看代码示例:https://play.golang.org/p/8NeV3v0LOU
英文:
You can't synchronize your Read calls with the writes that are happening on the bytes.Buffer
in the io.Copy
. Even if you wrap the bytes.Buffer
in a struct to lock the Read/Write methods, you are going to deadlock when the Copy is waiting on a Write while the ReadAll is blocked on the Read. You either need to do the copy manually, and properly serialize all access, or separate the reads and writes with an io.Pipe
.
If you use a FIFO (io.Pipe
) to synchronize reads and writes, you don't need any extra locks or channels to tail the first io.Reader
. Here's an example read
function that either prints when its buffer is full, or waits some interval since the last print statement:
func read(r io.Reader) {
buf := make([]byte, 1024)
pos := 0
lastPrint := time.Now()
for {
n, err := r.Read(buf[pos:])
if n > 0 {
pos += n
}
if pos >= len(buf) || time.Since(lastPrint) > 125*time.Millisecond && pos > 0 {
fmt.Println("read:", buf[:pos])
lastPrint = time.Now()
pos = 0
}
if err != nil {
fmt.Println(err)
break
}
}
if pos > 0 {
fmt.Println("read:", buf[:pos])
}
}
func main() {
pr, pw := io.Pipe()
go func() {
io.Copy(pw, &trickle{})
pw.Close()
}()
read(pr)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论