Goroutine安全的通道关闭实际上并不关闭websocket。

huangapple go评论95阅读模式
英文:

Goroutine safe channel close doesn't actually close webscoket

问题

这是一个让我相当困扰的棘手问题。

基本上,我编写了一个集成微服务,使用Go客户端从币安加密货币交易所提供数据流。客户端发送启动消息,开始为某个符号启动数据流,并在某个时刻发送关闭消息以停止数据流。我的实现基本上是这样的:


func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
	
	switch clientType {

	case bn.SPOT_LIVE:
		wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
		wsErrHandler := c.handlers.klineHandler.ErrHandler

		_, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
		if err != nil {
			fmt.Println(err)
			return err
		} else {
			c.state.clientSymChanMap[clientType][symbol] = stopC
			return nil
		}
  ... 
}

clientSymChanMap 将 stopChannel 存储在一个嵌套的哈希映射中,以便我以后可以检索 stop channel 来停止数据源。相应地,已经实现了停止函数:


func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
	//mtd := "StopDataStream: "

	stopC := c.state.clientSymChanMap[clientType][symbol]

	if isClosed(stopC) {
		DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
	} else {
		close(stopC)
	}
	// Delete  channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
	delete(c.state.clientSymChanMap[clientType], symbol)
	return
}

为了防止已经关闭的通道引发 panic,我使用了一个检查函数,如果通道已经关闭,则返回 true。


func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

看起来很好,但有一个问题。当我只为一个符号运行代码时,它按预期开始和关闭数据源。

然而,当启动多个数据源时,上述代码却永远不会关闭 websocket,而是一直持续流式传输数据。如果没有 isClosed 检查,我会得到尝试关闭已关闭通道的 panic,但是有了这个检查,嗯,什么都不会关闭。

当查看上述 binance.WsKlineServe 函数的实现时,很明显它只是在每次调用时包装一个新的 websocket,然后返回 done 和 stop 通道。

文档给出了以下用法示例:


wsKlineHandler := func(event *binance.WsKlineEvent) {
    fmt.Println(event)
}
errHandler := func(err error) {
    fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
    fmt.Println(err)
    return
}
<-doneC 

因为 doneC 通道实际上会阻塞,所以我将其删除,并认为存储 stopC 通道,然后稍后使用它来停止数据源会起作用。然而,它只对单个实例起作用。当打开多个数据流时,这种方法就不起作用了。

有什么想法是什么原因,以及如何修复它?

英文:

This one is a tricky issue that bugs me quite a bit.

Essentially, I wrote an integration microservice that provides data streams from Binance crypto exchange using the Go client. A client sends a start messages, starts data stream for a symbol, and at some point, sends a close message to stop the stream. My implementation looks basically like this:


func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
	
	switch clientType {

	case bn.SPOT_LIVE:
		wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
		wsErrHandler := c.handlers.klineHandler.ErrHandler

		_, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
		if err != nil {
			fmt.Println(err)
			return err
		} else {
			c.state.clientSymChanMap[clientType][symbol] = stopC
			return nil
		}
  ... 
}

The clientSymChanMap stores the stopChannel in a nested hashmap so that I can retrieve the stop channel later to stop the data feed. The stop function has been implemented accordingly:


func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
	//mtd := "StopDataStream: "

	stopC := c.state.clientSymChanMap[clientType][symbol]

	if isClosed(stopC) {
		DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
	} else {
		close(stopC)
	}
	// Delete  channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
	delete(c.state.clientSymChanMap[clientType], symbol)
	return
}

To prevent panics from already closed channels, I use a check function that returns true in case the channel is already close.


func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

Looks nice, but has a catch. When I run the code with starting data for just one symbol, it starts and closes the datafeed exactly as expected.

However, when starting multiple data feeds, then the above code somehow never closes the websocket and just keeps streaming data forever. Without the isClosed check, I get panics of trying to close a closed channel, but with the check in place, well, nothing gets closed.

When looking at the implementation of the above binance.WsKlineServe function, it's quite obvious that it just wraps a new websocket with each invocation and then returns the done & stop channel.

The documentation gives the following usage example:


wsKlineHandler := func(event *binance.WsKlineEvent) {
    fmt.Println(event)
}
errHandler := func(err error) {
    fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
    fmt.Println(err)
    return
}
<-doneC 

Because the doneC channel actually blocks, I removed it and thought that storing the stopC channel and then use it later to stop the datafeed would work. However, it only does so for one single instance. When multiple streams are open, this doesn't work anymore.

Any idea what that's the case and how to fix it?

答案1

得分: 3

首先,这段代码存在危险:

if isClosed(stopC) {
    DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
    close(stopC) // <- 无法确定通道是否仍然打开
}

在你对通道状态进行轮询检查之后,不能保证在代码的下一行中通道仍然处于相同的状态。因此,如果该代码被并发调用,理论上可能会导致恐慌。


