英文:
Deadlock with buffered channel
问题
我有一些代码,它是一个作业调度程序,从许多TCP套接字中汇总大量数据。这段代码是对https://stackoverflow.com/questions/41748628/large-number-of-transient-objects-avoiding-contention/41749224?noredirect=1#comment70692534_41749224的一种方法的结果,它在很大程度上工作正常,CPU使用率大大降低,锁定也不再是问题。
不时地,我的应用程序会锁定,而“Channel length”日志是唯一一直重复的内容,因为数据仍然从我的套接字中进来。然而,计数仍然保持在5000,没有进行下游处理。
我认为问题可能是竞争条件,可能会在jobDispatcher
的select
中的channel <- msg
这一行上出现问题。问题是我无法弄清楚如何验证这一点。
我怀疑,由于select可以随机接收项目,goroutine正在返回,而shutdownChan没有机会处理。然后数据到达inboundFromTCP并被阻塞!
也许有人会发现这里有一些非常明显的错误,并提供一个解决方案!?
var MessageQueue = make(chan *trackingPacket_v1, 5000)
func init() {
go jobDispatcher(MessageQueue)
}
func addMessage(trackingPacket *trackingPacket_v1) {
// Send the packet to the buffered queue!
log.Println("Channel length:", len(MessageQueue))
MessageQueue <- trackingPacket
}
func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
var channelMap = make(map[string]chan *trackingPacket_v1)
// Channel that listens for the strings that want to exit
shutdownChan := make(chan string)
for {
select {
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
case shutdownString := <-shutdownChan:
log.Println("Shutting down:", shutdownString)
channel, ok := channelMap[shutdownString]
if ok {
delete(channelMap, shutdownString)
close(channel)
}
}
}
}
func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
var messages = []*trackingPacket_v1{}
tickChan := time.NewTicker(time.Second * 1)
defer tickChan.Stop()
hasCheckedData := false
for {
select {
case msg := <-ch:
log.Println("Got a messages for", id)
messages = append(messages, msg)
hasCheckedData = false
case <-tickChan.C:
messages = cullChanMessages(messages)
if len(messages) == 0 {
messages = nil
shutdown <- id
return
}
// No point running checking when packets have not changed!!
if hasCheckedData == false {
processMLATCandidatesFromChan(messages)
hasCheckedData = true
}
case <-time.After(time.Duration(time.Second * 60)):
log.Println("This channel has been around for 60 seconds which is too much, kill it")
messages = nil
shutdown <- id
return
}
}
}
更新 01/20/16
我尝试使用channelMap
作为全局变量,并进行了一些互斥锁定,但最终仍然发生了死锁。
稍微调整了代码,仍然会发生死锁,但我不明白这个问题出在哪里!
https://play.golang.org/p/PGpISU4XBJ
更新 01/21/17
根据一些建议,我将其放入了一个独立的工作示例中,以便人们可以查看。https://play.golang.org/p/88zT7hBLeD
这是一个长时间运行的进程,所以需要在本地机器上运行,因为playground会终止它。希望这能帮助找到问题的根源!
英文:
I have some code that is a job dispatcher and is collating a large amount of data from lots of TCP sockets. This code is a result of an approach to https://stackoverflow.com/questions/41748628/large-number-of-transient-objects-avoiding-contention/41749224?noredirect=1#comment70692534_41749224 and it largely works with CPU usage down a huge amount and locking not an issue now either.
From time to time my application locks up and the "Channel length" log is the only thing that keeps repeating as data is still coming in from my sockets. However the count remains at 5000 and no downstream processing is taking place.
I think the issue might be a race condition and the line it is possibly getting hung up on is channel <- msg
within the select
of the jobDispatcher
. Trouble is I can't work out how to verify this.
I suspect that as select can take items at random the goroutine is returning and the shutdownChan doesn't have a chance to process. Then data hits inboundFromTCP and it blocks!
Someone might spot something really obviously wrong here. And offer a solution hopefully!?
var MessageQueue = make(chan *trackingPacket_v1, 5000)
func init() {
go jobDispatcher(MessageQueue)
}
func addMessage(trackingPacket *trackingPacket_v1) {
// Send the packet to the buffered queue!
log.Println("Channel length:", len(MessageQueue))
MessageQueue <- trackingPacket
}
func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
var channelMap = make(map[string]chan *trackingPacket_v1)
// Channel that listens for the strings that want to exit
shutdownChan := make(chan string)
for {
select {
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
case shutdownString := <-shutdownChan:
log.Println("Shutting down:", shutdownString)
channel, ok := channelMap[shutdownString]
if ok {
delete(channelMap, shutdownString)
close(channel)
}
}
}
}
func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
var messages = []*trackingPacket_v1{}
tickChan := time.NewTicker(time.Second * 1)
defer tickChan.Stop()
hasCheckedData := false
for {
select {
case msg := <-ch:
log.Println("Got a messages for", id)
messages = append(messages, msg)
hasCheckedData = false
case <-tickChan.C:
messages = cullChanMessages(messages)
if len(messages) == 0 {
messages = nil
shutdown <- id
return
}
// No point running checking when packets have not changed!!
if hasCheckedData == false {
processMLATCandidatesFromChan(messages)
hasCheckedData = true
}
case <-time.After(time.Duration(time.Second * 60)):
log.Println("This channel has been around for 60 seconds which is too much, kill it")
messages = nil
shutdown <- id
return
}
}
}
Update 01/20/16
I tried to rework with the channelMap
as a global with some mutex locking but it ended up deadlocking still.
Slightly tweaked the code, still locks but I don't see how this one does!!
https://play.golang.org/p/PGpISU4XBJ
Update 01/21/17
After some recommendations I put this into a standalone working example so people can see. https://play.golang.org/p/88zT7hBLeD
It is a long running process so will need running locally on a machine as the playground kills it. Hopefully this will help get to the bottom of it!
答案1
得分: 2
我猜测你的问题是在执行channel <- msg
的同时,另一个goroutine正在执行shutdown <- id
时出现了阻塞。
由于channel
和shutdown
通道都没有缓冲区,它们会阻塞等待接收者。它们可能会因为等待另一侧可用而发生死锁。
有几种方法可以解决这个问题。你可以将这两个通道都声明为带有缓冲区大小为1的通道。
或者,你可以像Google的context包那样,通过关闭shutdown通道来发送关闭信号,而不是通过发送关闭消息来发出信号。可以参考https://golang.org/pkg/context/,特别是WithCancel
、WithDeadline
和Done
函数。
你可以尝试使用context来替代自己的关闭通道和超时代码。
JimB提到了一个关于在goroutine可能仍在接收通道消息时关闭它的观点。你应该发送关闭消息(或关闭、取消上下文),并继续处理消息,直到你的ch
通道关闭(通过case msg, ok := <-ch:
检测),这将在发送者接收到关闭消息后发生。
这样,你就可以获取到在实际发生关闭之前接收到的所有消息,并且应该避免第二次死锁。
英文:
I'm guessing that your problem is getting stuck doing this channel <- msg
at the same time as the other goroutine is doing shutdown <- id
.
Since neither the channel
nor the shutdown
channels are buffered, they block waiting for a receiver. And they can deadlock waiting for the other side to become available.
There are a couple of ways to fix it. You could declare both of those channels with a buffer of 1.
Or instead of signalling by sending a shutdown message, you could do what Google's context package does and send a shutdown signal by closing the shutdown channel. Look at https://golang.org/pkg/context/ especially WithCancel
, WithDeadline
and the Done
functions.
You might be able to use context to remove your own shutdown channel and timeout code.
And JimB has a point about shutting down the goroutine while it might still be receiving on the channel. What you should do is send the shutdown message (or close, or cancel the context) and continue to process messages until your ch
channel is closed (detect that with case msg, ok := <-ch:
), which would happen after the shutdown is received by the sender.
That way you get all of the messages that were incoming until the shutdown actually happened, and should avoid a second deadlock.
答案2
得分: -2
我是新手学习Go语言,对于这段代码有些疑问:
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
在这里,你是不是将一些东西放入了通道(无缓冲的通道?)
channel, ok := channelMap[msg.Avr]
所以,在你添加消息之前,你不需要清空该通道吗?
channel <- msg
就像我说的,我对Go语言还不熟悉,希望我没有说错。
英文:
I'm new to Go but in this code here
case msg := <-inboundFromTCP:
log.Println("Got packet", msg.Avr)
channel, ok := channelMap[msg.Avr]
if !ok {
packetChan := make(chan *trackingPacket_v1)
channelMap[msg.Avr] = packetChan
go processPackets(packetChan, shutdownChan, msg.Avr)
packetChan <- msg
continue
}
channel <- msg
Aren't you putting something in channel (unbuffered?) here
channel, ok := channelMap[msg.Avr]
So wouldn't you need to empty out that channel before you can add the msg here?
channel <- msg
Like I said, I'm new to Go so I hope I'm not being goofy.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论