Golang并发读取的缓冲区

huangapple go评论76阅读模式
英文:

Golang buffer with concurrent readers

问题

我想在Go语言中构建一个支持多个并发读取器和一个写入器的缓冲区。无论写入了什么数据到缓冲区,都应该被所有读取器读取。新的读取器可以随时加入,这意味着已经写入的数据必须能够被延迟的读取器回放。

这个缓冲区应该满足以下接口:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}

你对这样的实现有什么建议,最好使用内置类型?

英文:

I want to build a buffer in Go that supports multiple concurrent readers and one writer. Whatever is written to the buffer should be read by all readers. New readers are allowed to drop in at any time, which means already written data must be able to be played back for late readers.

The buffer should satisfy the following interface:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}

Do you have any suggestions for such an implementation preferably using built in types?

答案1

得分: 7

根据这个写入器的性质和使用方式,将所有内容保存在内存中(以便能够为后来加入的读者重新播放所有内容)是非常冒险的,可能需要大量的内存,或者导致应用程序由于内存不足而崩溃。

对于将所有内容保存在内存中的“低流量”记录器来说,这可能是可以接受的,但是对于例如流式传输音频或视频的情况,很可能不行。

如果下面的读取实现读取了写入缓冲区的所有数据,它们的Read()方法将正确报告io.EOF。需要注意的是,某些结构(例如bufio.Scanner)在遇到io.EOF后可能不会读取更多数据(但这不是我们实现的缺陷)。

如果你希望缓冲区的读取器在缓冲区中没有更多数据可用时等待,直到写入新数据而不是返回io.EOF,你可以在这里的“尾读取器”中包装返回的读取器:https://stackoverflow.com/questions/31120987/go-tail-f-like-generator/31122253#31122253。

“内存安全”的文件实现

这是一个非常简单而优雅的解决方案。它使用文件进行写入,并使用文件进行读取。同步基本上由操作系统提供。这不会冒内存不足的风险,因为数据仅存储在磁盘上。根据你的写入器的性质,这可能足够或不足。

我更愿意使用以下接口,因为在文件的情况下Close()很重要。

type MyBuf interface {
	io.WriteCloser
	NewReader() (io.ReadCloser, error)
}

实现非常简单:

type mybuf struct {
	*os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
	f, err := os.Open(mb.Name())
	if err != nil {
		return nil, err
	}
	return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
	f, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	return &mybuf{File: f}, nil
}

我们的mybuf类型嵌入了*os.File,因此我们可以“免费”获得Write()Close()方法。

NewReader()方法只是以只读模式打开现有的后备文件并返回它,再次利用它实现了io.ReadCloser

创建一个新的MyBuf值是在NewMyBuf()函数中实现的,如果创建文件失败,它也可能返回一个error

注意:

请注意,由于mybuf嵌入了*os.File,可以通过类型断言“访问”os.File的其他公开方法,即使它们不是MyBuf接口的一部分。我不认为这是一个缺陷,但如果你想禁止这样做,你必须更改mybuf的实现,不再嵌入os.File,而是将其作为一个命名字段(但然后你必须自己添加Write()Close()方法,并正确地转发给os.File字段)。

内存实现

如果文件实现不够,这里是一个内存实现。

由于我们现在只在内存中,我们将使用以下接口:

type MyBuf interface {
	io.Writer
	NewReader() io.Reader
}

思路是存储传递给缓冲区的所有字节切片。当调用Read()时,读取器将提供存储的切片,每个读取器将跟踪其Read()方法提供的存储切片的数量。必须处理同步,我们将使用一个简单的sync.RWMutex

废话不多说,这是实现:

type mybuf struct {
	data [][]byte
	sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	// 无法保留p,所以我们必须复制它:
	p2 := make([]byte, len(p))
	copy(p2, p)
	mb.Lock()
	mb.data = append(mb.data, p2)
	mb.Unlock()
	return len(p), nil
}

type mybufReader struct {
	mb   *mybuf // 我们从中读取的缓冲区
	i    int    // 下一个切片索引
	data []byte // 当前要提供的数据切片
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	// 我们有数据要发送吗?
	if len(mbr.data) == 0 {
		mb := mbr.mb
		mb.RLock()
		if mbr.i < len(mb.data) {
			mbr.data = mb.data[mbr.i]
			mbr.i++
		}
		mb.RUnlock()
	}
	if len(mbr.data) == 0 {
		return 0, io.EOF
	}

	n = copy(p, mbr.data)
	mbr.data = mbr.data[n:]
	return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
	return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
	return &mybuf{}
}

