英文:
Read continuously from a named pipe
问题
我想知道在使用golang连续读取命名管道时,我还有哪些其他选项。我的当前代码依赖于在goroutine中运行的无限for循环;但这会使一个CPU的使用率保持在100%。
func main() {
....
var wg sync.WaitGroup
fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
defer fpipe.Close()
f, _ := os.Create("dump.txt")
defer f.Close()
var buff bytes.Buffer
wg.Add(1)
go func() {
for {
io.Copy(&buff, fpipe)
if buff.Len() > 0 {
buff.WriteTo(f)
}
}
}()
wg.Wait()
}
英文:
I would like to know what other options I have in order to read continuously from a named pipe using golang. My current code relies on a infinite for loop running inside a gorutine; but hat keeps one CPU at 100% usage.
func main() {
....
var wg sync.WaitGroup
fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
defer fpipe.Close()
f, _ := os.Create("dump.txt")
defer f.Close()
var buff bytes.Buffer
wg.Add(1)
go func() {
for {
io.Copy(&buff, fpipe)
if buff.Len() > 0 {
buff.WriteTo(f)
}
}
}()
wg.Wait()
}
答案1
得分: 3
简介
如前所述,如果没有写入者,命名管道读取器将接收到EOF。
然而,我认为@JimB的解决方案不够理想:
- 命名管道有一个最大容量(65kB,如果我没记错的话),在100毫秒的休眠期间可能会被填满。当缓冲区被填满时,所有的写入者都会无故地被阻塞。
- 如果发生重新启动,平均会丢失50毫秒的数据。同样,没有充分的理由。
- 如果你想使用静态缓冲区进行复制,
io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)
将是更好的解决方案,我认为。但这甚至不是必需的,因为io.Copy
(或底层实现)实际上会分配一个32kB的缓冲区。
我的方法
一个更好的解决方案是等待写入事件发生,然后立即将命名管道的内容复制到目标文件中。在大多数系统上,文件系统事件会有某种形式的通知。可以使用github.com/rjeczalik/notify包来访问我们感兴趣的事件,因为写入事件在大多数重要的操作系统上都可以跨平台使用。对我们来说还有另一个有趣的事件是命名管道的删除,因为我们将无法从中读取任何内容。
因此,我的解决方案如下:
package main
import (
"flag"
"io"
"log"
"os"
"github.com/rjeczalik/notify"
)
const (
MAX_CONCURRENT_WRITERS = 5
)
var (
pipePath string
filePath string
)
func init() {
flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
log.SetOutput(os.Stderr)
}
func main() {
flag.Parse()
var p, f *os.File
var err error
var e notify.EventInfo
// 常规操作:检查命名管道是否存在等等
if p, err = os.Open(pipePath); os.IsNotExist(err) {
log.Fatalf("Named pipe '%s' does not exist", pipePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
} else if err != nil {
log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
}
// 是的,存在且可读。在退出时关闭文件句柄
defer p.Close()
// 对输出文件做同样的操作
if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
log.Fatalf("File '%s' does not exist", filePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
} else if err != nil {
log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
}
// 再次,在退出时关闭文件句柄
defer f.Close()
// 这就是发生的地方。我们为可能发生的事件创建了一个带缓冲的通道。
// 我们将其缓冲到预期并发写入者的数量,是因为如果所有写入者(理论上)同时写入,
// 或者至少非常接近彼此,可能会丢失事件。这是由于底层实现而不是由于Go本身。
c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
// 在这里,我们告诉notify监视命名管道的事件,特别是写入和删除事件。
// 我们还监视删除事件,因为这会删除我们从中读取的文件句柄,使读取变得不可能。
notify.Watch(pipePath, c, notify.Write|notify.Remove)
// 我们开始一个无限循环...
for {
// ...等待传递的事件。
e = <-c
switch e.Event() {
case notify.Write:
// 如果是写入事件,我们将命名管道的内容复制到我们的输出文件中,
// 然后等待下一个事件的发生。
// 注意,这是幂等的:即使我们有多个写入者对命名管道进行大量写入,
// 第一次调用Copy将复制内容。复制数据的时间可能比生成事件的时间长。
// 然而,后续的调用可能不会复制任何内容,但这并不会造成任何损害。
io.Copy(f, p)
case notify.Remove:
// 一些用户或进程已经删除了命名管道,
// 所以我们没有任何可读取的内容了。
// 我们应该通知用户并退出。
log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
}
}
}
希望这可以帮助到你!
英文:
Intro
As already written, a named pipe reader will receive an EOF if no writers are left.
However I find @JimB's solution less than optimal:
- A named pipe has a maximum capacity (65kB, iirc), which may well get filled within the 100msec sleep period. When the buffer is filled, all writers would block for no good reason.
- If a reboot happens, you will loose 50ms worth of data on average. Again, for no good reason.
- If you want to use a static buffer for copying,
io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)
would be the better solution, imho. But this is not even necessary, asio.Copy
(or the underlying implementation) actually allocates a buffer of 32kB.
My approach
A better solution would be to wait for a write to happen and the immediately copy the contents of the named pipe to the destination file. On most systems, there is some sort of notification on file system events. The package github.com/rjeczalik/notify can be used to access the events we are interested in, as write events work cross platform on most of the important OSes. The other event which would be interesting for us is the removal of the named pipe, since we would not have anything to read from.
Hence, my solution would be:
package main
import (
"flag"
"io"
"log"
"os"
"github.com/rjeczalik/notify"
)
const (
MAX_CONCURRENT_WRITERS = 5
)
var (
pipePath string
filePath string
)
func init() {
flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
log.SetOutput(os.Stderr)
}
func main() {
flag.Parse()
var p, f *os.File
var err error
var e notify.EventInfo
// The usual stuff: checking wether the named pipe exists etc
if p, err = os.Open(pipePath); os.IsNotExist(err) {
log.Fatalf("Named pipe '%s' does not exist", pipePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
} else if err != nil {
log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
}
// Yep, there and readable. Close the file handle on exit
defer p.Close()
// Do the same for the output file
if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
log.Fatalf("File '%s' does not exist", filePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
} else if err != nil {
log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
}
// Again, close the filehandle on exit
defer f.Close()
// Here is where it happens. We create a buffered channel for events which might happen
// on the file. The reason why we make it buffered to the number of expected concurrent writers
// is that if all writers would (theoretically) write at once or at least pretty close
// to each other, it might happen that we loose event. This is due to the underlying implementations
// not because of go.
c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
// Here we tell notify to watch out named pipe for events, Write and Remove events
// specifically. We watch for remove events, too, as this removes the file handle we
// read from, making reads impossible
notify.Watch(pipePath, c, notify.Write|notify.Remove)
// We start an infinite loop...
for {
// ...waiting for an event to be passed.
e = <-c
switch e.Event() {
case notify.Write:
// If it a a write event, we copy the content of the named pipe to
// our output file and wait for the next event to happen.
// Note that this is idempotent: Even if we have huge writes by multiple
// writers on the named pipe, the first call to Copy will copy the contents.
// The time to copy that data may well be longer than it takes to generate the events.
// However, subsequent calls may copy nothing, but that does not do any harm.
io.Copy(f, p)
case notify.Remove:
// Some user or process has removed the named pipe,
// so we have nothing left to read from.
// We should inform the user and quit.
log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
}
}
}
答案2
得分: 2
一个命名管道读取器在没有写入器时会收到EOF。在这段代码之外的解决方案是确保始终有一个写入器进程持有文件描述符,尽管它不需要写入任何内容。
在Go程序中,如果你想等待一个新的写入器,你将需要在for循环中轮询io.Reader
。你当前的代码使用了一个忙循环,这将消耗1个CPU核心的100%。添加一个睡眠和一种在其他错误发生时返回的方法可以解决这个问题:
for {
err := io.Copy(&buff, fpipe)
if buff.Len() > 0 {
buff.WriteTo(f)
}
if err != nil {
// 发生了除EOF之外的其他错误
return
}
time.Sleep(100 * time.Millisecond)
}
英文:
A named pipe reader will receive EOF when there are no writers left. The solution outside of this code is to make sure there is always one writer process holding the file descriptor, though it doesn't need to write anything.
Within the Go program, if you want to wait for a new writer, you will have to poll the io.Reader
in your for loop. Your current code does this with a busy loop, which will consume 100% of 1 cpu core. Adding a sleep and a way to return on other errors will work around the issue:
for {
err := io.Copy(&buff, fpipe)
if buff.Len() > 0 {
buff.WriteTo(f)
}
if err != nil {
// something other than EOF happened
return
}
time.Sleep(100 * time.Millisecond)
}
答案3
得分: 0
问题:当“最后的写入者”关闭管道时,即使后来可能出现新的写入者,你也会得到一个EOF。
解决方案:为写入打开管道,并且不要关闭它。现在,你可以将读取端视为永不结束的读取,而不会收到EOF。将以下代码直接放在打开管道进行读取的位置之后:
nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
logger.Crit("打开管道进行(占位符)写入时出错", "err", err)
}
defer nullWriter.Close()
英文:
Problem: when the 'last writer' closes the pipe, you get an EOF even though there may be new writers that appear later.
Solution: open the pipe yourself for write and dont close it. Now you can treat the read side as a never ending read without ever getting EOF. Put the following directly after where you open your pipe for read:
nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
logger.Crit("Error opening pipe for (placeholder) write", "err", err)
}
defer nullWriter.Close()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论