如何修复Gorilla WebSocket中的缓冲写入SIGTERM问题?

huangapple go评论82阅读模式
英文:

How to fix buffered write sigterm in gorilla websocket?

问题

我正在使用Go语言编写一个WebSocket客户端,它发送控制消息以请求数据流,并使用自定义消息处理程序处理传入的数据。我认为这应该是相当标准的,但与此同时,我已经调试了相当长的时间。

该应用程序使用多个WebSocket处理不同的数据流,因此我将WebSocket客户端封装到一个单独的结构体中以隔离一切。

代码清单非常冗长,所以我用一个gist提供了一个自包含的测试

然而,关键部分是下面显示的结构体初始化和写入函数。

// WebSocketClient 返回WebSocket客户端连接
type WebSocketClient struct {
	url       string
	sendBuf   chan []byte
	ctx       context.Context
	ctxCancel context.CancelFunc
	mu        sync.RWMutex
	wsconn    *websocket.Conn
}

// NewWebSocketClient 创建新的WebSocket连接
func NewWebSocketClient(url string, messageHandler WsHandler, errorHandler WsErrHandler) (*WebSocketClient, error) {
	client := new(WebSocketClient)
	client.ctx, client.ctxCancel = context.WithCancel(context.Background())
	client.url = url
	client.sendBuf = make(chan []byte, 1)

	go client.listen(messageHandler, errorHandler)
	go client.listenWrite()
	go client.ping()
	return client, nil
}

...
// Write 将数据写入WebSocket服务器
func (conn *WebSocketClient) Write(data []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*150)
	defer cancel()

	for {
		select {
		case conn.sendBuf <- data: // : runtime error: invalid memory address or nil pointer dereference
			return nil
		case <-ctx.Done():
			return fmt.Errorf("上下文已取消")
		}
	}
}

func (conn *WebSocketClient) listenWrite() {
	for data := range conn.sendBuf {
		ws := conn.Connect()
		if ws == nil {
			err := fmt.Errorf("conn.ws为nil")
			logError(err)
			log.Println("listenWrite 没有WebSocket连接")
			continue
		}

		if err := ws.WriteMessage(
			websocket.TextMessage,
			data,
		); err != nil {
			log.Println("listenWrite", nil, "WebSocket写入错误")
		}
	}
}

我观察到的奇怪现象是:

  • 如果互斥锁和发送缓冲区是结构体的一部分,我会得到各种信号终止错误。

  • 当互斥锁和发送缓冲区在结构体外部声明为变量时,读取和写入工作得很好,但关闭连接会导致信号终止错误。

我有点困惑,没有实际的解释,但因为客户端嵌入到一个微服务中,一个正确的连接关闭将是受欢迎的。

另一件事是,如测试中所示,我经常收到以下警告:

"使用已关闭的网络连接,无法读取WebSocket消息"

老实说,我认为这段代码一开始就不正确,但我还没有找到一个更简单和更健壮的解决方案。

有关如何编写一个能够在多个实例中运行,并且每个实例都可以处理与不同WebSocket的读写连接的稳健WebSocket客户端的指导,请指点一下。

英文:

I'm writing a websocket client in Go that sends out a control message to request a data stream and then processes the incoming data using a custom message hander. I thought this would be pretty standard, but meanwhile I am debugging this for quite some time.

The app uses multiple websockets for handling different streams, therefore I have encapsulated the web socket client into a separate struct to isolate everything.

The code listing is quite verbose, so I made a gist with a self contained test.

The critical part though are the struct init & write function, as shown below.


// WebSocketClient return websocket client connection
type WebSocketClient struct {
	url       string
	sendBuf   chan []byte
	ctx       context.Context
	ctxCancel context.CancelFunc
	mu        sync.RWMutex
	wsconn    *websocket.Conn
}

