Golang Gorilla Websocket在120秒后停止接收信息。

huangapple go评论118阅读模式
英文:

Golang Gorilla Websocket stops receiving information at 120 seconds

问题

我目前正在尝试连接到CEX.IO比特币交易所的WebSocket,但不仅与CEX.IO有问题,还与其他交易所也有问题。所有的连接在大约120秒的时候都会断开,这让我觉得可能存在某种TTL问题。主包中的Process() goroutine最终只是挂起并等待来自readLoop的数据,而readLoop则停止接收数据。我在代码中包含了一些只读API密钥,你可以测试一下。

以下是连接到cexio WebSocket的代码。这是他们API的链接:https://cex.io/websocket-api

  1. package cexio
  2. import (
  3. "github.com/gorilla/websocket"
  4. "github.com/satori/go.uuid"
  5. "encoding/hex"
  6. "encoding/json"
  7. "crypto/hmac"
  8. "crypto/sha256"
  9. "bytes"
  10. "strconv"
  11. "time"
  12. "fmt"
  13. )
  14. const Url = "wss://ws.cex.io/ws/"
  15. type Connection struct {
  16. conn *websocket.Conn
  17. }
  // Wire and in-process message types for the CEX.IO connector.
  type (
  	// IntraAppMessage is what ReadLoop delivers on its channel: either a
  	// decoded socket message or a program-level error description.
  	IntraAppMessage struct {
  		SocketMessage  GenericMessage
  		ProgramMessage ProgramMessage
  	}

  	// GenericMessage is the JSON envelope used for both outgoing requests
  	// and incoming events.
  	//
  	// NOTE(review): encoding/json never treats a struct value as empty, so
  	// "omitempty" has no effect on Auth and an "auth" object is emitted on
  	// every outgoing message. Data has no omitempty and is emitted as null
  	// when unset.
  	GenericMessage struct {
  		Event string      `json:"e"`
  		Data  interface{} `json:"data"`
  		Auth  AuthData    `json:"auth,omitempty"`
  		Ok    string      `json:"ok,omitempty"`
  		Oid   string      `json:"oid,omitempty"`
  		Time  int64       `json:"time,omitempty"`
  	}

  	// ProgramMessage carries an error string produced by the connector
  	// itself rather than by the remote peer.
  	ProgramMessage struct {
  		Error string
  	}

  	// AuthData is the credential payload sent with the "auth" event.
  	AuthData struct {
  		Key       string `json:"key"`
  		Signature string `json:"signature"`
  		Timestamp int64  `json:"timestamp"`
  	}

  	// OrderBookSubscribeData is the payload of an "order-book-subscribe"
  	// request.
  	OrderBookSubscribeData struct {
  		Pair      [2]string `json:"pair"`
  		Subscribe bool      `json:"subscribe"`
  		Depth     int       `json:"depth"`
  	}
  )
  43. func (c *Connection) SendPong() error {
  44. pongMsg := GenericMessage{
  45. Event: "pong",
  46. }
  47. err := c.conn.WriteJSON(pongMsg)
  48. if err != nil {
  49. return nil
  50. }
  51. deadline := time.Now().Add(15*time.Second)
  52. err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  53. if err != nil {
  54. return err
  55. }
  56. return nil
  57. }
  58. func (c *Connection) SendPing() error {
  59. pingMsg := GenericMessage{
  60. Event: "get-balance",
  61. Oid: uuid.NewV4().String(),
  62. }
  63. err := c.conn.WriteJSON(pingMsg)
  64. if err != nil {
  65. return err
  66. }
  67. deadline := time.Now().Add(15*time.Second)
  68. err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  69. if err != nil {
  70. return err
  71. }
  72. return nil
  73. }
  74. func (c *Connection) Connect() error {
  75. dialer := *websocket.DefaultDialer
  76. wsConn, _, err := dialer.Dial(Url, nil)
  77. if err != nil {
  78. return err
  79. }
  80. c.conn = wsConn
  81. for {
  82. _, msgBytes, err := c.conn.ReadMessage()
  83. if err != nil {
  84. c.Disconnect()
  85. return err
  86. }
  87. fmt.Println(string(msgBytes))
  88. var m GenericMessage
  89. err = json.Unmarshal(msgBytes, &m)
  90. if err != nil {
  91. c.Disconnect()
  92. return err
  93. }
  94. if m.Event != "connected" {
  95. c.Disconnect()
  96. return err
  97. } else {
  98. break
  99. }
  100. }
  101. return nil
  102. }
  103. func (c *Connection) Disconnect() error {
  104. return c.conn.Close()
  105. }
  106. func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
  107. for {
  108. fmt.Println("starting new read")
  109. _, msgBytes, err := c.conn.ReadMessage()
  110. if err != nil {
  111. ch <- IntraAppMessage{
  112. ProgramMessage: ProgramMessage{
  113. Error: err.Error(),
  114. },
  115. }
  116. continue
  117. }
  118. var m GenericMessage
  119. err = json.Unmarshal(msgBytes, &m)
  120. if err != nil {
  121. ch <- IntraAppMessage{
  122. ProgramMessage: ProgramMessage{
  123. Error: err.Error(),
  124. },
  125. }
  126. continue
  127. }
  128. ch <- IntraAppMessage{
  129. SocketMessage: m,
  130. }
  131. }
  132. }
  // CreateSignature returns the lowercase hex encoding of the HMAC-SHA256,
  // keyed with secret, of the decimal timestamp immediately followed by key.
  func CreateSignature(timestamp int64, key, secret string) string {
  	// The signed payload is the plain concatenation "<timestamp><key>".
  	var payload bytes.Buffer
  	payload.WriteString(strconv.FormatInt(timestamp, 10))
  	payload.WriteString(key)
  	mac := hmac.New(sha256.New, []byte(secret))
  	mac.Write(payload.Bytes())
  	digest := mac.Sum(nil)
  	return hex.EncodeToString(digest)
  }
  142. func (c *Connection) Authenticate(key, secret string) error {
  143. timestamp := time.Now().Unix()
  144. signature := CreateSignature(timestamp, key, secret)
  145. var authMsg GenericMessage
  146. authMsg.Event = "auth"
  147. authMsg.Auth = AuthData{
  148. Key: key,
  149. Signature: signature,
  150. Timestamp: timestamp,
  151. }
  152. err := c.conn.WriteJSON(authMsg)
  153. if err != nil {
  154. return err
  155. }
  156. for {
  157. _, msgBytes, err := c.conn.ReadMessage()
  158. if err != nil {
  159. c.Disconnect()
  160. return err
  161. }
  162. fmt.Println(string(msgBytes))
  163. var m GenericMessage
  164. err = json.Unmarshal(msgBytes, &m)
  165. if err != nil {
  166. c.Disconnect()
  167. return err
  168. }
  169. if m.Event != "auth" && m.Ok != "ok" {
  170. c.Disconnect()
  171. return err
  172. } else {
  173. break
  174. }
  175. }
  176. return nil
  177. }
  178. func (c *Connection) SubscribeToOrderBook(pair [2]string) error {
  179. sendMsg := GenericMessage{
  180. Event: "order-book-subscribe",
  181. Data: OrderBookSubscribeData{
  182. Pair: pair,
  183. Subscribe: true,
  184. Depth: 0,
  185. },
  186. Oid: uuid.NewV4().String(),
  187. }
  188. err := c.conn.WriteJSON(sendMsg)
  189. if err != nil {
  190. return err
  191. }
  192. return nil
  193. }
  194. func (c *Connection) GetBalance() error {
  195. sendMsg := GenericMessage{
  196. Event: "get-balance",
  197. Oid: uuid.NewV4().String(),
  198. }
  199. err := c.conn.WriteJSON(sendMsg)
  200. if err != nil {
  201. return err
  202. }
  203. return nil
  204. }

希望对你有所帮助!

英文:

I'm currently trying to connect to the CEX.IO bitcoin exchange's websocket, but I have been having issues not only with CEX.IO but with other exchanges too. All of my connections drop around the 120-second mark, which makes me think there is some TTL problem going on. The Process() goroutine in the main package ends up just hanging, waiting for data from ReadLoop, which simply stops receiving data. I've included some read-only API keys in the code so you can test if you'd like.

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio&quot;
  5. &quot;github.com/shopspring/decimal&quot;
  6. &quot;encoding/json&quot;
  7. &quot;time&quot;
  8. )
  9. type OrderBook struct {
  10. Asks []Ask
  11. Bids []Bid
  12. }
  13. type Ask struct {
  14. Rate decimal.Decimal
  15. Amount decimal.Decimal
  16. }
  17. type Bid struct {
  18. Rate decimal.Decimal
  19. Amount decimal.Decimal
  20. }
  21. func main() {
  22. cexioConn := new(cexio.Connection)
  23. err := cexioConn.Connect()
  24. if err != nil {
  25. fmt.Errorf(&quot;error: %s&quot;, err.Error())
  26. }
  27. err = cexioConn.Authenticate(&quot;TLwYkktLf7Im6nqSKt6UO1IrU&quot;, &quot;9ImOJcR7Qj3LMIyPCzky0D7WE&quot;)
  28. if err != nil {
  29. fmt.Errorf(&quot;error: %s&quot;, err.Error())
  30. }
  31. readChannel := make(chan cexio.IntraAppMessage, 25)
  32. go cexioConn.ReadLoop(readChannel)
  33. processor := Processor{
  34. WatchPairs: [][2]string{
  35. [2]string{
  36. &quot;BTC&quot;, &quot;USD&quot;,
  37. },
  38. },
  39. conn: cexioConn,
  40. }
  41. go processor.Process(readChannel)
  42. // LOL
  43. for {
  44. continue
  45. }
  46. }
  47. type Processor struct {
  48. WatchPairs [][2]string
  49. conn *cexio.Connection
  50. }
  51. func (p *Processor) Process(ch &lt;-chan cexio.IntraAppMessage) {
  52. p.conn.SubscribeToOrderBook(p.WatchPairs[0])
  53. pingTimer := time.Now().Unix()
  54. for {
  55. fmt.Printf(&quot;(%v)\n&quot;, time.Now().Unix())
  56. if (time.Now().Unix() - pingTimer) &gt;= 10 {
  57. fmt.Println(&quot;sending ping&quot;)
  58. p.conn.SendPing()
  59. pingTimer = time.Now().Unix()
  60. }
  61. readMsg := &lt;- ch
  62. output, _ := json.Marshal(readMsg.SocketMessage)
  63. fmt.Println(string(output))
  64. if readMsg.SocketMessage.Event == &quot;ping&quot; {
  65. fmt.Println(&quot;sending pong&quot;)
  66. p.conn.SendPong()
  67. pingTimer = time.Now().Unix()
  68. }
  69. }
  70. }

Below is the connector to the cexio websocket. Here is a link to their API: https://cex.io/websocket-api

  1. package cexio
  2. import (
  3. &quot;github.com/gorilla/websocket&quot;
  4. //&quot;github.com/shopspring/decimal&quot;
  5. &quot;github.com/satori/go.uuid&quot;
  6. &quot;encoding/hex&quot;
  7. &quot;encoding/json&quot;
  8. &quot;crypto/hmac&quot;
  9. &quot;crypto/sha256&quot;
  10. &quot;bytes&quot;
  11. &quot;strconv&quot;
  12. &quot;time&quot;
  13. &quot;fmt&quot;
  14. )
  15. const Url = &quot;wss://ws.cex.io/ws/&quot;
  16. type Connection struct {
  17. conn *websocket.Conn
  18. }
  // Wire and in-process message types for the CEX.IO connector.
  type (
  	// IntraAppMessage is what ReadLoop delivers on its channel: either a
  	// decoded socket message or a program-level error description.
  	IntraAppMessage struct {
  		SocketMessage  GenericMessage
  		ProgramMessage ProgramMessage
  	}

  	// GenericMessage is the JSON envelope used for both outgoing requests
  	// and incoming events.
  	//
  	// NOTE(review): encoding/json never treats a struct value as empty, so
  	// "omitempty" has no effect on Auth and an "auth" object is emitted on
  	// every outgoing message. Data has no omitempty and is emitted as null
  	// when unset.
  	GenericMessage struct {
  		Event string      `json:"e"`
  		Data  interface{} `json:"data"`
  		Auth  AuthData    `json:"auth,omitempty"`
  		Ok    string      `json:"ok,omitempty"`
  		Oid   string      `json:"oid,omitempty"`
  		Time  int64       `json:"time,omitempty"`
  	}

  	// ProgramMessage carries an error string produced by the connector
  	// itself rather than by the remote peer.
  	ProgramMessage struct {
  		Error string
  	}

  	// AuthData is the credential payload sent with the "auth" event.
  	AuthData struct {
  		Key       string `json:"key"`
  		Signature string `json:"signature"`
  		Timestamp int64  `json:"timestamp"`
  	}

  	// OrderBookSubscribeData is the payload of an "order-book-subscribe"
  	// request.
  	OrderBookSubscribeData struct {
  		Pair      [2]string `json:"pair"`
  		Subscribe bool      `json:"subscribe"`
  		Depth     int       `json:"depth"`
  	}
  )
  44. func (c *Connection) SendPong() error {
  45. pongMsg := GenericMessage{
  46. Event: &quot;pong&quot;,
  47. }
  48. err := c.conn.WriteJSON(pongMsg)
  49. if err != nil {
  50. return nil
  51. }
  52. deadline := time.Now().Add(15*time.Second)
  53. err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  54. if err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. func (c *Connection) SendPing() error {
  60. pingMsg := GenericMessage{
  61. Event: &quot;get-balance&quot;,
  62. Oid: uuid.NewV4().String(),
  63. }
  64. err := c.conn.WriteJSON(pingMsg)
  65. if err != nil {
  66. return err
  67. }
  68. deadline := time.Now().Add(15*time.Second)
  69. err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  70. if err != nil {
  71. return err
  72. }
  73. return nil
  74. }
  75. func (c *Connection) Connect() error {
  76. dialer := *websocket.DefaultDialer
  77. wsConn, _, err := dialer.Dial(Url, nil)
  78. if err != nil {
  79. return err
  80. }
  81. c.conn = wsConn
  82. //c.conn.SetPingHandler(c.HandlePing)
  83. for {
  84. _, msgBytes, err := c.conn.ReadMessage()
  85. if err != nil {
  86. c.Disconnect()
  87. return err
  88. }
  89. fmt.Println(string(msgBytes))
  90. var m GenericMessage
  91. err = json.Unmarshal(msgBytes, &amp;m)
  92. if err != nil {
  93. c.Disconnect()
  94. return err
  95. }
  96. if m.Event != &quot;connected&quot; {
  97. c.Disconnect()
  98. return err
  99. } else {
  100. break
  101. }
  102. }
  103. return nil
  104. }
  105. func (c *Connection) Disconnect() error {
  106. return c.conn.Close()
  107. }
  108. func (c *Connection) ReadLoop(ch chan&lt;- IntraAppMessage) {
  109. for {
  110. fmt.Println(&quot;starting new read&quot;)
  111. _, msgBytes, err := c.conn.ReadMessage()
  112. if err != nil {
  113. ch &lt;- IntraAppMessage{
  114. ProgramMessage: ProgramMessage{
  115. Error: err.Error(),
  116. },
  117. }
  118. continue
  119. }
  120. var m GenericMessage
  121. err = json.Unmarshal(msgBytes, &amp;m)
  122. if err != nil {
  123. ch &lt;- IntraAppMessage{
  124. ProgramMessage: ProgramMessage{
  125. Error: err.Error(),
  126. },
  127. }
  128. continue
  129. }
  130. ch &lt;- IntraAppMessage{
  131. SocketMessage: m,
  132. }
  133. }
  134. }
  // CreateSignature returns the lowercase hex encoding of the HMAC-SHA256,
  // keyed with secret, of the decimal timestamp immediately followed by key.
  func CreateSignature(timestamp int64, key, secret string) string {
  	// The signed payload is the plain concatenation "<timestamp><key>".
  	var payload bytes.Buffer
  	payload.WriteString(strconv.FormatInt(timestamp, 10))
  	payload.WriteString(key)
  	mac := hmac.New(sha256.New, []byte(secret))
  	mac.Write(payload.Bytes())
  	digest := mac.Sum(nil)
  	return hex.EncodeToString(digest)
  }
  144. func (c *Connection) Authenticate(key, secret string) error {
  145. timestamp := time.Now().Unix()
  146. signature := CreateSignature(timestamp, key, secret)
  147. var authMsg GenericMessage
  148. authMsg.Event = &quot;auth&quot;
  149. authMsg.Auth = AuthData{
  150. Key: key,
  151. Signature: signature,
  152. Timestamp: timestamp,
  153. }
  154. err := c.conn.WriteJSON(authMsg)
  155. if err != nil {
  156. return err
  157. }
  158. for {
  159. _, msgBytes, err := c.conn.ReadMessage()
  160. if err != nil {
  161. c.Disconnect()
  162. return err
  163. }
  164. fmt.Println(string(msgBytes))
  165. var m GenericMessage
  166. err = json.Unmarshal(msgBytes, &amp;m)
  167. if err != nil {
  168. c.Disconnect()
  169. return err
  170. }
  171. if m.Event != &quot;auth&quot; &amp;&amp; m.Ok != &quot;ok&quot; {
  172. c.Disconnect()
  173. return err
  174. } else {
  175. break
  176. }
  177. }
  178. return nil
  179. }
  180. func (c *Connection) SubscribeToOrderBook(pair [2]string) error {
  181. sendMsg := GenericMessage{
  182. Event: &quot;order-book-subscribe&quot;,
  183. Data: OrderBookSubscribeData{
  184. Pair: pair,
  185. Subscribe: true,
  186. Depth: 0,
  187. },
  188. Oid: uuid.NewV4().String(),
  189. }
  190. err := c.conn.WriteJSON(sendMsg)
  191. if err != nil {
  192. return err
  193. }
  194. return nil
  195. }
  196. func (c *Connection) GetBalance() error {
  197. sendMsg := GenericMessage{
  198. Event: &quot;get-balance&quot;,
  199. Oid: uuid.NewV4().String(),
  200. }
  201. err := c.conn.WriteJSON(sendMsg)
  202. if err != nil {
  203. return err
  204. }
  205. return nil
  206. }

答案1

得分: 0

解决方法是在主函数的末尾删除以下代码:

  1. for {
  2. continue
  3. }
英文:

The solution was to remove the following busy loop

  1. for {
  2. continue
  3. }

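at the end of the main function. An empty loop like this spins a CPU core without ever calling into the runtime; on Go versions before 1.14 (which lack asynchronous preemption) it can never be paused, so when the garbage collector needs to stop all goroutines it waits on this loop forever and every other goroutine, including the read loop, freezes. To keep main alive, block instead of spinning, for example with `select {}`, a channel receive, or a sync.WaitGroup.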

huangapple
  • 本文由 发表于 2017年3月24日 23:39:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/43003683.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定