Go并发切片访问

huangapple go评论89阅读模式
英文:

Go concurrent slice access

问题

我在Go中进行一些流处理,并且在尝试找出如何在“Go方式”下进行此操作时遇到了困难,而不使用锁。

这个虚构的例子展示了我面临的问题。

  • 我们一次获取一个thing
  • 有一个goroutine将它们缓冲到名为things的切片中。
  • things变满时,即len(things) == 100,它会被某种方式处理并重置。
  • things变满之前,有n个并发的goroutine需要访问things
  • 从其他goroutine访问“不完整”的things是不可预测的。
  • doSomethingWithPartialdoSomethingWithComplete都不需要改变things

代码:

var m sync.Mutex
var count int64
things := make([]int64, 0, 100)

// 不断生成和使用数据的切片
go func() {
  for {
    m.Lock()
    if len(things) == 100 {
      // doSomethingWithComplete不修改things
      doSomethingWithComplete(things)
      things = make([]int64, 0, 100)
    }
    things = append(things, count)
    m.Unlock()
    count++
  }
}()

// doSomethingWithPartial需要在它们准备好之前访问things
for {
  m.Lock()
  // doSomethingWithPartial不修改things
  doSomethingWithPartial(things)
  m.Unlock()
}
  1. 我知道切片是不可变的,那么我可以删除互斥锁并期望它仍然工作吗(我假设不行)。

  2. 我该如何重构代码以使用通道而不是互斥锁。

编辑: 这是我想出的不使用互斥锁的解决方案

package main

import (
	"fmt"
	"sync"
	"time"
)

func Incrementor() chan int {
	ch := make(chan int)
	go func() {
		count := 0
		for {
			ch <- count
			count++
		}
	}()
	return ch
}

type Foo struct {
	things   []int
	requests chan chan []int
	stream   chan int
	C        chan []int
}

func NewFoo() *Foo {
	foo := &Foo{
		things:   make([]int, 0, 100),
		requests: make(chan chan []int),
		stream:   Incrementor(),
		C:        make(chan []int),
	}
	go foo.Launch()
	return foo
}

func (f *Foo) Launch() {
	for {
		select {
		case ch := <-f.requests:
			ch <- f.things
		case thing := <-f.stream:
			if len(f.things) == 100 {
				f.C <- f.things
				f.things = make([]int, 0, 100)
			}
			f.things = append(f.things, thing)
		}
	}
}

func (f *Foo) Things() []int {
	ch := make(chan []int)
	f.requests <- ch
	return <-ch
}

func main() {

	foo := NewFoo()

	var wg sync.WaitGroup
	wg.Add(10)

	for i := 0; i < 10; i++ {
		go func(i int) {
			time.Sleep(time.Millisecond * time.Duration(i) * 100)
			things := foo.Things()
			fmt.Println("got things:", len(things))
			wg.Done()
		}(i)
	}

	go func() {
		for _ = range foo.C {
			// 对things做一些操作
		}
	}()

	wg.Wait()
}
英文:

I'm doing some stream processing in Go and got stuck trying to figure out how to do this the "Go way" without locks.

This contrived example shows the problem I'm facing.

  • We get one thing at a time.
  • There is a goroutine which buffers them into a slice called things.
  • When things becomes full len(things) == 100 then it is processed somehow and reset
  • There are n number of concurrent goroutines that need to access things before it's full
  • Access to the "incomplete" things from other goroutines is not predictable.
  • Neither doSomethingWithPartial nor doSomethingWithComplete needs to mutate things

Code:

var m sync.Mutex
var count int64
things := make([]int64, 0, 100)

// slices of data are constantly being generated and used
go func() {
  for {
    m.Lock()
    if len(things) == 100 {
      // doSomethingWithComplete does not modify things
      doSomethingWithComplete(things)
      things = make([]int64, 0, 100)
    }
    things = append(things, count)
    m.Unlock()
    count++
  }
}()

// doSomethingWithPartial needs to access the things before they&#39;re ready
for {
  m.Lock()
  // doSomethingWithPartial does not modify things
  doSomethingWithPartial(things)
  m.Unlock()
}
  1. <s>I know that slices are immutable so does that mean I can remove the mutex and expect it to still work (I assume no)</s>.

  2. How can I refactor this to use channels instead of a mutex.

Edit: Here's the solution I came up with that does not use a mutex

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func Incrementor() chan int {
	ch := make(chan int)
	go func() {
		count := 0
		for {
			ch &lt;- count
			count++
		}
	}()
	return ch
}

type Foo struct {
	things   []int
	requests chan chan []int
	stream   chan int
	C        chan []int
}

func NewFoo() *Foo {
	foo := &amp;Foo{
		things:   make([]int, 0, 100),
		requests: make(chan chan []int),
		stream:   Incrementor(),
		C:        make(chan []int),
	}
	go foo.Launch()
	return foo
}

func (f *Foo) Launch() {
	for {
		select {
		case ch := &lt;-f.requests:
			ch &lt;- f.things
		case thing := &lt;-f.stream:
			if len(f.things) == 100 {
				f.C &lt;- f.things
				f.things = make([]int, 0, 100)
			}
			f.things = append(f.things, thing)
		}
	}
}

func (f *Foo) Things() []int {
	ch := make(chan []int)
	f.requests &lt;- ch
	return &lt;-ch
}

func main() {

	foo := NewFoo()

	var wg sync.WaitGroup
	wg.Add(10)

	for i := 0; i &lt; 10; i++ {
		go func(i int) {
			time.Sleep(time.Millisecond * time.Duration(i) * 100)
			things := foo.Things()
			fmt.Println(&quot;got things:&quot;, len(things))
			wg.Done()
		}(i)
	}

	go func() {
		for _ = range foo.C {
			// do something with things
		}
	}()

	wg.Wait()
}

答案1

得分: 1

应该注意到,“Go way”可能只是使用互斥锁来解决这个问题。使用通道来解决这个问题可能更有趣,但是对于这个特定的问题来说,互斥锁可能更简单和更容易理解。

英文:

It should be noted that the "Go way" is probably just to use a mutex for this. It's fun to work out how to do it with a channel but a mutex is probably simpler and easier to reason about for this particular problem.

huangapple
  • 本文由 发表于 2013年4月19日 00:46:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/16088740.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定