英文:
Go concurrent slice access
问题
我在Go中进行一些流处理,并且在尝试找出如何在“Go方式”下进行此操作时遇到了困难,而不使用锁。
这个虚构的例子展示了我面临的问题。
- 我们一次获取一个
thing
。 - 有一个goroutine将它们缓冲到名为
things
的切片中。 - 当
things
变满时,即len(things) == 100
,它会被某种方式处理并重置。 - 在
things
变满之前,有n
个并发的goroutine需要访问things
。 - 从其他goroutine访问“不完整”的
things
是不可预测的。 doSomethingWithPartial
和doSomethingWithComplete
都不需要改变things
。
代码:
var m sync.Mutex
var count int64
things := make([]int64, 0, 100)
// 不断生成和使用数据的切片
go func() {
for {
m.Lock()
if len(things) == 100 {
// doSomethingWithComplete不修改things
doSomethingWithComplete(things)
things = make([]int64, 0, 100)
}
things = append(things, count)
m.Unlock()
count++
}
}()
// doSomethingWithPartial需要在它们准备好之前访问things
for {
m.Lock()
// doSomethingWithPartial不修改things
doSomethingWithPartial(things)
m.Unlock()
}
-
我知道切片是不可变的,那么我可以删除互斥锁并期望它仍然工作吗(我假设不行)。
-
我该如何重构代码以使用通道而不是互斥锁。
编辑: 这是我想出的不使用互斥锁的解决方案
package main
import (
"fmt"
"sync"
"time"
)
func Incrementor() chan int {
ch := make(chan int)
go func() {
count := 0
for {
ch <- count
count++
}
}()
return ch
}
type Foo struct {
things []int
requests chan chan []int
stream chan int
C chan []int
}
func NewFoo() *Foo {
foo := &Foo{
things: make([]int, 0, 100),
requests: make(chan chan []int),
stream: Incrementor(),
C: make(chan []int),
}
go foo.Launch()
return foo
}
func (f *Foo) Launch() {
for {
select {
case ch := <-f.requests:
ch <- f.things
case thing := <-f.stream:
if len(f.things) == 100 {
f.C <- f.things
f.things = make([]int, 0, 100)
}
f.things = append(f.things, thing)
}
}
}
func (f *Foo) Things() []int {
ch := make(chan []int)
f.requests <- ch
return <-ch
}
func main() {
foo := NewFoo()
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Millisecond * time.Duration(i) * 100)
things := foo.Things()
fmt.Println("got things:", len(things))
wg.Done()
}(i)
}
go func() {
for _ = range foo.C {
// 对things做一些操作
}
}()
wg.Wait()
}
英文:
I'm doing some stream processing in Go and got stuck trying to figure out how to do this the "Go way" without locks.
This contrived example shows the problem I'm facing.
- We get one
thing
at a time. - There is a goroutine which buffers them into a slice called
things
. - When
things
becomes fulllen(things) == 100
then it is processed somehow and reset - There are
n
number of concurrent goroutines that need to accessthings
before it's full - Access to the "incomplete"
things
from other goroutines is not predictable. - Neither
doSomethingWithPartial
nordoSomethingWithComplete
needs to mutatethings
Code:
var m sync.Mutex
var count int64
things := make([]int64, 0, 100)
// slices of data are constantly being generated and used
go func() {
for {
m.Lock()
if len(things) == 100 {
// doSomethingWithComplete does not modify things
doSomethingWithComplete(things)
things = make([]int64, 0, 100)
}
things = append(things, count)
m.Unlock()
count++
}
}()
// doSomethingWithPartial needs to access the things before they're ready
for {
m.Lock()
// doSomethingWithPartial does not modify things
doSomethingWithPartial(things)
m.Unlock()
}
-
<s>I know that slices are immutable so does that mean I can remove the mutex and expect it to still work (I assume no)</s>.
-
How can I refactor this to use channels instead of a mutex.
Edit: Here's the solution I came up with that does not use a mutex
package main
import (
"fmt"
"sync"
"time"
)
func Incrementor() chan int {
ch := make(chan int)
go func() {
count := 0
for {
ch <- count
count++
}
}()
return ch
}
type Foo struct {
things []int
requests chan chan []int
stream chan int
C chan []int
}
func NewFoo() *Foo {
foo := &Foo{
things: make([]int, 0, 100),
requests: make(chan chan []int),
stream: Incrementor(),
C: make(chan []int),
}
go foo.Launch()
return foo
}
func (f *Foo) Launch() {
for {
select {
case ch := <-f.requests:
ch <- f.things
case thing := <-f.stream:
if len(f.things) == 100 {
f.C <- f.things
f.things = make([]int, 0, 100)
}
f.things = append(f.things, thing)
}
}
}
func (f *Foo) Things() []int {
ch := make(chan []int)
f.requests <- ch
return <-ch
}
func main() {
foo := NewFoo()
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Millisecond * time.Duration(i) * 100)
things := foo.Things()
fmt.Println("got things:", len(things))
wg.Done()
}(i)
}
go func() {
for _ = range foo.C {
// do something with things
}
}()
wg.Wait()
}
答案1
得分: 1
应该注意到,“Go way”可能只是使用互斥锁来解决这个问题。使用通道来解决这个问题可能更有趣,但是对于这个特定的问题来说,互斥锁可能更简单和更容易理解。
英文:
It should be noted that the "Go way" is probably just to use a mutex for this. It's fun to work out how to do it with a channel but a mutex is probably simpler and easier to reason about for this particular problem.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论