英文:
Shared resources with channels in google go
问题
我正在查看使用Google Go
语言构建实时系统时,发现通过通道共享资源有些困惑。为了理解,我尝试让两个不同的goroutine
对共享值进行相同次数的递增和递减操作,最终结果为0。我知道我的代码是错误的,但我还没有完全掌握它。有人可以解释一下这里有什么问题吗?
package main
import (
. "fmt"
. "runtime"
)
func increment(c chan int) {
for x := 0; x < 10; x++ {
a := <-c
a++
c <- a
}
}
func decrement(c chan int) {
for x := 0; x < 10; x++ {
a := <-c
a--
c <- a
}
}
func main() {
GOMAXPROCS(NumCPU())
c := make(chan int)
go increment(c)
go decrement(c)
Println(<-c)
}
我可以使用互斥锁或信号量,类似于在C
或Python
中所做的方式,但我想利用Go
中的通道。
**更新
添加一个WaitGroup
会改变程序的流程吗?我添加了一个WaitGroup
,它运行得很好。不过,我在整个for
循环之后添加了Done()
函数,那么整个increment
函数会在decrement
函数之前运行吗?我希望它们能够“并行”运行,尽可能独立于彼此运行。
英文:
I am taking a look at the Google Go
language as I am building a realtime system, and I find the sharing of resources through channels a bit confusing. I'm trying for the sake of understanding, to let to different goroutines
to increment and decrement a shared value the same number of times, ending up at 0. I do know my code is wrong, but I'm not really getting the hang of it. Anybody care to explain what's wrong here?
package main
import (
. "fmt"
. "runtime"
)
func increment(c chan int) {
for x := 0; x < 10; x++ {
a := <-c
a++
c <- a
}
}
func decrement(c chan int) {
for x := 0; x < 10; x++ {
a := <-c
a--
c <- a
}
}
func main() {
GOMAXPROCS(NumCPU())
c := make(chan int)
go increment(c)
go decrement(c)
Println(<-c)
}
I could use a mutex or a semaphore similar to what I would do using C
or Python
, although I want to take advantage of the channels in Go
.
> **UPDATE
Would adding a WaitGroup
change the program flow? I added a WaitGroup
, and it worked well. Although, I added the Done()
function after the whole for loop, will then the whole increment
run before decrement
? I kind of want them to go 'in parallel' as far as it can, I know that only one routine can access I, but I want them to run independent of each other.
答案1
得分: 2
你的代码存在一些问题:
-
两个goroutine同时尝试从通道中读取数据。这会导致死锁,因为通道中没有数据可读取。
-
Println(<-c)
只读取通道中的一个值,而不是一个结果。如果你等待两个goroutine都完成,它可能会读取到一个结果,但这需要添加一个WaitGroup
。WaitGroup类似于信号量,允许每个goroutine递减一个待处理goroutine的计数器,并允许调用者等待它们完成某个任务。 -
由于发送操作会阻塞,如果没有读取者,而读取操作会阻塞,如果没有发送者,并且你首先等待两个goroutine完成,然后进行了一次多余的读取(Println读取),你需要一个
带缓冲的通道
,它在缓冲区中有一个额外的位置。 -
你需要在通道中推送一个初始值以启动进程。
我对你的代码进行了一些修改,现在这个示例可以工作了(请注意,它实际上不是增量->减量->增量->...,而是增量->增量->...->减量->减量->...,直到完成为止)。
package main
import (
. "fmt"
. "runtime"
"sync"
)
func increment(c chan int, wg *sync.WaitGroup) {
for x := 0; x < 10; x++ {
a := <-c
Println("增量读取 ", a)
a++
c <- a
}
Println("增量完成!")
wg.Done()
}
func decrement(c chan int, wg *sync.WaitGroup) {
for x := 0; x < 10; x++ {
a := <-c
Println("减量读取 ", a)
a--
c <- a
}
Println("减量完成!")
wg.Done()
}
func main() {
GOMAXPROCS(NumCPU())
// 创建一个带有1个额外空间的带缓冲通道。这意味着如果没有读取者,你可以向其中发送一个额外的值,以便将最终结果推送到println中
c := make(chan int, 1)
// 创建一个等待组,以便在读取结果之前等待两个goroutine完成
wg := sync.WaitGroup{}
wg.Add(1) // 标记一个已启动
go increment(c, &wg)
wg.Add(1) // 标记另一个已启动。我们也可以使用Add(2)
go decrement(c, &wg)
// 现在我们将初始值推送到通道中,启动对话
c <- 0
// 让我们等待它们完成...
wg.Wait()
// 现在我们在通道的缓冲区中有了结果
Println("总计:", <-c)
}
希望对你有帮助!
英文:
There are a few problems with your code:
-
Both goroutines try to read from the channel at the same time. This is a deadlock as there is nothing in the channel to read.
-
Println(<-c)
reads one value from the channel, not a result. It might read a result if you waited for both goroutines to finish, but that requires adding aWaitGroup
. a Waitgroup is like a semaphore allowing each goroutine to decrement a counter of pending goroutines, and allowing the caller to wait for them to finish some task. -
Since sending blocks if there is no reader and reading is blocking if there's no sender, and you're a. waiting for both goroutines to finish first and b. doing one more read than writes (the Println read), you need a
buffered channel
, that has exactly one extra place in the buffer. -
You need to push an initial value in the channel for the process to start.
I've changed your code a bit and this example now works (although notice that it's not realy increment->decrement->increment->.... but rathter increment->increment->...->decrement->decrement->.... until we're done.
package main
import (
. "fmt"
. "runtime"
"sync"
)
func increment(c chan int, wg *sync.WaitGroup) {
for x := 0; x < 10; x++ {
a := <-c
Println("increment read ", a)
a++
c <- a
}
Println("Incrment done!")
wg.Done()
}
func decrement(c chan int, wg *sync.WaitGroup) {
for x := 0; x < 10; x++ {
a := <-c
Println("Decrement read ", a)
a--
c <- a
}
Println("Dencrment done!")
wg.Done()
}
func main() {
GOMAXPROCS(NumCPU())
//we create a buffered channel with 1 extra space. This means
//you can send one extra value into it if there is no reader, allowing for the final result to be pushed to println
c := make(chan int, 1)
//we create a wait group so we can wait for both goroutines to finish before reading the result
wg := sync.WaitGroup{}
wg.Add(1) //mark one started
go increment(c, &wg)
wg.Add(1) //mark another one started. We can just do Add(2) BTW
go decrement(c, &wg)
//now we push the initial value to the channel, starting the dialog
c <- 0
//let's wait for them to finish...
wg.Wait()
//now we have the result in the channel's buffer
Println("Total: ", <-c )
}
答案2
得分: 1
这是一个完整的示例,展示了我认为你在谈论的共享状态引擎的类型。
请注意,在你的编辑中建议使用WaitGroup
来同步两个通道。
PS:不要使用import . "fmt"
,这被认为是不好的实践。
package main
import (
"fmt"
"runtime"
"sync"
)
// 引擎的命令
const (
INC = iota
DEC
ANSWER
QUIT
)
// 引擎接收命令并对一些共享状态进行操作
func engine(c chan int, reply chan int) {
counter := 0
for {
switch <-c {
case INC:
counter++
case DEC:
counter--
case ANSWER:
reply <- counter
case QUIT:
reply <- counter
return
}
}
}
// 增加n次,然后通过waitgroup信号完成
func increment(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x < n; x++ {
c <- INC
}
}
// 减少n次,然后信号完成
func decrement(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x < n; x++ {
c <- DEC
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
// 启动引擎
c := make(chan int)
reply := make(chan int)
go engine(c, reply)
// 进行加法和减法,并等待它们完成
wg := new(sync.WaitGroup)
wg.Add(2)
go increment(101, c, wg)
go decrement(100, c, wg)
wg.Wait()
// 读取答案
c <- ANSWER
fmt.Printf("总数是 %d\n", <-reply)
// 停止引擎
c <- QUIT
<-reply
fmt.Printf("全部完成\n")
}
这里是一个完整的示例,展示了我认为你在谈论的共享状态引擎的类型。
请注意,在你的编辑中建议使用WaitGroup
来同步两个通道。
PS:不要使用import . "fmt"
,这被认为是不好的实践。
英文:
Here is a complete example of the kind of shared state engine that I think you are talking about
Note use of WaitGroup
as you suggested in your edit to synchronise the two channels.
PS don't use import . "fmt"
it is considered to be bad practice.
package main
import (
"fmt"
"runtime"
"sync"
)
// Commands for the engine
const (
INC = iota
DEC
ANSWER
QUIT
)
// Engine which takes commands and acts on some shared state
func engine(c chan int, reply chan int) {
counter := 0
for {
switch <-c {
case INC:
counter++
case DEC:
counter--
case ANSWER:
reply <- counter
case QUIT:
reply <- counter
return
}
}
}
// Add n times then signal done via the waitgroup
func increment(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x < n; x++ {
c <- INC
}
}
// Subtract n times then signal done
func decrement(n int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := 0; x < n; x++ {
c <- DEC
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
// Start the engine
c := make(chan int)
reply := make(chan int)
go engine(c, reply)
// Do adding and subtracting and wait for them to finish
wg := new(sync.WaitGroup)
wg.Add(2)
go increment(101, c, wg)
go decrement(100, c, wg)
wg.Wait()
// Read the answer
c <- ANSWER
fmt.Printf("Total is %d\n", <-reply)
// Stop the engine
c <- QUIT
<-reply
fmt.Printf("All done\n")
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论