英文:
Channel won't receive message sent from inside Go Routine
问题
我正在尝试更好地理解Go语言中的通道(channel)。
我希望有5个例程(routine)始终在运行。在第一个例程的特定时间点,我想尝试启动另一个例程。假设已经有5个例程在运行,我希望将下一个例程排队,并在其他例程中有一个完成后立即运行它。
我的逻辑是向生成器(spawner)发送一条消息,检查是否已经有5个进程在运行,如果是,则等待直到没有进程在运行,然后启动新进程。从我所看到的情况来看,p.complete <- struct{}{}并没有按预期工作,没有移除一个进程。在go例程之外,它的工作正常。
package main
import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)
type ProcessManager struct {
	spawner  chan struct{}
	complete chan struct{}
	process  int
}
func NewProcessManager() *ProcessManager {
	return &ProcessManager{
		spawner:  make(chan struct{}),
		complete: make(chan struct{}),
		process:  0,
	}
}
func (p *ProcessManager) Run(limit int) {
	for {
		select {
		case <-p.spawner:
			for {
				if p.process <= limit {
					fmt.Println("breaking for new process")
					break
				}
				time.Sleep(time.Second * 10)
			}
			p.process++
			go func() {
				fmt.Println("+ Starting goroutine")
				p.spawner <- struct{}{}
				time.Sleep(time.Second * 2)
				fmt.Println("- Stopping goroutine")
				p.complete <- struct{}{}
			}()
		case <-p.complete:
			fmt.Println("complete")
			p.process--
		}
	}
}
func main() {
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	pm := NewProcessManager()
	go pm.Run(5)
	pm.spawner <- struct{}{}
	<-interruptChannel
}
希望这可以帮助你解决问题。
英文:
I'm trying to get a better understanding of channel in GO.
Want 5 routines to be running at all times. At specific times during the first routine, I want to try to start another routine. Assuming 5 routines are already running, I want to queue up the next routine and run it as soon as one of the other routines has been completed.
My logic was call pass a message to spawner, check to see if there are 5 processes already running, if so, keep waiting until there isn't, and start up. From what I can tell, is is p.complete <- struct{}{} isn't working as expecting and removing a process. It works fine outside of the go routine.
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
type ProcessManager struct {
spawner  chan struct{}
complete chan struct{}
process  int
}
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner:  make(chan struct{}),
complete: make(chan struct{}),
process:  0,
}
}
func (p *ProcessManager) Run(limit int) {
for {
select {
case <-p.spawner:
for {
if p.process <= limit {
fmt.Println("breaking for new process")
break
}
time.Sleep(time.Second * 10)
}
p.process++
go func() {
fmt.Println("+ Starting goroutine")
p.spawner <- struct{}{}
time.Sleep(time.Second * 2)
fmt.Println("- Stopping goroutine")
p.complete <- struct{}{}
}()
case <-p.complete:
fmt.Println("complete")
p.process--
}
}
}
func main() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
pm := NewProcessManager()
go pm.Run(5)
pm.spawner <- struct{}{}
<-interruptChannel
}
答案1
得分: 1
成功解决了这个问题,通过将进程切换到 process: make(chan int, 4)。
然后,我使用这个来阻塞,而不是使用 for 循环 p.process <- 1,
然后使用这个来标记一个例程已完成 <-p.process。
通道的长度(4)将确定在任何给定时间允许运行的最大处理数量。下面是我提供的更新后的测试代码:
func NewProcessManager() *ProcessManager {
    return &ProcessManager{
        spawner:  make(chan struct{}),
        complete: make(chan struct{}),
        process:  make(chan int, 4),
    }
}
func (p *ProcessManager) Run() {
    for {
        select {
        case <-p.spawner:
            p.process <- 1
            go func() {
                fmt.Println("+ Starting goroutine")
                // 在启动下一个例程之前做一些事情
                p.spawner <- struct{}{}
                // 做剩下的事情
                fmt.Println("- Stopping goroutine")
                <-p.process
            }()
        }
    }
}
希望对你有帮助!
英文:
Managed to solve this by switching process to process: make(chan int, 4)
Then I just used this to block instead of the for loop p.process <- 1
and then use this to mark a routine as completed <-p.process
The length of the channel (4) will determine the max number of processed allowed to run at any given time. Updated test I provided below:
func NewProcessManager() *ProcessManager {
return &ProcessManager{
spawner:  make(chan struct{}),
complete: make(chan struct{}),
process:  make(chan int, 4),
}
}
func (p *ProcessManager) Run() {
for {
select {
case <-p.spawner:
p.process <- 1
go func() {
fmt.Println("+ Starting goroutine")
// do stuff before starting next routine
p.spawner <- struct{}{}
// do stuff rest of stuff
fmt.Println("- Stopping goroutine")
<-p.process
}()
}
}
答案2
得分: 0
ProcessManager的整个目的在我看来是为了对跟踪运行进程的元数据进行串行访问。因此,它是顺序运行的。
在代码的这个部分,goroutine正在睡眠。在睡眠期间,无法运行select语句中的完整case。
回想一下无缓冲通道的以下特性:
默认情况下,发送和接收操作会阻塞,直到另一侧准备好。
因为通道是无缓冲的,所以这行代码必须阻塞,直到触发完整的case:
p.complete <- struct{}{}
英文:
The whole purpose of the ProcessManager in my eyes is to serialize access to the metadata keeping track of running processes. As such, it runs sequential.
case <-p.spawner:
    for {
        if p.process <= limit {
            fmt.Println("breaking for new process")
            break
        }
        time.Sleep(time.Second * 10)
    }
In this section of code, the goroutine is sleeping. While sleeping, the complete case in the select statement cannot run.
Recall the following property of unbuffered channels:
> By default, sends and receives block until the other side is ready.
Because the channels are unbuffered, this line of code must block until that complete case is triggered:
p.complete <- struct{}{}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论