英文:
Channel won't receive message sent from inside Go Routine
问题
我正在尝试更好地理解Go语言中的通道(channel)。
我希望有5个例程(routine)始终在运行。在第一个例程的特定时间点,我想尝试启动另一个例程。假设已经有5个例程在运行,我希望将下一个例程排队,并在其他例程中有一个完成后立即运行它。
我的逻辑是向生成器(spawner)发送一条消息,检查是否已经有5个进程在运行,如果是,则等待直到没有进程在运行,然后启动新进程。从我所看到的情况来看,p.complete <- struct{}{}
并没有按预期工作,没有移除一个进程。在go例程之外,它的工作正常。
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
type ProcessManager struct {
spawner chan struct{}
complete chan struct{}
process int
}
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner: make(chan struct{}),
complete: make(chan struct{}),
process: 0,
}
}
func (p *ProcessManager) Run(limit int) {
for {
select {
case <-p.spawner:
for {
if p.process <= limit {
fmt.Println("breaking for new process")
break
}
time.Sleep(time.Second * 10)
}
p.process++
go func() {
fmt.Println("+ Starting goroutine")
p.spawner <- struct{}{}
time.Sleep(time.Second * 2)
fmt.Println("- Stopping goroutine")
p.complete <- struct{}{}
}()
case <-p.complete:
fmt.Println("complete")
p.process--
}
}
}
func main() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
pm := NewProcessManager()
go pm.Run(5)
pm.spawner <- struct{}{}
<-interruptChannel
}
希望这可以帮助你解决问题。
英文:
I'm trying to get a better understanding of channel in GO.
Want 5 routines to be running at all times. At specific times during the first routine, I want to try to start another routine. Assuming 5 routines are already running, I want to queue up the next routine and run it as soon as one of the other routines has been completed.
My logic was call pass a message to spawner, check to see if there are 5 processes already running, if so, keep waiting until there isn't, and start up. From what I can tell, is is p.complete <- struct{}{}
isn't working as expecting and removing a process. It works fine outside of the go routine.
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
type ProcessManager struct {
spawner chan struct{}
complete chan struct{}
process int
}
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner: make(chan struct{}),
complete: make(chan struct{}),
process: 0,
}
}
func (p *ProcessManager) Run(limit int) {
for {
select {
case <-p.spawner:
for {
if p.process <= limit {
fmt.Println("breaking for new process")
break
}
time.Sleep(time.Second * 10)
}
p.process++
go func() {
fmt.Println("+ Starting goroutine")
p.spawner <- struct{}{}
time.Sleep(time.Second * 2)
fmt.Println("- Stopping goroutine")
p.complete <- struct{}{}
}()
case <-p.complete:
fmt.Println("complete")
p.process--
}
}
}
func main() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
pm := NewProcessManager()
go pm.Run(5)
pm.spawner <- struct{}{}
<-interruptChannel
}
答案1
得分: 1
成功解决了这个问题,通过将进程切换到 process: make(chan int, 4)
。
然后,我使用这个来阻塞,而不是使用 for 循环 p.process <- 1
,
然后使用这个来标记一个例程已完成 <-p.process
。
通道的长度(4)将确定在任何给定时间允许运行的最大处理数量。下面是我提供的更新后的测试代码:
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner: make(chan struct{}),
complete: make(chan struct{}),
process: make(chan int, 4),
}
}
func (p *ProcessManager) Run() {
for {
select {
case <-p.spawner:
p.process <- 1
go func() {
fmt.Println("+ Starting goroutine")
// 在启动下一个例程之前做一些事情
p.spawner <- struct{}{}
// 做剩下的事情
fmt.Println("- Stopping goroutine")
<-p.process
}()
}
}
}
希望对你有帮助!
英文:
Managed to solve this by switching process to process: make(chan int, 4)
Then I just used this to block instead of the for loop p.process <- 1
and then use this to mark a routine as completed <-p.process
The length of the channel (4) will determine the max number of processed allowed to run at any given time. Updated test I provided below:
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner: make(chan struct{}),
complete: make(chan struct{}),
process: make(chan int, 4),
}
}
func (p *ProcessManager) Run() {
for {
select {
case <-p.spawner:
p.process <- 1
go func() {
fmt.Println("+ Starting goroutine")
// do stuff before starting next routine
p.spawner <- struct{}{}
// do stuff rest of stuff
fmt.Println("- Stopping goroutine")
<-p.process
}()
}
}
答案2
得分: 0
ProcessManager
的整个目的在我看来是为了对跟踪运行进程的元数据进行串行访问。因此,它是顺序运行的。
在代码的这个部分,goroutine正在睡眠。在睡眠期间,无法运行select语句中的完整case。
回想一下无缓冲通道的以下特性:
默认情况下,发送和接收操作会阻塞,直到另一侧准备好。
因为通道是无缓冲的,所以这行代码必须阻塞,直到触发完整的case:
p.complete <- struct{}{}
英文:
The whole purpose of the ProcessManager
in my eyes is to serialize access to the metadata keeping track of running processes. As such, it runs sequential.
case <-p.spawner:
for {
if p.process <= limit {
fmt.Println("breaking for new process")
break
}
time.Sleep(time.Second * 10)
}
In this section of code, the goroutine is sleeping. While sleeping, the complete case in the select statement cannot run.
Recall the following property of unbuffered channels:
> By default, sends and receives block until the other side is ready.
Because the channels are unbuffered, this line of code must block until that complete case is triggered:
p.complete <- struct{}{}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论