// NewWebSocketClient create new websocket connection
func NewWebSocketClient(url string, messageHandler WsHandler, errorHandler WsErrHandler) (*WebSocketClient, error) {
	client := new(WebSocketClient)
	client.ctx, client.ctxCancel = context.WithCancel(context.Background())
	client.url = url
	client.sendBuf = make(chan []byte, 1)

	go client.listen(messageHandler, errorHandler)
	go client.listenWrite()
	go client.ping()
	return client, nil
}

...
// Write data to the websocket server
func (conn *WebSocketClient) Write(data []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*150)
	defer cancel()

	for {
		select {
		case conn.sendBuf &lt;- data: // : runtime error: invalid memory address or nil pointer dereference
			return nil
		case &lt;-ctx.Done():
			return fmt.Errorf(&quot;context canceled&quot;)
		}
	}
}

func (conn *WebSocketClient) listenWrite() {
	for data := range conn.sendBuf {
		ws := conn.Connect()
		if ws == nil {
			err := fmt.Errorf(&quot;conn.ws is nil&quot;)
			logError(err)
			log.Println(&quot;listenWrite No websocket connection&quot;)
			continue
		}

		if err := ws.WriteMessage(
			websocket.TextMessage,
			data,
		); err != nil {
			log.Println(&quot;listenWrite&quot;, nil, &quot;WebSocket Write Error&quot;)
		}
	}
}

The strange thing I am observing:

  • If the mutex & send buffer are part of the struct, I get sigterms all over the place

  • When the mutex & send buffer are declared as var outside the struct, reading & writing works nicely, but closing the connection ends with a sigtmerm....

I am bit perplex and have no practical explanation, but because the client gets embedded into a microservice, a proper connection close would be welcome.

The other thing is, as seen in the test, I get frequently the following warning;

" use of closed network connection Cannot read websocket message"

Honestly, I think this code isn't right in the first place, but I haven't found a simpler and more robust solution yet.

Any pointers towards how write a robust web socket client that can run in multiple instances with each one handling read/ write connection to a different web sockets?

答案1

得分: 1

一些事情:

它在函数顶部获取了锁,但是在返回点处锁没有被释放。只需将以下代码放在函数顶部:

conn.mu.Lock()
defer conn.mu.Unlock()

当你在测试中关闭客户端时,这可能会取消连接上下文等操作。但是在监听循环中,上下文取消只在连接/读取/处理之外进行轮询:

for {
	select {
	case <-conn.ctx.Done():
		return
	case <-ticker.C:
		for {
			// 获取连接
            // 获取消息
            // 处理消息
		}
	}
}

如果你想避免看到连接关闭错误消息,请像这样重新检查上下文取消:

ws := conn.Connect()
if ws == nil {
	return
}

_, bytMsg, err := ws.ReadMessage()
if err != nil {

    // 再次轮询上下文取消 - 因为这可能是错误的真正原因
    select {
    case <-conn.ctx.Done():
	    return
    default:
    }

    errorHandler(err)
    log.Println("listen", err, "Cannot read websocket message")
    conn.closeWs()
	return
}
英文:

A couple of things:

it acquires the lock at the top of the function - but there's return points where the lock will not be released. Just put this at the top of the function:

conn.mu.Lock()
defer conn.mu.Unlock()

When you close the client in your test - this will presumable cancel the connnection context etc. - but in your listener loop the context cancelation is only polled outside the connection/read/processing:

for {
	select {
	case &lt;-conn.ctx.Done():
		return
	case &lt;-ticker.C:
		for {
			// get conn
            // get msg
            // process msg
		}
	}
}

if you want to avoid seeing a connection close error message - recheck the context cancelation like so:

ws := conn.Connect()
if ws == nil {
	return
}

_, bytMsg, err := ws.ReadMessage()
if err != nil {

    // poll context cancelation again - as this may be the real reason for the error
    select {
    case &lt;-conn.ctx.Done():
	    return
    default:
    }

    errorHandler(err)
    log.Println(&quot;listen&quot;, err, &quot;Cannot read websocket message&quot;)
    conn.closeWs()
	return
}

huangapple
  • 本文由 发表于 2021年8月26日 18:32:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/68936923.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定