请注意,Writer.Write()的一般约定包括实现不能保留传递的切片,因此我们必须在“存储”之前对其进行复制。

还要注意,读取器的Read()尝试以最小的时间锁定。也就是说,只有在我们需要从缓冲区获取新的数据切片时才会锁定,并且只进行读取锁定,这意味着如果读取器有部分数据切片,将在Read()中发送而不锁定和触摸缓冲区。

英文:

Depending on the nature of this writer and how you use it, keeping everything in memory (to be able to re-play everything for readers joining later) is very risky and might demand a lot of memory, or cause your app to crash due to out of memory.

Using it for a "low-traffic" logger keeping everything in memory is probably ok, but for example streaming some audio or video is most likely not.

If the reader implementations below read all the data that was written to the buffer, their Read() method will report io.EOF, properly. Care must be taken as some constructs (such as bufio.Scanner) may not read more data once io.EOF is encountered (but this is not the flaw of our implementation).

If you want the readers of our buffer to wait if no more data is available in the buffer, to wait until new data is written instead of returning io.EOF, you may wrap the returned readers in a "tail reader" presented here: https://stackoverflow.com/questions/31120987/go-tail-f-like-generator/31122253#31122253.

"Memory-safe" file implementation

Here is an extremely simple and elegant solution. It uses a file to write to, and also uses files to read from. The synchronization is basically provided by the operating system. This does not risk out of memory error, as the data is solely stored on the disk. Depending on the nature of your writer, this may or may not be sufficient.

I will rather use the following interface, because Close() is important in case of files.

type MyBuf interface {
	io.WriteCloser
	NewReader() (io.ReadCloser, error)
}

And the implementation is extremely simple:

type mybuf struct {
	*os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
	f, err := os.Open(mb.Name())
	if err != nil {
		return nil, err
	}
	return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
	f, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	return &amp;mybuf{File: f}, nil
}

Our mybuf type embeds *os.File, so we get the Write() and Close() methods for "free".

The NewReader() simply opens the existing, backing file for reading (in read-only mode) and returns it, again taking advantage of that it implements io.ReadCloser.

Creating a new MyBuf value is implementing in the NewMyBuf() function which may also return an error if creating the file fails.

Notes:

Note that since mybuf embeds *os.File, it is possible with a type assertion to "reach" other exported methods of os.File even though they are not part of the MyBuf interface. I do not consider this a flaw, but if you want to disallow this, you have to change the implementation of mybuf to not embed os.File but rather have it as a named field (but then you have to add the Write() and Close() methods yourself, properly forwarding to the os.File field).

In-memory implementation

If the file implementation is not sufficient, here comes an in-memory implementation.

Since we're now in-memory only, we will use the following interface:

type MyBuf interface {
	io.Writer
	NewReader() io.Reader
}

The idea is to store all byte slices that are ever passed to our buffer. Readers will provide the stored slices when Read() is called, each reader will keep track of how many of the stored slices were served by its Read() method. Synchronization must be dealt with, we will use a simple sync.RWMutex.

Without further ado, here is the implementation:

type mybuf struct {
	data [][]byte
	sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	// Cannot retain p, so we must copy it:
	p2 := make([]byte, len(p))
	copy(p2, p)
	mb.Lock()
	mb.data = append(mb.data, p2)
	mb.Unlock()
	return len(p), nil
}

type mybufReader struct {
	mb   *mybuf // buffer we read from
	i    int    // next slice index
	data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	// Do we have data to send?
	if len(mbr.data) == 0 {
		mb := mbr.mb
		mb.RLock()
		if mbr.i &lt; len(mb.data) {
			mbr.data = mb.data[mbr.i]
			mbr.i++
		}
		mb.RUnlock()
	}
	if len(mbr.data) == 0 {
		return 0, io.EOF
	}

	n = copy(p, mbr.data)
	mbr.data = mbr.data[n:]
	return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
	return &amp;mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
	return &amp;mybuf{}
}

Note that the general contract of Writer.Write() includes that an implementation must not retain the passed slice, so we have to make a copy of it before "storing" it.

Also note that the Read() of readers attempts to lock for minimal amount of time. That is, it only locks if we need new data slice from buffer, and only does read-locking, meaning if the reader has a partial data slice, will send that in Read() without locking and touching the buffer.

答案2

得分: 1

我链接到了只追加的提交日志,因为它似乎非常符合你的要求。我对分布式系统和提交日志都不太熟悉,所以可能会误解一些概念,但是 Kafka 的介绍用漂亮的图表清楚地解释了一切。

对于我来说,Go 也是相对陌生的,所以可能有更好的方法:

