如何在 Go Fiber 中注册新的 WebSocket 连接?

huangapple go评论84阅读模式
英文:

How to register new websocket connections gofiber

问题

我正在为你翻译以下内容:

我正在尝试设置一个简单的 WebSocket 服务器,它应该在未知的时间间隔内为客户端提供一些内容。

我的代码目前如下所示:

router.go

func SetupRoutes(app *fiber.App) error {

	app.Get("/whop/validate", handler.HandleWhopValidate)
    /*其他非 WebSocket 路由*/

    /*...*/

	app.Get("/ws/monitor", websocket.New(wsHandler.HandleWsMonitor))

	app.Use(func(c *fiber.Ctx) error {
		c.SendStatus(404)
		return c.Next()
	})

	return nil
}

handler.go

package handlers

import (
	"fmt"
	"log"

	"github.com/gofiber/websocket/v2"
)

var register = make(chan *websocket.Conn)
var unregister = make(chan *websocket.Conn)

func HandleWsMonitor(c *websocket.Conn) {
	go SocketListener()
	defer func() {
		unregister <- c
		//可能需要在重新关闭之前检查连接是否已经关闭?
		c.Close()
	}()
	//将连接发送到通道中
	register <- c
	for {
		messageType, message, err := c.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Println("read error:", err)
			}
			return
		}
		if messageType == websocket.TextMessage {
			log.Println("got textmessage:", string(message))
		} else {
			log.Println("received message of type:", messageType)
		}
	}
}

func SocketListener() {
	for {
		select {
		case c := <-register:
			messageType, message, err := c.ReadMessage()
			if err != nil {
				log.Println(err)
				unregister <- c
				return
			}

			fmt.Printf("Got message of type: %d\nMessage:%s\n", messageType, string(message))
			fmt.Printf("Connection Params: %s\n", c.Params("id"))
			//将连接添加到客户端列表中
		case c := <-unregister:
			//从客户端列表中移除连接
			c.Close()
			fmt.Printf("Closed connection\n")

		}

	}
}

我遇到的问题是,当我连接到 WebSocket 时,我的注册的 select case 没有被触发(我希望使用之前提供给客户端的 UUID 将客户端连接注册到一个映射中)。

client.go

package main

import (
	"flag"
	"log"
	"net/url"

	"github.com/fasthttp/websocket"
)

type Client struct {
	C *websocket.Conn
}

func main() {
	addr := flag.String("addr", "localhost:8080", "http service address")
	u := url.URL{
		Scheme:   "ws",
		Host:     *addr,
		Path:     "/ws/monitor",
		RawQuery: "id=12",
	}
	wsClient := &Client{}

	log.Printf("connecting to %s\n", u.String())
	// 连接到 WebSocket 服务器
	conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatal("Dial:", err)
	}
	wsClient.C = conn
	if resp != nil {
		log.Println("Got response:", resp)
	}
	defer wsClient.closeConn()
}

func (client *Client) closeConn() {
	err := client.C.WriteMessage(
		websocket.CloseMessage,
		websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
	)
	if err != nil {
		log.Println("Write close:", err)
		return
	}
	client.C.Close()
	log.Println("Connection closed")
}

handler.go 中是否有什么我忽略的地方,或者我在连接到服务器时是否应该采取不同的方法?

英文:

I'm trying to set up a simple websocket server that should serve the client some content at unknown intervals.

My code currently looks like this:

router.go

func SetupRoutes(app *fiber.App) error {
app.Get(&quot;/whop/validate&quot;, handler.HandleWhopValidate)
/*Other non-websocket routes*/
/*...*/
app.Get(&quot;/ws/monitor&quot;, websocket.New(wsHandler.HandleWsMonitor))
app.Use(func(c *fiber.Ctx) error {
c.SendStatus(404)
return c.Next()
})
return nil
}

handler.go

