Go pipeline using channels

huangapple go评论83阅读模式
英文:

Go pipeline using channels

问题

我正在探索Go语言,并尝试使用通道设置一种类似管道的结构。我只想在main()函数中读取一些内容,并将它们发送到process()函数进行处理,这里只是将值打印到屏幕上。

不幸的是,在下面的代码中,似乎process()函数从通道中没有读取任何内容,或者至少没有打印任何内容;我做错了什么?

package main

import (
	"fmt"
	"database/sql"
	_ "github.com/lib/pq"
	"time"
	"gopkg.in/redis.v3"
)

type Record struct {
	userId, myDate int
	prodUrl        string
}

func main() {

	//连接数据库
	db, err := sql.Open(...)
	defer db.Close()

	//在这里进行错误检查...

	//执行查询
	rows, err := db.Query("select userID,url,date from mytable limit 10")
	defer rows.Close()

	//在这里进行错误检查...

	//创建用于缓冲读取的通道
	bufferChan := make(chan *Record, 1000)
	go process(bufferChan)

	//遍历结果并将其发送到process()函数
	row := new(Record)
	for rows.Next() {
		err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)
		bufferChan <- row
		fmt.Printf("row sent %v", row.userId)
	}
}

//打印Record的值
func process(buffer chan *Record) {
	row := <-buffer
	fmt.Printf("row received: %d %v %d ", row.userId, row.prodUrl, row.myDate)
}
英文:

I'm exploring Go and trying to set up a sort of pipeline using channels. I just want to read something in main() and send them to process() for processing, in this case just print the value to the screen.

Unfortunately, in the code below, it appears that process() never reads from the channel, or at least it doesn't print anything; what am I doing wrong?

package main

import ( &quot;fmt&quot; ; &quot;database/sql&quot; ; _ &quot;github.com/lib/pq&quot; ; &quot;time&quot; ; &quot;gopkg.in/redis.v3&quot; )//; &quot;strconv&quot; )

type Record struct {
	userId, myDate int
	prodUrl string
}


func main(){
		
	//connect to db
	db, err := sql.Open(...)
	defer db.Close()
	
	//error check here...
	    
	//exec query
	rows, err := db.Query(&quot;select userID,url,date from mytable limit 10&quot;)
	defer rows.Close()
	
	//error check here...	
		
	//create channel to buffer rows read
	bufferChan := make(chan *Record,1000)
	go process(bufferChan)
	
	//iterate through results and send them to process()
	row := new(Record)
	for rows.Next(){
		err := rows.Scan(&amp;row.userId, &amp;row.prodUrl, &amp;row.myDate)		
		bufferChan &lt;- row
		fmt.Printf(&quot;row sent %v&quot;,row.userId)					
	}	
}

//prints Record values
func process (buffer chan *Record) {
	row := &lt;- buffer
	fmt.Printf(&quot;row received: %d %v %d &quot;, row.userId,row.prodUrl,row.myDate)
}

答案1

得分: 2

func process没有打印任何内容的原因是,func main在for循环rows.Next完成后退出,从而退出程序。你需要做几件事情。

  1. 在for循环之后添加close的调用,以表示向缓冲通道添加消息的结束,否则可能会导致死锁。所以调用close(bufferChan)。
  2. 在func process中使用range来迭代通道。
  3. 传递一个额外的通道给process,以便知道它何时完成,这样main就可以等待process完成。

请参考下面的代码片段示例:

package main

import "fmt"

func main() {
bufferChan := make(chan int, 1000)
done := make(chan bool)
go process(bufferChan, done)
for i := 0; i < 100; i++ {
bufferChan <- i
}
close(bufferChan)

select {
case <-done:
	fmt.Println("Done")
}

}

func process(c chan int, done chan bool) {
for s := range c {
fmt.Println(s)
}
done <- true

}

英文:

The reason for func process not printing anything is that you func main exits after the for loop for rows.Next finishes thereby exiting the program. You need to do couple of things.

  1. Add call to close after for loop to indicate end adding message to
    buffered channel else it can lead to deadlock. So call
    close(bufferChan)
  2. Use range to iterate over channel in your func process.
  3. Pass an additional channel to process to know when it finishes so
    that main can wait till process finishes.

