Golang concurrency access to slice
Question
My use case:

- append items (small structs) to a slice in the main process
- every 100 items, I want to process them in a processor goroutine (then pop them from the slice)
- items come in very fast, continuously

I read that if at least two goroutines use a variable (a slice in my case) and at least one of them writes to it, the concurrent access must be handled (with a mutex or similar).

My questions:

- If I do not protect the reads/writes on the slice with a mutex, do I risk problems? (i.e. item 101 arrives while the processor is working on items 1-100)
- What is the best concurrency technique to keep the incoming item flow "fluent"?

Disclaimer:

- I do not want any event queueing; I need to process items in a given "bundle" size.
Answer 1
Score: 3
Actually, you don't need a lock here: the buffer is only ever touched by the main goroutine, and each processor goroutine works on its own copy of the data. Here is working code:
package main

import (
	"fmt"
	"sync"
)

type myStruct struct {
	Cpt int
}

func main() {
	buf := make([]myStruct, 0, 100)
	wg := sync.WaitGroup{}

	// Main process
	// Appending ten million times
	for i := 0; i < 10e6; i++ {
		// Appending to the buffer (only the main goroutine touches it)
		buf = append(buf, myStruct{Cpt: i})

		// Did we reach 100 items?
		if len(buf) >= 100 {
			// Yes we did. Creating a slice by copying from the buffer
			processSlice := make([]myStruct, 100)
			copy(processSlice, buf[0:100])

			// Emptying the buffer
			buf = buf[:0]

			// Running the processor in parallel
			// Adding one element to the waitgroup
			wg.Add(1)
			go processor(&wg, processSlice)
		}
	}

	// Waiting for all processors to finish
	wg.Wait()
}

func processor(wg *sync.WaitGroup, processSlice []myStruct) {
	// Removing one element from the waitgroup when done
	defer wg.Done()

	// Doing some processing
	fmt.Printf("Processing items from %d to %d\n", processSlice[0].Cpt, processSlice[99].Cpt)
}
A few notes about your problem and this solution:

- If you want minimal stop time in your feeding process (e.g. to respond as fast as possible to an HTTP call), then the minimal thing to do synchronously is just the copy part, and run the processor function in a goroutine. By doing so, you have to create a new process slice dynamically and copy the content of your buffer into it (see the sketch after this list for what happens if you skip the copy).
- The sync.WaitGroup object is needed to ensure that the last processor function has ended before exiting the program.
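The copy is what makes the pattern safe. As an illustration (this broken variant is not from the original answer), here is a minimal sketch of the same loop with the copy skipped: the process slice is a re-slice of the buffer, so the processor and the feeder share the same backing array, and running it with go run -race reports a data race.

package main

import (
	"fmt"
	"sync"
)

type myStruct struct {
	Cpt int
}

func main() {
	buf := make([]myStruct, 0, 100)
	wg := sync.WaitGroup{}

	for i := 0; i < 1000; i++ {
		buf = append(buf, myStruct{Cpt: i})
		if len(buf) >= 100 {
			// BUG (for illustration): re-slicing instead of copying.
			// processSlice shares its backing array with buf, so the
			// appends below write into memory the processor still reads.
			processSlice := buf[0:100]
			buf = buf[:0]

			wg.Add(1)
			go func(s []myStruct) {
				defer wg.Done()
				fmt.Printf("Processing items from %d to %d\n", s[0].Cpt, s[99].Cpt)
			}(processSlice)
		}
	}
	wg.Wait()
}

Making a fresh slice and copying into it, as in the answer's code, removes the shared memory and with it the race.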
Note that this is not a perfect solution: if you run this pattern for a long time and the input data arrives more than 100 times faster than the processor can handle a slice, then:

- more and more processSlice instances accumulate in RAM -> you risk filling the RAM and hitting swap
- more and more processor goroutines run in parallel -> the same RAM risk, plus more work competing for CPU at the same time, which makes each call slower and lets the problem feed on itself.

This will end up in the system crashing at some point.
The solution for this is a limited number of workers, which ensures there is no crash. However, when all of the workers are busy, the feeding process will have to wait, which does not match what you asked for. It is nevertheless a good solution for absorbing a load whose intensity changes over time; a sketch follows.
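For illustration, here is one possible shape of that bounded-worker approach (this sketch is not from the original answer; numWorkers and queueDepth are arbitrary placeholder values): a fixed pool of goroutines drains bundles from a buffered channel, so when every worker is busy the feeder blocks on the send instead of spawning unbounded goroutines.

package main

import (
	"fmt"
	"sync"
)

type myStruct struct {
	Cpt int
}

// worker drains bundles from the channel; a fixed number of these
// goroutines bounds both memory use and parallelism.
func worker(wg *sync.WaitGroup, bundles <-chan []myStruct) {
	defer wg.Done()
	for bundle := range bundles {
		fmt.Printf("Processing items from %d to %d\n", bundle[0].Cpt, bundle[len(bundle)-1].Cpt)
	}
}

func main() {
	const numWorkers = 4 // illustrative value
	const queueDepth = 8 // how many pending bundles to tolerate before blocking the feeder

	bundles := make(chan []myStruct, queueDepth)
	wg := sync.WaitGroup{}

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go worker(&wg, bundles)
	}

	buf := make([]myStruct, 0, 100)
	for i := 0; i < 1000000; i++ {
		buf = append(buf, myStruct{Cpt: i})
		if len(buf) >= 100 {
			bundle := make([]myStruct, 100)
			copy(bundle, buf[0:100])
			buf = buf[:0]
			// If all workers are busy and the channel is full, this send
			// blocks: the feeder slows down instead of the process crashing.
			bundles <- bundle
		}
	}

	close(bundles)
	wg.Wait()
}

The blocking send is exactly the "slow down input acquisition" behaviour described below: the channel capacity absorbs short bursts, and a sustained overload turns into backpressure on the feeder rather than a crash.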
In general, just remember that if you feed in more data than you can process in the same amount of time, your program will eventually reach a limit where it cannot keep up, so it has to slow down input acquisition or crash. This is mathematical!