也许你可以将缓冲区建模为一个切片,我认为有几种情况:

  • 缓冲区没有读取器,新数据被写入缓冲区,缓冲区长度增加

  • 缓冲区有一个/多个读取器:

    • 读取器订阅缓冲区
    • 缓冲区创建并返回一个通道给该客户端
    • 缓冲区维护一个客户端通道列表
    • 写入发生时,循环遍历所有客户端通道并向其发布消息(发布-订阅)

这解决了实时消费者流的发布-订阅问题,其中消息被广播出去,但没有解决回溯的问题。

Kafka 可以进行回溯,他们的介绍展示了如何实现:

> 这个偏移量由消费者控制:通常,消费者会线性地推进其偏移量,但实际上,由于位置由消费者控制,它可以按任意顺序消费记录。例如,消费者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最新的记录并从“现在”开始消费。
>
> 这些特性的组合意味着 Kafka 消费者非常廉价 - 它们可以随时加入和退出,对集群或其他消费者的影响不大。例如,您可以使用我们的命令行工具“tail”任何主题的内容,而不会改变任何现有消费者所消费的内容。

英文:

I linked to the append only commit log, because it seems very similar to your requirements. I am pretty new to distributed systems and the commit log so I may be butchering a couple of the concepts, but the kafka introduction clearly explains everything with nice charts.

Go is also pretty new to me, so i'm sure there's a better way to do it:

But perhaps you could model your buffer as a slice, I think a couple of cases:

  • buffer has no readers, new data is written to the buffer, buffer length grows

  • buffer has one/many reader(s):

  • reader subscribes to buffer

  • buffer creates and returns a channel to that client

  • buffer maintains a list of client channels

  • write occurs -> loops through all client channels and publishes to it (pub sub)

This addresses a pubsub real time consumer stream, where messages are fanned out, but does not address the backfill.

Kafka enables a backfill and their intro illustrates how it can be done Golang并发读取的缓冲区

> This offset is controlled by the consumer: normally a consumer will
> advance its offset linearly as it reads records, but, in fact, since
> the position is controlled by the consumer it can consume records in
> any order it likes. For example a consumer can reset to an older
> offset to reprocess data from the past or skip ahead to the most
> recent record and start consuming from "now".
>
> This combination of features means that Kafka consumers are very
> cheap—they can come and go without much impact on the cluster or on
> other consumers. For example, you can use our command line tools to
> "tail" the contents of any topic without changing what is consumed by
> any existing consumers.

答案3

得分: 1

我必须做一个类似的实验,所以分享一下:

type MultiReaderBuffer struct {
    mu  sync.RWMutex
    buf []byte
}

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    b.mu.Lock()
    b.buf = append(b.buf, p...)
    b.mu.Unlock()
    return len(p), nil
}

func (b *MultiReaderBuffer) NewReader() io.Reader {
    return &mrbReader{mrb: b}
}

type mrbReader struct {
    mrb *MultiReaderBuffer
    off int
}

func (r *mrbReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    r.mrb.mu.RLock()
    n = copy(p, r.mrb.buf[r.off:])
    r.mrb.mu.RUnlock()
    if n == 0 {
        return 0, io.EOF
    }
    r.off += n
    return n, nil
}

这段代码定义了一个名为MultiReaderBuffer的结构体,它包含一个互斥锁mu和一个字节切片bufMultiReaderBuffer结构体有两个方法:WriteNewReaderWrite方法用于将字节写入buf中,NewReader方法返回一个实现了io.Reader接口的mrbReader结构体指针。

mrbReader结构体包含一个指向MultiReaderBuffer的指针mrb和一个偏移量offmrbReader结构体有一个Read方法,用于从buf中读取字节并将其复制到给定的字节切片p中。

这段代码实现了一个多读取器缓冲区,可以同时从多个读取器中读取数据。

英文:

I had to do something similar as part of an experiment, so sharing:

type MultiReaderBuffer struct {
	mu  sync.RWMutex
	buf []byte
}

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	b.mu.Lock()
	b.buf = append(b.buf, p...)
	b.mu.Unlock()
	return len(p), nil
}

func (b *MultiReaderBuffer) NewReader() io.Reader {
	return &amp;mrbReader{mrb: b}
}

type mrbReader struct {
	mrb *MultiReaderBuffer
	off int
}

func (r *mrbReader) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	r.mrb.mu.RLock()
	n = copy(p, r.mrb.buf[r.off:])
	r.mrb.mu.RUnlock()
	if n == 0 {
		return 0, io.EOF
	}
	r.off += n
	return n, nil
}

huangapple
  • 本文由 发表于 2017年6月1日 22:58:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/44310982.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定