Look at the code snippet below for example:

package main

import &quot;fmt&quot;

func main() {
	bufferChan := make(chan int, 1000)
	done := make(chan bool)
	go process(bufferChan, done)
	for i := 0; i &lt; 100; i++ {
		bufferChan &lt;- i
	}
	close(bufferChan)

	select {
	case &lt;-done:
		fmt.Println(&quot;Done&quot;)
	}

}

func process(c chan int, done chan bool) {
	for s := range c {
		fmt.Println(s)
	}	
	done &lt;- true

}

答案2

得分: 1

你的主函数退出了,所以整个程序结束了。它应该等待处理结束。此外,process函数应该使用range关键字循环遍历通道。

一个可行解决方案的框架如下:

package main

import "fmt"

func process(input chan int, done chan struct{}) {
    for i := range input {
        fmt.Println(i)
    }
    done <- struct{}{}
}

func main() {
    input := make(chan int)
    done := make(chan struct{})

    go process(input, done)

    for i := 1; i < 10; i++ {
        input <- i
    }
    close(input)

    <-done
}

Playground

英文:

Your main function exits so the whole program ends. It should wait for end of processing. Moreover, process function should loop over channel with range keyword.

Scaffolding for a working solution looks like that:

package main

import &quot;fmt&quot;

func process(input chan int, done chan struct{}) {
	for i := range input {
		fmt.Println(i)
	}
	done &lt;- struct{}{}
}

func main() {
	input := make(chan int)
	done := make(chan struct{})

	go process(input, done)

	for i := 1; i &lt; 10; i++ {
		input &lt;- i
	}
	close(input)

	&lt;-done
}

<kbd>Playground</kbd>

答案3

得分: 1

我相信你正在寻找io.pipe()的Go API,它可以在写入者和读取者之间创建一个同步的内存管道。这里没有缓冲区。它可以用于连接期望io.Reader的代码和期望io.Writer的代码。

在你的情况下,io.PipeWriter是从数据库中读取值的代码,而io.PipeReader是将值写入屏幕的代码。

下面是一个示例,演示了在没有任何缓冲区(如bytes.Buffer)的情况下流式传输数据的方法。

// 设置管道以直接将数据写入读取器。
pr, pw := io.Pipe()
// 将JSON编码的数据写入管道的写入器端。
// 在单独的并发goroutine中进行写入,并记得
// 关闭PipeWriter,以向配对的PipeReader发出
// 写入完成的信号。
go func() {
  err := json.NewEncoder(pw).Encode(&v)
  pw.Close()
}()
// 发送HTTP请求。从读取器中读取的任何内容
// 将在请求体中发送。
// 当数据被写入写入器时,它将可以从读取器中读取。
resp, err := http.Post("example.com", "application/json", pr)

参考链接:

https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5

英文:

I believe you are looking for io.pipe() go API which creates a synchronous in-memory pipe between a writer and reader/s. There is no buffering here. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.

In your case, io.PipeWriter is the code "reading value from the database" and "io.PipeReader" is the code "writing the value to the screen".

Here, an example of streaming data without any buffer i.e bytes.Buffer.

// Set up the pipe to write data directly into the Reader.
pr, pw := io.Pipe()
// Write JSON-encoded data to the Writer end of the pipe.
// Write in a separate concurrent goroutine, and remember
// to Close the PipeWriter, to signal to the paired PipeReader
// that we’re done writing.
go func() {
  err := json.NewEncoder(pw).Encode(&amp;v)
  pw.Close()
}()
// Send the HTTP request. Whatever is read from the Reader
// will be sent in the request body.
// As data is written to the Writer, it will be available
// to read from the Reader.
resp, err := http.Post(“example.com”, “application/json”, pr)

Reference:

https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5

huangapple
  • 本文由 发表于 2015年11月27日 21:29:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/33958485.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定