How to fix buffered write sigterm in gorilla websocket?
Question
I'm writing a websocket client in Go that sends out a control message to request a data stream and then processes the incoming data using a custom message handler. I thought this would be pretty standard, but I have been debugging it for quite some time.
The app uses multiple websockets for handling different streams, so I have encapsulated the websocket client in a separate struct to isolate everything.
The code listing is quite verbose, so I made a gist with a self-contained test.
The critical parts, though, are the struct initialization and the write function, as shown below.
    // WebSocketClient wraps a single websocket client connection
    type WebSocketClient struct {
        url       string
        sendBuf   chan []byte
        ctx       context.Context
        ctxCancel context.CancelFunc
        mu        sync.RWMutex
        wsconn    *websocket.Conn
    }

    // NewWebSocketClient creates a new websocket client and starts its goroutines
    func NewWebSocketClient(url string, messageHandler WsHandler, errorHandler WsErrHandler) (*WebSocketClient, error) {
        client := new(WebSocketClient)
        client.ctx, client.ctxCancel = context.WithCancel(context.Background())
        client.url = url
        client.sendBuf = make(chan []byte, 1)
        go client.listen(messageHandler, errorHandler)
        go client.listenWrite()
        go client.ping()
        return client, nil
    }
    ...
    // Write queues data for sending to the websocket server
    func (conn *WebSocketClient) Write(data []byte) error {
        ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*150)
        defer cancel()
        for {
            select {
            case conn.sendBuf <- data: // panics here: runtime error: invalid memory address or nil pointer dereference
                return nil
            case <-ctx.Done():
                return fmt.Errorf("context canceled")
            }
        }
    }

    // listenWrite drains sendBuf and writes each message to the connection
    func (conn *WebSocketClient) listenWrite() {
        for data := range conn.sendBuf {
            ws := conn.Connect()
            if ws == nil {
                err := fmt.Errorf("conn.ws is nil")
                logError(err)
                log.Println("listenWrite No websocket connection")
                continue
            }
            if err := ws.WriteMessage(
                websocket.TextMessage,
                data,
            ); err != nil {
                // log the actual write error rather than nil
                log.Println("listenWrite", err, "WebSocket Write Error")
            }
        }
    }
The strange things I am observing:
- If the mutex and send buffer are part of the struct, I get sigterms all over the place.
- When the mutex and send buffer are declared as vars outside the struct, reading and writing work nicely, but closing the connection ends with a sigterm.
I am a bit perplexed and have no practical explanation, but because the client gets embedded into a microservice, a proper connection close would be welcome.
The other thing is that, as seen in the test, I frequently get the following warning:
"use of closed network connection Cannot read websocket message"
Honestly, I think this code isn't right in the first place, but I haven't found a simpler and more robust solution yet.
Any pointers on how to write a robust websocket client that can run in multiple instances, with each one handling the read/write connection to a different websocket?
Answer 1
Score: 1
A couple of things:
- closeWS misuses the Mutex: it acquires the lock at the top of the function, but there are return points where the lock will not be released. Just put this at the top of the function:
conn.mu.Lock()
defer conn.mu.Unlock()
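To illustrate why the deferred unlock matters, here is a minimal sketch. The gist's actual closeWS is not shown here, so the `client` type, the `fakeConn` stand-in for `*websocket.Conn`, and the body of `closeWs` are all assumptions made for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeConn is a hypothetical stand-in for *websocket.Conn.
type fakeConn struct{ closed bool }

// Close marks the connection closed and fails if it was closed already.
func (c *fakeConn) Close() error {
	if c.closed {
		return errors.New("already closed")
	}
	c.closed = true
	return nil
}

// client mirrors only the fields of WebSocketClient needed for this sketch.
type client struct {
	mu     sync.RWMutex
	wsconn *fakeConn
}

// closeWs is an assumed shape of the close function. Because Unlock is
// deferred, the lock is released on every return path, including the
// early return below.
func (c *client) closeWs() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.wsconn == nil {
		return nil // early return: the deferred Unlock still runs
	}
	err := c.wsconn.Close()
	c.wsconn = nil
	return err
}

func main() {
	c := &client{wsconn: &fakeConn{}}
	fmt.Println("first close:", c.closeWs())
	// If the first call had returned without unlocking, this call would block forever.
	fmt.Println("second close:", c.closeWs())
}
```

With a manual Unlock placed only at the end of the function, the early return would leave the mutex held and the next caller would hang.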
When you close the client in your test, this will presumably cancel the connection context etc., but in your listener loop the context cancellation is only polled outside the connection/read/processing:
    for {
        select {
        case <-conn.ctx.Done():
            return
        case <-ticker.C:
            for {
                // get conn
                // get msg
                // process msg
            }
        }
    }
If you want to avoid seeing a connection close error message, recheck the context cancellation like so:
    ws := conn.Connect()
    if ws == nil {
        return
    }
    _, bytMsg, err := ws.ReadMessage()
    if err != nil {
        // poll context cancellation again, as this may be the real reason for the error
        select {
        case <-conn.ctx.Done():
            return
        default:
        }
        errorHandler(err)
        log.Println("listen", err, "Cannot read websocket message")
        conn.closeWs()
        return
    }