
Golang ZeroMQ: REQ/REP senseless non-blocking





In Python, the ZeroMQ .recv()/.send() operations are blocking, which is just perfect for REQ/REP.

In Golang, I must pass zmq.DONTWAIT to the .recv() and .send() operations in order to make it work.

But the thing is, the flow needs to be lock step, so:

  1. server.recv()
  2. client.send()
  3. client.recv()
  4. server.send()

And between 3 and 4 the weirdness starts, because they are async.

When the client has sent a message and the server has not received it yet, but the client already tries to receive a response, the lock step is no longer a lock step.

Is there some kind of zmq.DOBLOCK in contrast to zmq.DONTWAIT?

Or did I get something wrong here?


I am using this Go binding for the C ZeroMQ library: https://godoc.org/github.com/pebbe/zmq4#Type

As you can see here, .recv() needs an input flag, which is one of the two listed at the second reference:

Recv: https://godoc.org/github.com/pebbe/zmq4#Socket.Recv

Flags to be passed: https://github.com/pebbe/zmq4/blob/master/zmq4.go#L403

This is the code I currently use as a workaround, which feels somewhat ugly:

    package connection

    import (
        "fmt"
        "time"

        "github.com/pebbe/zmq4"
    )

    // ERRTMPUNAV is the EAGAIN error text a DONTWAIT call returns when nothing is ready yet.
    const ERRTMPUNAV = "resource temporarily unavailable"

    func checkError(e error) {
        if e != nil {
            panic(e)
        }
    }

    // CreateRepNode binds a REP socket and polls it until closeConn fires.
    func CreateRepNode(address string, onMessage chan<- string, send <-chan string, closeConn <-chan bool) {
        done := make(chan struct{}) // closed on shutdown; avoids a data race on a shared bool
        socket, err := zmq4.NewSocket(zmq4.REP)
        checkError(err)
        err = socket.Bind(address)
        checkError(err)
        go func(socket *zmq4.Socket) {
            for {
                msg, err := socket.Recv(zmq4.DONTWAIT)
                fmt.Println("server message" + msg)
                select {
                case <-done:
                    return
                default:
                }
                if err != nil {
                    // Nothing received yet: wait 100 ms and poll again.
                    time.Sleep(100 * time.Millisecond)
                    continue
                }
                onMessage <- msg
                rep := <-send
                _, err = socket.Send(rep, zmq4.DONTWAIT)
            }
        }(socket)
        <-closeConn
        close(done)
    }

    // CreateReqNode connects a REQ socket, sends each queued message and polls for the reply.
    func CreateReqNode(address string, onMessage chan<- string, send <-chan string, closeConn <-chan bool) {
        done := make(chan struct{}) // closed on shutdown; avoids a data race on a shared bool
        socket, err := zmq4.NewSocket(zmq4.REQ)
        checkError(err)
        err = socket.Connect(address)
        checkError(err)
        go func() {
            for {
                msg := <-send
                select {
                case <-done:
                    return
                default:
                }
                _, err := socket.Send(msg, zmq4.DONTWAIT)
                for {
                    msg, err = socket.Recv(zmq4.DONTWAIT)
                    fmt.Println("client got message " + msg)
                    if err != nil && err.Error() == ERRTMPUNAV {
                        // Reply not there yet: wait 100 ms and poll again.
                        time.Sleep(100 * time.Millisecond)
                        continue
                    }
                    break
                }
                onMessage <- msg
            }
        }()
        <-closeConn
        close(done)
    }


Score: 3






ZeroMQ's trivial, elementary archetypes are more a set of building blocks than a production-grade solution to any particular need.

Go is a very powerful, modern language with goroutines and other smart tools for controlled concurrency, so forgive me for stating the following list of recommendations:

  • avoid blocking designs wherever you can (a non-blocking design leaves you in full control of events as they come, instead of "hanging" in an infinite, uncontrollable wait, or worse, in an already developed deadlock)

  • avoid relying on few-SLOC examples built around a single, elementary type of Formal Communication Pattern; rather develop a robust survivability-handling strategy for all the cases where something may go wrong (loss of signal in the transport network, loss of a message, DDoS-level resource overloads, ...)

Redesign hint: do not use REQ/REP at all. Yes, never.

The ZeroMQ Scalable Formal Communication Pattern REQ/REP is fine for learning ZeroMQ, but is lethal in a real production-grade deployment. For details, read here.

Next, think about internally unconditional patterns, like PAIR (though marked experimental, it works great for some use cases), XREQ/XREP (named DEALER/ROUTER in current ZeroMQ versions), PUSH/PULL, or a custom-designed pattern composed of several signalling/transport sockets.

The best next step?

What I can do for your further questions right now is to direct you to a bigger picture on this subject, with more arguments, a simple signalling-plane / messaging-plane illustration, and a direct link to a must-read book from Pieter HINTJENS.

The book is worth one's time and effort. If you are serious about distributed-systems design, you will love it, together with Pieter's passion for Zero-sharing, Zero-blocking, (almost) Zero-copy et al.

  • Published on November 4, 2016, 20:22:49
  • When reposting, please keep the link to this article: https://go.coder-hub.com/40422572.html



