非阻塞读取与select

huangapple go评论113阅读模式
英文:

non-blocking read with select

问题

假设我们有以下关于服务器的抽象(XMPP,但在这里并不重要):

  1. type Server struct {
  2. Addr string
  3. Conn net.Conn
  4. tlsCon *tls.Conn
  5. R *bufio.Reader
  6. SSL bool
  7. reader chan string
  8. }
  9. func createServer(domain string) (s *Server, err error) {
  10. conn, err := net.Dial("tcp", domain+":5222")
  11. if err != nil {
  12. return nil, err
  13. }
  14. s = &Server{domain, conn, nil, bufio.NewReader(conn), false, make(chan string, 8)}
  15. go func(t *Server) {
  16. for {
  17. buf := bufsPool.Get().([]byte)
  18. n, err := s.R.Read(buf)
  19. if err == nil {
  20. s.reader <- string(buf[:n])
  21. } else {
  22. fmt.Println(err)
  23. }
  24. }
  25. }(s)
  26. return s, err
  27. }
  28. func (s *Server) read() (t string) {
  29. t = ""
  30. Inn:
  31. for {
  32. select {
  33. case u := <-s.reader:
  34. fmt.Println("debug", u)
  35. t += u
  36. default:
  37. break Inn
  38. }
  39. }
  40. return t
  41. }

这个设计的思路很简单:创建一个连接,如果一切顺利,为它获取一个带缓冲的读取器(我不使用conn.read函数,只是因为如果服务器要求,我会启动TLS连接并重新分配R为基于它创建的读取器,但现在不是这种情况)。

现在我们有了两个函数,writeread

  1. func (s *Server) read() (t string) {
  2. t = ""
  3. Inn:
  4. for {
  5. select {
  6. case u := <-s.reader:
  7. fmt.Println("debug", u)
  8. t += u
  9. default:
  10. break Inn
  11. }
  12. }
  13. return t
  14. }

所以我希望read函数能够接收由从套接字读取的goroutine(在createServer()中启动的那个)发送的数据。我的想法是调用write然后读取响应。我创建了所有这些是因为服务器有时会将响应作为两部分发送,例如,我必须传统地调用read()两次。但是这并没有起作用,我的read函数(见上文)根本没有返回任何内容。最有可能的原因是服务器没有来得及发送数据,而我的函数因为通道中没有数据而退出。但一个问题是,尽管我多次调用writereadread始终返回空值。

所以我猜我可能有一些一般性的设计错误,问题是社区能否帮助我找到它。谢谢。

英文:

Let's say we have the following abstraction for a server (XMPP, but here it doesn't matter a lot):

  1. type Server struct {
  2. Addr string
  3. Conn net.Conn
  4. tlsCon *tls.Conn
  5. R *bufio.Reader
  6. SSL bool
  7. reader chan string
  8. }

And a helper function to init it:

  1. func createServer(domain string) (s *Server, err error) {
  2. conn, err := net.Dial(&quot;tcp&quot;, domain+&quot;:5222&quot;)
  3. if err != nil {
  4. return nil, err
  5. }
  6. s = &amp;Server{domain, conn, nil, bufio.NewReader(conn), false, make(chan string, 8)}
  7. go func(t *Server) {
  8. for {
  9. buf := bufsPool.Get().([]byte)
  10. n, err := s.R.Read(buf)
  11. if err == nil {
  12. s.reader &lt;- string(buf[:n])
  13. } else {
  14. fmt.Println(err)
  15. }
  16. }
  17. }(s)
  18. return s, err
  19. }

The idea is simple: create a connection and, if everything went well, get a buffered reader for it (I don't use the conn.read function just because, if server requires, I start TLS connection and reassign R to reader created based of it, but now this isn't the case).

Now we have the two functions, write and read:

  1. func (s *Server) read() (t string) {
  2. t = &quot;&quot;
  3. Inn:
  4. for {
  5. select {
  6. case u := &lt;-s.reader:
  7. fmt.Println(&quot;debug&quot;, u)
  8. t += u
  9. default:
  10. break Inn
  11. }
  12. }
  13. return t
  14. }

So I want the read function to receive data sent by the goroutine which reads from socket (the one started in createServer()) from chan . The idea is to call write and than read the response. I created all of this because the server some times sends response as two parts, e.g. I have to do traditional read() 2 times.
But this didn't work, my read function (see above) simply returns nothing. Most probably, that's because the server doesn't manage to send back the data and my function exits because there's nothing in the chan. But one concern is that although I call write and read multiple times, read always returns nothing.

So I guess I have some general design error and the question is if the community can help me find it. Thanks.

答案1

得分: 1

问题在于你的select选择了默认分支,因为读取通道中还没有任何内容,所以它立即中断了循环。(https://golang.org/ref/spec#Select_statements)

你希望读取操作阻塞,直到接收到足够的数据。例如,如果你知道响应以"\n"结尾,那么就继续读取并且在接收到"\n"或通道关闭之前不中断。

也许一个更好的解决方案是在goroutine中使用bufio.Scanner与读取器一起使用,如果传入的数据以换行符分隔,并使用chan string将整个字符串传递给另一个goroutine。

你还可以使用Scanner.Split来设置不同的分割函数。

(参见关于tcp和分隔符的这个问题和答案: https://stackoverflow.com/questions/25766193/golang-tcp-client-server-data-delimiter)

编辑: 使用xml.Decoder.Token可以从流中持续读取标记,并适当处理它们。你可以将其与Decode (这将解码下一个标记)或DecodeElement (这允许你解码刚刚读取的标记)结合使用来解码xml。

英文:

The problem is that your select selects the default branch because there's nothing in the reader channel yet, so it breaks the for immediately. (https://golang.org/ref/spec#Select_statements)

You want read to block until you receive enough data for it. E.g. if you know that your response needs to end with a "\n", keep reading and don't break until you get a "\n", or the channel is closed.

Perhaps a better solution would be to use bufio.Scanner in goroutine with the reader, if the incoming data is newline delimited, and use a chan string to pass the whole string to the other goroutine.

You can also use Scanner.Split to set a different splitter function.

(see also this question and answer about tcp and delimiters: https://stackoverflow.com/questions/25766193/golang-tcp-client-server-data-delimiter)

Edit: Using xml.Decoder.Token you can keep reading the tokens from the stream, and handle them appropriately. You can combine this with Decode (this will decode the next token) or DecodeElement (this allows you to decode the just read token) to decode the xml.

huangapple
  • 本文由 发表于 2016年3月24日 15:36:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/36195197.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定