How to have multiple consumers from one io.Reader?


Question

I am working on a small script that uses bufio.Scanner, http.Get, and goroutines to count words and lines in parallel.

package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

func main() {
	err := request("http://www.google.com")

	if err != nil {
		log.Fatal(err)
	}

	// just keep main alive with sleep for now
	time.Sleep(2 * time.Second)
}

func request(url string) error {
	res, err := http.Get(url)

	if err != nil {
		return err
	}

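	// both goroutines read from the same res.Body, so every byte one scanner
	// consumes is no longer available to the other
	go scanLineWise(res.Body)
	go scanWordWise(res.Body)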

	return err
}

func scanLineWise(r io.Reader) {
	s := bufio.NewScanner(r)
	s.Split(bufio.ScanLines)

	i := 0

	for s.Scan() {
		i++
	}

	fmt.Printf("Counted %d lines.\n", i)
}

func scanWordWise(r io.Reader) {
	s := bufio.NewScanner(r)
	s.Split(bufio.ScanWords)

	i := 0

	for s.Scan() {
		i++
	}

	fmt.Printf("Counted %d words.\n", i)
}

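As more or less expected from streams, scanLineWise counts some number of lines while scanWordWise counts zero words. This is because both goroutines read the same res.Body, and scanLineWise has already consumed everything from it.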

I would like to know: how can this be solved elegantly?

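My first thought was to build a struct that implements both io.Reader and io.Writer. We could use io.Copy to read from res.Body and write it to this writer. When the scanners read from it, the writer would hand out copies of the data instead of consuming it. Unfortunately, this would just accumulate memory over time and break the whole idea of streams...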


Answer 1

Score: 22

The options are pretty straightforward: you either maintain the "stream" of data, or you buffer the body.

If you really do need to read the body more than once sequentially, you need to buffer it somewhere. There's no way around that.

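There are a number of ways you could stream the data, like having the line counter output lines into the word counter (preferably through channels). You could also build a pipeline using io.TeeReader and io.Pipe, and supply a unique reader for each function.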

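...
pipeReader, pipeWriter := io.Pipe()
bodyReader := io.TeeReader(res.Body, pipeWriter)
go func() {
	scanLineWise(bodyReader)
	// close the write end so scanWordWise sees EOF once the body is exhausted
	pipeWriter.Close()
}()
go scanWordWise(pipeReader)
...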

That can get unwieldy with more consumers, though, so you could use io.MultiWriter to multiplex the data to more io.Readers.

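...
pipeOneR, pipeOneW := io.Pipe()
pipeTwoR, pipeTwoW := io.Pipe()
pipeThreeR, pipeThreeW := io.Pipe()

go scanLineWise(pipeOneR)
go scanWordWise(pipeTwoR)
go scanSomething(pipeThreeR)

// of course, this should probably have some more error handling
_, err = io.Copy(io.MultiWriter(pipeOneW, pipeTwoW, pipeThreeW), res.Body)

// close every write end so each scanner sees EOF (or the copy error, if any)
pipeOneW.CloseWithError(err)
pipeTwoW.CloseWithError(err)
pipeThreeW.CloseWithError(err)
...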

Answer 2

Score: 4

You could use channels: do the actual reading in your scanLineWise, then pass the lines to scanWordWise. For example:

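package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// body is a hypothetical sample input standing in for the HTTP response;
// the original snippet does not define it.
const body = "The quick brown fox\njumps over\nthe lazy dog\n"

// countLines reads r line by line in a goroutine, sends every line on the
// returned channel, and prints the line count when the input is exhausted.
func countLines(r io.Reader) <-chan string {
	ch := make(chan string)
	go func() {
		// closing ch (after the Printf below) ends the range loop in countWords
		defer close(ch)

		s := bufio.NewScanner(r)
		s.Split(bufio.ScanLines)

		cnt := 0
		for s.Scan() {
			ch <- s.Text()
			cnt++
		}
		fmt.Printf("Counted %d lines.\n", cnt)
	}()
	return ch
}

// countWords receives lines from ch and counts the words in each line.
func countWords(ch <-chan string) {
	cnt := 0
	for line := range ch {
		s := bufio.NewScanner(strings.NewReader(line))
		s.Split(bufio.ScanWords)
		for s.Scan() {
			cnt++
		}
	}
	fmt.Printf("Counted %d words.\n", cnt)
}

func main() {
	r := strings.NewReader(body)
	ch := countLines(r)
	// countWords runs in the main goroutine and returns only after ch is
	// closed, so main does not exit before both counts are printed.
	countWords(ch)
}

This code uses two functions, countLines and countWords, to count lines and words respectively. countLines takes an io.Reader and returns a receive-only channel. Inside a goroutine it creates a bufio.Scanner with bufio.ScanLines as the split function, reads the input line by line, sends each line on the channel, and counts the lines. When the input is exhausted, it prints the line count and then closes the channel.

countWords takes a <-chan string, that is, a channel it can only receive strings from. For each line it receives, it creates a bufio.Scanner with bufio.ScanWords as the split function and increments the counter cnt for every word scanned. Finally, it prints the word count.

In main, a strings.Reader over the input is passed to countLines, and countWords is then called in the main goroutine. Since countWords only returns after countLines has closed the channel, main waits until both counts have been printed.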


huangapple
  • Posted on July 10, 2014, 21:02:47
  • When reposting, please keep the link to this article: https://go.coder-hub.com/24677285.html