英文:
Large number of transient objects - avoiding contention
问题
我有一个用Go编写的新的TCP服务器,有100多个客户端连接到它上面。每个客户端都会流式传输数据,这些数据需要在中心位置进行查看,因为它们正在查看来自不同位置的无线电数据包,然后对其进行分析。代码是可以工作的,但是我发现在锁定方面存在很多争用和增加的CPU使用率,所以我想知道如何避免锁定(如果可能的话)或者在其周围进行优化。
由于TCP服务器为接收到的每个数据包启动一个GoRoutine,所以addMessage
函数需要一定程度的同步。这些数据包还会在稍后的另一个函数中进行分析,该函数会对映射进行RLock()
操作。
每秒钟调用一次的cullMessages()
函数会陷入自身,导致速度变慢,有时需要2-3秒才能运行完毕,这会加剧问题,因为下一个2-3个操作会排队等待解锁并立即运行!
有任何想法/思路都将不胜感激!
var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1
// 从每个TCP客户端调用此函数,需要共享此数据
func addMessage(trackingPacket *trackingPacket_v1) {
dataMessagesMutex.Lock()
dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
dataMessagesMutex.Unlock()
}
// 循环调用的函数,需要根据年龄删除
func cullMessages() {
cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)
dataMessagesMutex.Lock()
defer dataMessagesMutex.Unlock()
for avr, data := range dataMessages {
sort.Sort(PacketSorter(data))
highestIndex := 0
for i, messages := range data {
if cullTS.Sub(messages.ProcessedTime) > 0 {
// 需要在此处删除消息
messages = nil
highestIndex = i
}
}
// 将新的切片复制到data变量中
data = data[highestIndex+1:]
if len(data) == 0 {
// 空消息,删除
delete(dataMessages, avr)
}
}
}
UPDATE:
添加了分析函数
func processCandidates() {
mlatMessagesMutex.RLock()
defer dataMessagesMutex.RUnlock()
for _, data := range dataMessages {
numberOfMessages := len(data)
for a := 0; a < numberOfMessages; a++ {
packetA := data[a]
applicablePackets := []*trackingPacket_v1{packetA}
for b := 0; b < numberOfMessages; b++ {
// 不要比较相同的数据包
if b == a {
continue
}
packetB := data[b]
// 仅在时间戳阈值内考虑此数据包
tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)
if tsDelta < MAX_MESSAGE_TS_DIFF {
// 最后,我们需要确保每个站点只包含一个消息
stationAlreadyRepresented := false
for i := 0; i < len(applicablePackets); i++ {
if applicablePackets[i].Sharecode == packetB.Sharecode {
stationAlreadyRepresented = true
}
}
if stationAlreadyRepresented == false {
applicablePackets = append(applicablePackets, packetB)
}
}
}
// 删除被认为太接近的站点
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
applicablePackets = cullPackets(applicablePackets)
}
// 只要有足够的数据包...
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
// 为此批次生成哈希...
hash := generateHashForPackets(applicablePackets)
batchIsUnique := true
for _, packet := range applicablePackets {
if packet.containsHash(hash) {
batchIsUnique = false
break
}
}
if batchIsUnique == true {
for _, packet := range applicablePackets {
packet.addHash(hash)
}
go sendOfDataForWork(applicablePackets)
}
}
}
}
}
英文:
I have a new TCP server written in Go that has 100+ clients attached to it. Each client streams in data that needs to be looked at centrally as they are looking at radio packets over the air waves from various locations which then get analysed. The code works but I am seeing a lot of contention and increased CPU around the locking and was after some thoughts on how to avoid the locking (if possible) or optimise around it.
As the TCP server spins up a GoRoutine for each packet received the addMessage
function needs a level of synchronisation. These packets also get analysed in another function later on that does a RLock()
on the map.
It is the cullMessages()
function that gets called once per second that really gets caught up in itself and can really slow down, sometimes taking 2-3 seconds to run which compounds the issue as the next 2-3 operations are queued waiting to unlock and run straight away!
Any ideas/thoughts would be appreciated!
var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1
// Function is called from each TCP client who need to share this data
func addMessage(trackingPacket *trackingPacket_v1) {
dataMessagesMutex.Lock()
dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
dataMessagesMutex.Unlock()
}
// Function called on a loop, need to delete based on age here
func cullMessages() {
cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)
dataMessagesMutex.Lock()
defer dataMessagesMutex.Unlock()
for avr, data := range dataMessages {
sort.Sort(PacketSorter(data))
highestIndex := 0
for i, messages := range data {
if cullTS.Sub(messages.ProcessedTime) > 0 {
// Need to delete the message here
messages = nil
highestIndex = i
}
}
// Copy the new slice into the data variable
data = data[highestIndex+1:]
if len(data) == 0 {
// Empty Messages, delete
delete(dataMessages, avr)
}
}
}
UPDATE:
Added analysis function
func processCandidates() {
mlatMessagesMutex.RLock()
defer dataMessagesMutex.RUnlock()
for _, data := range dataMessages {
numberOfMessages := len(data)
for a := 0; a < numberOfMessages; a++ {
packetA := data[a]
applicablePackets := []*trackingPacket_v1{packetA}
for b := 0; b < numberOfMessages; b++ {
// Don't compare identical packets
if b == a {
continue
}
packetB := data[b]
// Only consider this packet if it's within an acceptable
// timestamp threshold
tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)
if tsDelta < MAX_MESSAGE_TS_DIFF {
// Finally, we need to make sure that only one message per
// station is included in our batch
stationAlreadyRepresented := false
for i := 0; i < len(applicablePackets); i++ {
if applicablePackets[i].Sharecode == packetB.Sharecode {
stationAlreadyRepresented = true
}
}
if stationAlreadyRepresented == false {
applicablePackets = append(applicablePackets, packetB)
}
}
}
// Remove any stations which are deemed too close to one another
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
applicablePackets = cullPackets(applicablePackets)
}
// Provided we still have enough packets....
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
// Generate a hash for this batch...
hash := generateHashForPackets(applicablePackets)
batchIsUnique := true
for _, packet := range applicablePackets {
if packet.containsHash(hash) {
batchIsUnique = false
break
}
}
if batchIsUnique == true {
for _, packet := range applicablePackets {
packet.addHash(hash)
}
go sendOfDataForWork(applicablePackets)
}
}
}
}
}
答案1
得分: 1
不要回答我要翻译的问题。以下是要翻译的内容:
不要使用一个大的地图,而是为每个数据包ID创建一个goroutine。一个调度器goroutine可以拥有一个map[string]chan *trackingPacket_v1
,并将传入的数据包发送到相应的通道上。然后,该数据包ID的goroutine将把数据包收集到一个本地切片中,并在一定时间间隔内对其进行整理和分析。
你需要找到一种方法来终止那些在MODES_MAX_MESSAGE_AGE时间内没有接收到数据包的goroutine。可能调度器goroutine会跟踪每个数据包ID最近一次出现的时间,并定期检查是否有太旧的数据包ID。然后,它会关闭这些通道并从map中删除它们。当分析goroutine发现它的通道已关闭时,它会退出。
英文:
Instead of having one big map, have a goroutine for each packetID. A dispatcher goroutine could have a map[string]chan *trackingPacket_v1
, and send the incoming packets on the appropriate channel. Then the goroutine for that packetID would collect the packets into a local slice, and cull them and analyze them at intervals.
Somehow you would need to terminate the goroutines that haven't received a packet in MODES_MAX_MESSAGE_AGE. Probably the dispatcher goroutine would keep track of when each packetID was most recently seen, and periodically go through and check for ones that were too old. Then it would close those channels and remove them from its map. When the analysis goroutine discovered that its channel had been closed, it would exit.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论