如果你希望在通道关闭时发生异步操作,最好从自己的 goroutine 中显式执行此操作。你可以尝试以下代码:

go func() {
    stopC := c.state.clientSymChanMap[clientType][symbol]
    <-stopC
    // stopC 现在肯定已关闭
    delete(c.state.clientSymChanMap[clientType], symbol)
}()

附注:你需要在 map 上使用某种互斥锁,因为删除操作是异步的,你需要确保对 map 的任何添加操作不会与此操作发生数据竞争。

附注:当通道超出作用域时,GC 会回收它们。如果你不再从中读取数据,它们不需要显式关闭以便被 GC 回收。

英文:

Firstly, this is dangerous:

if isClosed(stopC) {
    DbgPrint(&quot; Channel is already closed. Do nothing for: &quot; + symbol)
} else {
    close(stopC) // &lt;- can&#39;t be sure channel is still open
}

there is no guarantee that after your polling check of the channel state, that the channel will still be in that same state in the next line of code. So this code could in theory could panic if it's called concurrently.


If you want an asynchronous action to occur on the channel close - it's best to do this explicitly from its own goroutine. So you could try this:

go func() {

    stopC := c.state.clientSymChanMap[clientType][symbol]
    &lt;-stopC
    // stopC definitely closed now
    delete(c.state.clientSymChanMap[clientType], symbol)
}()

P.S. you do need some sort of mutex on your map, since the delete is asynchronous - you need to ensure any adds to the map don't datarace with this.

P.P.S Channels are reclaimed by the GC when they go out of scope. If you are no longer reading from it - they do not need to be explicitly closed to be reclaimed by the GC.

答案2

得分: 1

使用通道来停止goroutine或关闭某些内容非常棘手。有很多事情你可能做错或忘记做。

context.WithCancel将这种复杂性抽象化,使代码更易读和可维护。

一些代码片段:

ctx, cancel := context.WithCancel(context.TODO())
TheThingToCancel(ctx, ...)

// 每当你想要停止TheThingToCancel时,可以多次调用cancel()
cancel()

然后在一个for循环中,你经常会有这样一个select

for {
	select {
	case <-ctx.Done():
		return
	default:
	}

	// 做一些事情
}

下面是一些更接近你特定情况的代码,即打开连接:

func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
	ctx, cancel := context.WithCancel(ctx)

	conn, err := net.Dial("tcp", ":12345")
	if err != nil {
		cancel()
		return nil, err
	}

	go func() {
		<-ctx.Done()
		_ = conn.Close()
	}()

	go func() {
		defer func() {
			_ = conn.Close()
			// 确保上下文始终被取消,以避免goroutine泄漏
			cancel()
		}()

		var bts = make([]byte, 1024)
		for {
			n, err := conn.Read(bts)
			if err != nil {
				return
			}
			fmt.Println(bts[:n])
		}
	}()

	return cancel, nil
}

它返回cancel函数,以便能够从外部关闭它。

可以多次取消上下文,而不会像关闭通道多次那样引发panic。这是一个优点。此外,您可以从其他上下文派生上下文,并通过关闭父上下文来关闭许多不同的上下文,从而停止不同的相关例程。经过精心设计,这对于关闭彼此相关且需要能够单独关闭的不同例程非常有用。

英文:

Using channels for stopping a goroutine or closing something is very tricky. There are lots of things you can do wrong or forget to do.

context.WithCancel abstracts that complexity away, making the code more readable and maintainable.

Some code snippets:

ctx, cancel := context.WitchCancel(context.TODO())
TheThingToCancel(ctx, ...)

// Whenever you want to stop TheThingToCancel. Can be called multiple times.
cancel()

Then in a for loop you'd often have a select like this:

for {
	select {
	case &lt;-ctx.Done():
		return
	default:
	}

	// do stuff
}

Here some code that is closer to your specific case of an open connection:

func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
	ctx, cancel := context.WithCancel(ctx)

	conn, err := net.Dial(&quot;tcp&quot;, &quot;:12345&quot;)
	if err != nil {
		cancel()
		return nil, err
	}

	go func() {
		&lt;-ctx.Done()
		_ = conn.Close()
	}()

	go func() {
		defer func() {
			_ = conn.Close()
			// make sure context is always cancelled to avoid goroutine leak
			cancel()
		}()

		var bts = make([]byte, 1024)
		for {
			n, err := conn.Read(bts)
			if err != nil {
				return
			}
			fmt.Println(bts[:n])
		}
	}()

	return cancel, nil
}

It returns the cancel function to be able to close it from the outside.

Cancelling a context can be done many times over without a panic like would occur if a channel is closed multiple times. That is one advantage. Also you can derive contexts from other contexts and thereby close a lot of contexts that all stop different routines by closing a parent context. Carefully designed, this is very powerful for shutting down different routines belonging together that also need to be able to be shut down individually.

huangapple
  • 本文由 发表于 2021年8月31日 18:05:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/68996755.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定