package handlers
import (
&quot;fmt&quot;
&quot;log&quot;
&quot;github.com/gofiber/websocket/v2&quot;
)
var register = make(chan *websocket.Conn)
var unregister = make(chan *websocket.Conn)
func HandleWsMonitor(c *websocket.Conn) {
go SocketListener()
defer func() {
unregister &lt;- c
//may need to check whether connection is already closed before re-closing?
c.Close()
}()
//sends conn into channel
register &lt;- c
for {
messageType, message, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println(&quot;read error:&quot;, err)
}
return
}
if messageType == websocket.TextMessage {
log.Println(&quot;got textmessage:&quot;, string(message))
} else {
log.Println(&quot;received message of type:&quot;, messageType)
}
}
}
func SocketListener() {
for {
select {
case c := &lt;-register:
messageType, message, err := c.ReadMessage()
if err != nil {
log.Println(err)
unregister &lt;- c
return
}
fmt.Printf(&quot;Got message of type: %d\nMessage:%s\n&quot;, messageType, string(message))
fmt.Printf(&quot;Connection Params: %s\n&quot;, c.Params(&quot;id&quot;))
//append to list of co
case c := &lt;-unregister:
//remove conection from list of clients
c.Close()
fmt.Printf(&quot;Closed connection\n&quot;)
}
}
}

The issue I'm having is that when I connect to the websocket, my select case for register is not hit (I'd want to register the client connection to a map using a uuid that is previously provided to the client).

client.go

package main
import (
&quot;flag&quot;
&quot;log&quot;
&quot;net/url&quot;
&quot;github.com/fasthttp/websocket&quot;
)
type Client struct {
C *websocket.Conn
}
func main() {
addr := flag.String(&quot;addr&quot;, &quot;localhost:8080&quot;, &quot;http service address&quot;)
u := url.URL{
Scheme:   &quot;ws&quot;,
Host:     *addr,
Path:     &quot;/ws/monitor&quot;,
RawQuery: &quot;id=12&quot;,
}
wsClient := &amp;Client{}
log.Printf(&quot;connecting to %s\n&quot;, u.String())
// Connect to the WebSocket server
conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal(&quot;Dial:&quot;, err)
}
wsClient.C = conn
if resp != nil {
log.Println(&quot;Got response:&quot;, resp)
}
defer wsClient.closeConn()
}
func (client *Client) closeConn() {
err := client.C.WriteMessage(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, &quot;&quot;),
)
if err != nil {
log.Println(&quot;Write close:&quot;, err)
return
}
client.C.Close()
log.Println(&quot;Connection closed&quot;)
}

Is there something I'm missing in handler.go or should i just be taking a different approach when connecting to the server with my client?

答案1

得分: 1

根据我的测试,注册选择(register)确实被触发了(我附上了我使用的代码)。

但是我在代码中发现了其他问题:

  1. unregister通道是无缓冲的,在SocketListener中的unregister <- c会被阻塞。当代码执行到unregister <- c时,它与case c := <-unregister之间发生了死锁。
  2. 看起来我们只需要一个SocketListener的goroutine来处理整个服务器。如果是这样的话,它应该移出HandleWsMonitor函数之外。
  3. HandleWsMonitorSocketListener都从连接中读取数据。SocketListener的责任是什么?看起来它不应该从连接中读取数据。

再仔细思考一下,似乎你可以直接在HandleWsMonitor中向映射表中添加连接和删除连接。完全可以移除SocketListener。在设计中,简洁性应该是一个关键目标。参考KISS原则

package main

import (
	"log"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/websocket/v2"
)

var (
	register   = make(chan *websocket.Conn)
	unregister = make(chan *websocket.Conn)
)

func main() {
	// 让我们更容易找到打印日志的行。
	log.SetFlags(log.Lshortfile)
	app := fiber.New()

	app.Get("/ws/monitor", websocket.New(HandleWsMonitor))

	log.Fatal(app.Listen(":8080"))
}

