How to have multiple consumers from one io.Reader?




I am working on a small script which uses bufio.Scanner and http.Get, as well as goroutines, to count words and lines in parallel.

    package main

    import (
        "bufio"
        "fmt"
        "io"
        "log"
        "net/http"
        "time"
    )

    func main() {
        err := request("http://www.google.com")
        if err != nil {
            log.Fatal(err)
        }
        // just keep main alive with sleep for now
        time.Sleep(2 * time.Second)
    }

    func request(url string) error {
        res, err := http.Get(url)
        if err != nil {
            return err
        }
        go scanLineWise(res.Body)
        go scanWordWise(res.Body)
        return err
    }

    func scanLineWise(r io.Reader) {
        s := bufio.NewScanner(r)
        s.Split(bufio.ScanLines)
        i := 0
        for s.Scan() {
            i++
        }
        fmt.Printf("Counted %d lines.\n", i)
    }

    func scanWordWise(r io.Reader) {
        s := bufio.NewScanner(r)
        s.Split(bufio.ScanWords)
        i := 0
        for s.Scan() {
            i++
        }
        fmt.Printf("Counted %d words.\n", i)
    }


As more or less expected from streams, scanLineWise will count some number while scanWordWise will count zero. This is because scanLineWise has already read everything from res.Body.

I would now like to know: how can this be solved elegantly?

My first thought was to build a struct which implements both io.Reader and io.Writer. We could use io.Copy to read from res.Body and write into that struct. When the scanners then read from it, the data would be copied for each of them instead of being consumed. Unfortunately, this would just accumulate memory over time and defeat the whole idea of streams...


Score: 22





The options are pretty straightforward -- you either maintain the "stream" of data, or you buffer the body.

If you really do need to read over the body more than once sequentially, you need to buffer it somewhere. There's no way around that.

There are a number of ways you could stream the data, like having the line counter output lines into the word counter (preferably through channels). You could also build a pipeline using io.TeeReader and io.Pipe, and supply a unique reader for each function.

    ...
    pipeReader, pipeWriter := io.Pipe()
    bodyReader := io.TeeReader(res.Body, pipeWriter)

    go scanLineWise(bodyReader)
    go scanWordWise(pipeReader)
    ...

That can get unwieldy with more consumers though, so you could use io.MultiWriter to multiplex to more io.Readers.

    ...
    pipeOneR, pipeOneW := io.Pipe()
    pipeTwoR, pipeTwoW := io.Pipe()
    pipeThreeR, pipeThreeW := io.Pipe()

    go scanLineWise(pipeOneR)
    go scanWordWise(pipeTwoR)
    go scanSomething(pipeThreeR)

    // of course, this should probably have some error handling
    io.Copy(io.MultiWriter(pipeOneW, pipeTwoW, pipeThreeW), res.Body)
    ...


Score: 4



You could use channels: do the actual reading in your scanLineWise, then pass the lines on to scanWordWise. In the example below, countWords takes a receive-only channel (<-chan string), runs a bufio.Scanner with bufio.ScanWords over each line it receives, increments cnt for every word, and prints the total at the end:

    // Needs the imports bufio, fmt, io, strings and time.

    // countLines scans r line by line in its own goroutine and sends
    // every line on the returned channel.
    func countLines(r io.Reader) (ch chan string) {
        ch = make(chan string)
        go func() {
            s := bufio.NewScanner(r)
            s.Split(bufio.ScanLines)
            cnt := 0
            for s.Scan() {
                ch <- s.Text()
                cnt++
            }
            close(ch)
            fmt.Printf("Counted %d lines.\n", cnt)
        }()
        return
    }

    // countWords counts the words in every line received from ch.
    func countWords(ch <-chan string) {
        cnt := 0
        for line := range ch {
            s := bufio.NewScanner(strings.NewReader(line))
            s.Split(bufio.ScanWords)
            for s.Scan() {
                cnt++
            }
        }
        fmt.Printf("Counted %d words.\n", cnt)
    }

    func main() {
        // body is not defined in this snippet; it is assumed to be a
        // string holding the text to scan.
        r := strings.NewReader(body)
        ch := countLines(r)
        go countWords(ch)
        time.Sleep(1 * time.Second)
    }

  • Posted on July 10, 2014, 21:02:47
  • When reposting, please keep the link to this article: https://go.coder-hub.com/24677285.html



