将消息从Kafka发送到WebSocket。

huangapple go评论104阅读模式
英文:

Send messages from kafka into websocket

问题

我使用gin-framework在golang中创建了一个Web服务。在这个项目中,我还消费了来自特定主题的一些kafka消息。我想要实现的目标是将我从主题接收到的消息发送到WebSocket中。所以通信是单向的,多个人可以连接到WebSocket并查看消息。

我考虑使用通道,在接收kafka消息的函数中,我有如下代码:

ch <- KafkaMessage

而在gin框架中,我创建了如下代码:

requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)

func wsWorkFlowLive(c *gin.Context) {
    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        log.Println("error get connection")
        log.Fatal(err)
    }
    defer ws.Close()

    err = ws.WriteJSON(<-ch)

    if err != nil {
        log.Println("error write message: " + err.Error())
    }
}

var upGrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

这是我用来测试WebSocket的HTML代码片段:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <title>index</title>
</head>
<body>
    <h1>test websocket</h1>
    <p id="message-json"></p>
    <p id="message-text"></p>
    <script>
        function jsonWS() {
            var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");

            ws.onmessage = function (evt) {
                console.log("Received Message: " + evt.data);
                document.getElementById("message-json").innerText += evt.data;
            };

            ws.onclose = function (evt) {
                console.log("Connection closed.");
            };
        }

        // Start websocket
        jsonWS();
    </script>
</body>
</html>

但是我错过了一些东西,因为一旦我收到第一条kafka消息,就会出现以下错误行为:

  1. 只显示第一条消息,之后连接会迅速关闭。
  2. 要看到后续的消息,我必须刷新页面,这不是WebSocket的方式。
  3. 因为连接关闭,通道没有被读取,所以它在消费函数中被阻塞。我不能有这种行为。
  4. 为了避免第3点,我认为我需要一个机制,在只有一个或多个WebSocket连接时才将消息发送到通道中。
英文:

I created a web service with gin-framework in golang. In this project I also consume some kafka messages that comes from a particular topic. What I'm trying to achive is to pour out the messages that I receive from the topic into the websocket. So the communication is only one way and more than 1 person can connect to the web socket and see the messages coming in.
I tought to use channel, so inside the function that receives the kafka message I have something like this:

ch &lt;- KafkaMessage

While in gin framework I create something like this:

requestRouterRPCv1.GET(&quot;wf-live&quot;, wsWorkFlowLive)

func wsWorkFlowLive(c *gin.Context) {
	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Println(&quot;error get connection&quot;)
		log.Fatal(err)
	}
	defer ws.Close()

	err = ws.WriteJSON(&lt;-ch)

	if err != nil {
		log.Println(&quot;error write message: &quot; + err.Error())
	}
}

var upGrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

Here the html snipped that I use to test the websocket:

 &lt;!DOCTYPE html&gt;
   &lt;html lang=&quot;en&quot;&gt;
     &lt;head&gt;
       &lt;meta charset=&quot;UTF-8&quot; /&gt;
       &lt;title&gt;index&lt;/title&gt;
    &lt;/head&gt;
     &lt;body&gt;
       &lt;h1&gt;test websocket&lt;/h1&gt;
       &lt;p id=&quot;message-json&quot;&gt;&lt;/p&gt;
      &lt;p id=&quot;message-text&quot;&gt;&lt;/p&gt;
            &lt;script&gt;
        function jsonWS() {
          var ws = new WebSocket(&quot;ws://ws.acme.com/ws/v1/wf-live&quot;);

          ws.onmessage = function (evt) {
            console.log(&quot;Received Message: &quot; + evt.data);
            document.getElementById(&quot;message-json&quot;).innerText += evt.data;
          };

          ws.onclose = function (evt) {
            console.log(&quot;Connection closed.&quot;);
          };
        }

        // Start websocket
        jsonWS();
      &lt;/script&gt;
    &lt;/body&gt;
  &lt;/html&gt;

But I miss something, I'm quite a newbie, becasue I have the following wrong behaviours once I recevie the first kafka message:

  1. Only the first message is shown, after that connection is quickly closed
  2. To see the second ones I have to refresh the page, and this is not the websocket way
  3. Because the connection is closed the channel it not red, so it's stuck in the cosume function until I read it. I cannot have this behaviour
  4. to avoid the point 3, I think I have to have a mechanism where I send the message to the channel only when one or more ws is connected.

答案1

得分: 1

你离答案很近,只是缺少两个部分。

1- 使用select语句来持续接收来自kafka通道的新消息。

2- 维持一个活动的websocket连接。这个答案有更多详细信息。

让我知道这对你是否有效。

英文:

You are close. Just missing two things.

1- Using select statement to keep receiving new messages from the kafka channels.

2- Maintain an active websocket connecting. This answer have a bit more details

Let me know whether this works out for you.

答案2

得分: 1

Gorilla聊天示例与您的需求接近。该示例将从任何客户端接收到的消息广播给所有连接的客户端。按照以下步骤将代码调整为您的用例:

  • 客户端读取泵更改为丢弃接收到的消息,而不是将消息发送到中心。

    func (c *Client) readPump() {
        defer func() {
      	  c.hub.unregister <- c
      	  c.conn.Close()
        }()
        c.conn.SetReadLimit(maxMessageSize)
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        c.conn.SetPongHandler(func(string) error { 
        c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
        for {
            if _, _, err := c.NextReader(); err != nil {
              break
            }
        }
    }
    
  • 将您的Kafka读取循环更改为将消息发送到Hub.broadcast通道。

    for {
     	  msg, err := c.ReadMessage(xxx)
        if err != nil {
              // 处理错误
        }
        hub.broadcast <- msg
    }
    
  • 删除将客户端发送队列中的消息合并为单个websocket消息的代码,或者调整客户端以处理单个websocket消息中的多个Kafka消息。

英文:

The Gorilla chat example is close to what you need. The example broadcasts messages received from any client to all connected clients. Do the following to adapt the code to your use case:

  • Change the client read pump to discard received messages instead of sending messages to the hub.

    func (c *Client) readPump() {
        defer func() {
      	  c.hub.unregister &lt;- c
      	  c.conn.Close()
        }()
        c.conn.SetReadLimit(maxMessageSize)
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        c.conn.SetPongHandler(func(string) error { 
        c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
        for {
            if _, _, err := c.NextReader(); err != nil {
              break
            }
        }
    }
    
  • Change your Kafka read loop to send messages to the Hub.broadcast channel.

    for {
     	  msg, err := c.ReadMessage(xxx)
        if err != nil {
              // handle error
        }
        hub.broadcast &lt;- msg
    }
    
  • Delete the code that coalesces messages in the client send queue to a single websocket message or adjust the client to handle multiple Kafka messages in a single websocket message.

huangapple
  • 本文由 发表于 2022年3月23日 06:27:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/71579671.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定