在Google Go中,共享资源与通道相关。

huangapple go评论102阅读模式
英文:

Shared resources with channels in google go

问题

我正在查看使用Google Go语言构建实时系统时,发现通过通道共享资源有些困惑。为了理解,我尝试让两个不同的goroutine对共享值进行相同次数的递增和递减操作,最终结果为0。我知道我的代码是错误的,但我还没有完全掌握它。有人可以解释一下这里有什么问题吗?

package main

import (
	. "fmt"
	. "runtime"
)

func increment(c chan int) {
	for x := 0; x < 10; x++ {
		a := <-c
		a++
		c <- a
	}
}

func decrement(c chan int) {
	for x := 0; x < 10; x++ {
		a := <-c
		a--
		c <- a
	}
}

func main() {
	GOMAXPROCS(NumCPU())
	c := make(chan int)
	go increment(c)
	go decrement(c)
	Println(<-c)
}

我可以使用互斥锁或信号量,类似于在CPython中所做的方式,但我想利用Go中的通道。

**更新

添加一个WaitGroup会改变程序的流程吗?我添加了一个WaitGroup,它运行得很好。不过,我在整个for循环之后添加了Done()函数,那么整个increment函数会在decrement函数之前运行吗?我希望它们能够“并行”运行,尽可能独立于彼此运行。

英文:

I am taking a look at the Google Go language as I am building a realtime system, and I find the sharing of resources through channels a bit confusing. I'm trying for the sake of understanding, to let to different goroutines to increment and decrement a shared value the same number of times, ending up at 0. I do know my code is wrong, but I'm not really getting the hang of it. Anybody care to explain what's wrong here?

package main

import (
	. &quot;fmt&quot;
	. &quot;runtime&quot;
)

func increment(c chan int) {
	for x := 0; x &lt; 10; x++ {
		a := &lt;-c
		a++
		c &lt;- a
	}
}

func decrement(c chan int) {
	for x := 0; x &lt; 10; x++ {
		a := &lt;-c
		a--
		c &lt;- a
	}
}

func main() {
	GOMAXPROCS(NumCPU())
	c := make(chan int)
	go increment(c)
	go decrement(c)
	Println(&lt;-c)
}

I could use a mutex or a semaphore similar to what I would do using C or Python, although I want to take advantage of the channels in Go.

> **UPDATE

Would adding a WaitGroup change the program flow? I added a WaitGroup, and it worked well. Although, I added the Done() function after the whole for loop, will then the whole increment run before decrement? I kind of want them to go 'in parallel' as far as it can, I know that only one routine can access I, but I want them to run independent of each other.

答案1

得分: 2

你的代码存在一些问题:

  1. 两个goroutine同时尝试从通道中读取数据。这会导致死锁,因为通道中没有数据可读取。

  2. Println(&lt;-c)只读取通道中的一个值,而不是一个结果。如果你等待两个goroutine都完成,它可能会读取到一个结果,但这需要添加一个WaitGroup。WaitGroup类似于信号量,允许每个goroutine递减一个待处理goroutine的计数器,并允许调用者等待它们完成某个任务。

  3. 由于发送操作会阻塞,如果没有读取者,而读取操作会阻塞,如果没有发送者,并且你首先等待两个goroutine完成,然后进行了一次多余的读取(Println读取),你需要一个带缓冲的通道,它在缓冲区中有一个额外的位置。

  4. 你需要在通道中推送一个初始值以启动进程。

我对你的代码进行了一些修改,现在这个示例可以工作了(请注意,它实际上不是增量->减量->增量->...,而是增量->增量->...->减量->减量->...,直到完成为止)。

package main

import (
	. "fmt"
	. "runtime"
	"sync"
)

func increment(c chan int, wg *sync.WaitGroup) {
	for x := 0; x < 10; x++ {
		a := <-c
		Println("增量读取 ", a)
		a++
		c <- a
	}
	Println("增量完成!")
	wg.Done()
}

func decrement(c chan int, wg *sync.WaitGroup) {
	for x := 0; x < 10; x++ {
		a := <-c
		Println("减量读取 ", a)
		a--
		c <- a
	}
	Println("减量完成!")
	wg.Done()
}

func main() {
	GOMAXPROCS(NumCPU())

	// 创建一个带有1个额外空间的带缓冲通道。这意味着如果没有读取者,你可以向其中发送一个额外的值,以便将最终结果推送到println中
	c := make(chan int, 1)

	// 创建一个等待组,以便在读取结果之前等待两个goroutine完成
	wg := sync.WaitGroup{}
	wg.Add(1) // 标记一个已启动
	go increment(c, &wg)
	wg.Add(1) // 标记另一个已启动。我们也可以使用Add(2)
	go decrement(c, &wg)

	// 现在我们将初始值推送到通道中,启动对话
	c <- 0

	// 让我们等待它们完成...
	wg.Wait()

	// 现在我们在通道的缓冲区中有了结果
	Println("总计:", <-c)
}

希望对你有帮助!

英文:

