通道无法接收来自Go协程内部发送的消息。

huangapple go评论71阅读模式
英文:

Channel won't receive message sent from inside Go Routine

问题

我正在尝试更好地理解Go语言中的通道(channel)。

我希望有5个例程(routine)始终在运行。在第一个例程的特定时间点,我想尝试启动另一个例程。假设已经有5个例程在运行,我希望将下一个例程排队,并在其他例程中有一个完成后立即运行它。

我的逻辑是向生成器(spawner)发送一条消息,检查是否已经有5个进程在运行,如果是,则等待直到没有进程在运行,然后启动新进程。从我所看到的情况来看,p.complete <- struct{}{}并没有按预期工作,没有移除一个进程。在go例程之外,它的工作正常。

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

type ProcessManager struct {
	spawner  chan struct{}
	complete chan struct{}
	process  int
}

func NewProcessManager() *ProcessManager {
	return &ProcessManager{
		spawner:  make(chan struct{}),
		complete: make(chan struct{}),
		process:  0,
	}
}

func (p *ProcessManager) Run(limit int) {
	for {
		select {
		case <-p.spawner:
			for {
				if p.process <= limit {
					fmt.Println("breaking for new process")
					break
				}
				time.Sleep(time.Second * 10)
			}
			p.process++

			go func() {
				fmt.Println("+ Starting goroutine")
				p.spawner <- struct{}{}
				time.Sleep(time.Second * 2)
				fmt.Println("- Stopping goroutine")
				p.complete <- struct{}{}

			}()

		case <-p.complete:
			fmt.Println("complete")
			p.process--
		}
	}
}

func main() {
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	pm := NewProcessManager()
	go pm.Run(5)
	pm.spawner <- struct{}{}
	<-interruptChannel
}

希望这可以帮助你解决问题。

英文:

I'm trying to get a better understanding of channel in GO.

Want 5 routines to be running at all times. At specific times during the first routine, I want to try to start another routine. Assuming 5 routines are already running, I want to queue up the next routine and run it as soon as one of the other routines has been completed.

My logic was call pass a message to spawner, check to see if there are 5 processes already running, if so, keep waiting until there isn't, and start up. From what I can tell, is is p.complete &lt;- struct{}{} isn't working as expecting and removing a process. It works fine outside of the go routine.

package main
import (
&quot;fmt&quot;
&quot;os&quot;
&quot;os/signal&quot;
&quot;syscall&quot;
&quot;time&quot;
)
type ProcessManager struct {
spawner  chan struct{}
complete chan struct{}
process  int
}
func NewProcessManager() *ProcessManager {
return &amp;ProcessManager{
spawner:  make(chan struct{}),
complete: make(chan struct{}),
process:  0,
}
}
func (p *ProcessManager) Run(limit int) {
for {
select {
case &lt;-p.spawner:
for {
if p.process &lt;= limit {
fmt.Println(&quot;breaking for new process&quot;)
break
}
time.Sleep(time.Second * 10)
}
p.process++
go func() {
fmt.Println(&quot;+ Starting goroutine&quot;)
p.spawner &lt;- struct{}{}
time.Sleep(time.Second * 2)
fmt.Println(&quot;- Stopping goroutine&quot;)
p.complete &lt;- struct{}{}
}()
case &lt;-p.complete:
fmt.Println(&quot;complete&quot;)
p.process--
}
}
}
func main() {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
pm := NewProcessManager()
go pm.Run(5)
pm.spawner &lt;- struct{}{}
&lt;-interruptChannel
}

答案1

得分: 1

成功解决了这个问题,通过将进程切换到 process: make(chan int, 4)

然后,我使用这个来阻塞,而不是使用 for 循环 p.process <- 1
然后使用这个来标记一个例程已完成 <-p.process

通道的长度(4)将确定在任何给定时间允许运行的最大处理数量。下面是我提供的更新后的测试代码:

func NewProcessManager() *ProcessManager {
    return &ProcessManager{
        spawner:  make(chan struct{}),
        complete: make(chan struct{}),
        process:  make(chan int, 4),
    }
}

func (p *ProcessManager) Run() {
    for {
        select {
        case <-p.spawner:
            p.process <- 1
            go func() {
                fmt.Println("+ Starting goroutine")
                // 在启动下一个例程之前做一些事情
                p.spawner <- struct{}{}
                // 做剩下的事情
                fmt.Println("- Stopping goroutine")
                <-p.process
            }()
        }
    }
}

希望对你有帮助!

英文:

Managed to solve this by switching process to process: make(chan int, 4)

Then I just used this to block instead of the for loop p.process &lt;- 1
and then use this to mark a routine as completed &lt;-p.process

The length of the channel (4) will determine the max number of processed allowed to run at any given time. Updated test I provided below:

func NewProcessManager() *ProcessManager {
return &amp;ProcessManager{
spawner:  make(chan struct{}),
complete: make(chan struct{}),
process:  make(chan int, 4),
}
}
func (p *ProcessManager) Run() {
for {
select {
case &lt;-p.spawner:
p.process &lt;- 1
go func() {
fmt.Println(&quot;+ Starting goroutine&quot;)
// do stuff before starting next routine
p.spawner &lt;- struct{}{}
// do stuff rest of stuff
fmt.Println(&quot;- Stopping goroutine&quot;)
&lt;-p.process
}()
}
}

答案2

得分: 0

ProcessManager的整个目的在我看来是为了对跟踪运行进程的元数据进行串行访问。因此,它是顺序运行的。

在代码的这个部分,goroutine正在睡眠。在睡眠期间,无法运行select语句中的完整case。

回想一下无缓冲通道的以下特性:

默认情况下,发送和接收操作会阻塞,直到另一侧准备好。

因为通道是无缓冲的,所以这行代码必须阻塞,直到触发完整的case:

p.complete <- struct{}{}
英文:

The whole purpose of the ProcessManager in my eyes is to serialize access to the metadata keeping track of running processes. As such, it runs sequential.

case &lt;-p.spawner:
    for {
        if p.process &lt;= limit {
            fmt.Println(&quot;breaking for new process&quot;)
            break
        }
        time.Sleep(time.Second * 10)
    }

In this section of code, the goroutine is sleeping. While sleeping, the complete case in the select statement cannot run.

Recall the following property of unbuffered channels:

> By default, sends and receives block until the other side is ready.

Because the channels are unbuffered, this line of code must block until that complete case is triggered:

p.complete &lt;- struct{}{}

huangapple
  • 本文由 发表于 2022年1月9日 13:44:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/70638747.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定