func HandleWsMonitor(c *websocket.Conn) {
	// 看起来我们只需要一个SocketListener的goroutine来处理整个服务器。
	// 如果是这样的话,下一行应该移出这个函数之外。
	go SocketListener()
	defer func() {
		unregister <- c
		c.Close()
	}()

	register <- c
	for {
		messageType, message, err := c.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Println("read error:", err)
			}
			return
		}
		if messageType == websocket.TextMessage {
			log.Println("got textmessage:", string(message))
		} else {
			log.Println("received message of type:", messageType)
		}
	}
}

func SocketListener() {
	for {
		select {
		case c := <-register:
			// 这在日志中出现了。
			log.Println("case c := <-register")
			messageType, message, err := c.ReadMessage()
			if err != nil {
				log.Println(err)
				// unregister是无缓冲的,发送操作会被阻塞。
				unregister <- c
				// 如果我们只使用一个SocketListener的goroutine,那么它不应该在这里返回。
				return
			}

			log.Printf("Got message of type: %d\nMessage:%s\n", messageType, string(message))
			log.Printf("Connection Params: %s\n", c.Params("id"))
		case c := <-unregister:
			c.Close()
			log.Println("Closed connection")

		}
	}
}
英文:

The select case for register did hit according to my test (the code I used is attached to the bottom of this answer).

But I found other issues in the code:

  1. The unregister chan is unbuffered, and unregister &lt;- c in SocketListener will be blocked. When the code reaches unregister &lt;- c, there is a deadlock between it and case c := &lt;-unregister.
  2. It seems we only need one SocketListener goroutine for the whole server. If this is the case, it should be moved outside of HandleWsMonitor.
  3. Both HandleWsMonitor and SocketListener read from the connection. What's the responsibility of SocketListener? It seems that it should not read from the connection.

Thinking it more, it seems that you can add connections to and delete connections from the map in HandleWsMonitor directly. SocketListener can be removed entirely. Simplicity should be a key goal in design. See the KISS principle.

package main

import (
	&quot;log&quot;

	&quot;github.com/gofiber/fiber/v2&quot;
	&quot;github.com/gofiber/websocket/v2&quot;
)

var (
	register   = make(chan *websocket.Conn)
	unregister = make(chan *websocket.Conn)
)

func main() {
	// Make it easy to find out which line prints the log.
	log.SetFlags(log.Lshortfile)
	app := fiber.New()

	app.Get(&quot;/ws/monitor&quot;, websocket.New(HandleWsMonitor))

	log.Fatal(app.Listen(&quot;:8080&quot;))
}

func HandleWsMonitor(c *websocket.Conn) {
	// It seems the we only need one SocketListener goroutine for the whole server.
	// If this is the case, the next line should be moved outside of this func.
	go SocketListener()
	defer func() {
		unregister &lt;- c
		c.Close()
	}()

	register &lt;- c
	for {
		messageType, message, err := c.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Println(&quot;read error:&quot;, err)
			}
			return
		}
		if messageType == websocket.TextMessage {
			log.Println(&quot;got textmessage:&quot;, string(message))
		} else {
			log.Println(&quot;received message of type:&quot;, messageType)
		}
	}
}

func SocketListener() {
	for {
		select {
		case c := &lt;-register:
			// This did appear in the log.
			log.Println(&quot;case c := &lt;-register&quot;)
			messageType, message, err := c.ReadMessage()
			if err != nil {
				log.Println(err)
				// unregister is unbuffered, the sending will be blocked.
				unregister &lt;- c
				// If we use only one SocketListener goroutine then it should
				// not return here.
				return
			}

			log.Printf(&quot;Got message of type: %d\nMessage:%s\n&quot;, messageType, string(message))
			log.Printf(&quot;Connection Params: %s\n&quot;, c.Params(&quot;id&quot;))
		case c := &lt;-unregister:
			c.Close()
			log.Println(&quot;Closed connection&quot;)

		}
	}
}

huangapple
  • 本文由 发表于 2023年4月28日 20:35:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/76129643.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定