There are a few problems with your code:

  1. Both goroutines try to read from the channel at the same time. This is a deadlock as there is nothing in the channel to read.

  2. Println(&lt;-c) reads one value from the channel, not a result. It might read a result if you waited for both goroutines to finish, but that requires adding a WaitGroup. a Waitgroup is like a semaphore allowing each goroutine to decrement a counter of pending goroutines, and allowing the caller to wait for them to finish some task.

  3. Since sending blocks if there is no reader and reading is blocking if there's no sender, and you're a. waiting for both goroutines to finish first and b. doing one more read than writes (the Println read), you need a buffered channel, that has exactly one extra place in the buffer.

  4. You need to push an initial value in the channel for the process to start.

I've changed your code a bit and this example now works (although notice that it's not realy increment->decrement->increment->.... but rathter increment->increment->...->decrement->decrement->.... until we're done.

package main

import (
	. &quot;fmt&quot;
	. &quot;runtime&quot;
	&quot;sync&quot;
)

func increment(c chan int, wg *sync.WaitGroup) {
	for x := 0; x &lt; 10; x++ {
		a := &lt;-c
		Println(&quot;increment read &quot;, a)
		a++
		c &lt;- a
	}
	Println(&quot;Incrment done!&quot;)
	wg.Done()
}

func decrement(c chan int, wg *sync.WaitGroup) {
	for x := 0; x &lt; 10; x++ {
		a := &lt;-c
		Println(&quot;Decrement read &quot;, a)		
		a--
		c &lt;- a
	}
	Println(&quot;Dencrment done!&quot;)	
	wg.Done()
}

func main() {
	GOMAXPROCS(NumCPU())
	
	//we create a buffered channel with 1 extra space. This means 
	//you can send one extra value into it if there is no reader, allowing for the final result to be pushed to println
	c := make(chan int, 1)
	
	//we create a wait group so we can wait for both goroutines to finish before reading the result
	wg := sync.WaitGroup{}
	wg.Add(1) //mark one started
	go increment(c, &amp;wg)
	wg.Add(1) //mark another one started. We can just do Add(2) BTW
	go decrement(c, &amp;wg)
	
	//now we push the initial value to the channel, starting the dialog
	c &lt;- 0
	
	//let&#39;s wait for them to finish...
	wg.Wait()

	//now we have the result in the channel&#39;s buffer
	Println(&quot;Total: &quot;, &lt;-c )
}

答案2

得分: 1

这是一个完整的示例,展示了我认为你在谈论的共享状态引擎的类型。

请注意,在你的编辑中建议使用WaitGroup来同步两个通道。

PS:不要使用import . "fmt",这被认为是不好的实践。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// 引擎的命令
const (
	INC = iota
	DEC
	ANSWER
	QUIT
)

// 引擎接收命令并对一些共享状态进行操作
func engine(c chan int, reply chan int) {
	counter := 0
	for {
		switch <-c {
		case INC:
			counter++
		case DEC:
			counter--
		case ANSWER:
			reply <- counter
		case QUIT:
			reply <- counter
			return

		}
	}
}

// 增加n次,然后通过waitgroup信号完成
func increment(n int, c chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for x := 0; x < n; x++ {
		c <- INC
	}
}

// 减少n次,然后信号完成
func decrement(n int, c chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for x := 0; x < n; x++ {
		c <- DEC
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	// 启动引擎
	c := make(chan int)
	reply := make(chan int)
	go engine(c, reply)

	// 进行加法和减法,并等待它们完成
	wg := new(sync.WaitGroup)
	wg.Add(2)
	go increment(101, c, wg)
	go decrement(100, c, wg)
	wg.Wait()

	// 读取答案
	c <- ANSWER
	fmt.Printf("总数是 %d\n", <-reply)

	// 停止引擎
	c <- QUIT
	<-reply
	fmt.Printf("全部完成\n")
}

这里是一个完整的示例,展示了我认为你在谈论的共享状态引擎的类型。

请注意,在你的编辑中建议使用WaitGroup来同步两个通道。

PS:不要使用import . "fmt",这被认为是不好的实践。

英文:

Here is a complete example of the kind of shared state engine that I think you are talking about

Note use of WaitGroup as you suggested in your edit to synchronise the two channels.

PS don't use import . &quot;fmt&quot; it is considered to be bad practice.

package main
import (
&quot;fmt&quot;
&quot;runtime&quot;
&quot;sync&quot;
)
// Commands for the engine
const (
INC = iota
DEC
ANSWER
QUIT
)
// Engine which takes commands and acts on some shared state
func engine(c chan int, reply chan int) {
counter := 0
for {
switch &lt;-c {
case INC:
counter++
case DEC:
counter--
case ANSWER:
reply &lt;- counter
case QUIT:
reply &lt;- counter
return
}
}
}
// Add n times then signal done via the waitgroup
func increment(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x &lt; n; x++ {
c &lt;- INC
}
}
// Subtract n times then signal done
func decrement(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x &lt; n; x++ {
c &lt;- DEC
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
// Start the engine
c := make(chan int)
reply := make(chan int)
go engine(c, reply)
// Do adding and subtracting and wait for them to finish
wg := new(sync.WaitGroup)
wg.Add(2)
go increment(101, c, wg)
go decrement(100, c, wg)
wg.Wait()
// Read the answer
c &lt;- ANSWER
fmt.Printf(&quot;Total is %d\n&quot;, &lt;-reply)
// Stop the engine
c &lt;- QUIT
&lt;-reply
fmt.Printf(&quot;All done\n&quot;)
}

huangapple
  • 本文由 发表于 2014年1月17日 14:42:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/21179473.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定