英文:
Is there some elegant way to pause and resume any other goroutine?
问题
在我的情况下,我有成千上万个同时工作的goroutine作为work()
。我还有一个sync()
goroutine。当sync
开始时,我需要任何其他goroutine在同步工作完成后暂停一段时间。这是我的代码:
var channels []chan int
var channels_mutex sync.Mutex
func work() {
channel := make(chan int, 1)
channels_mutex.Lock()
channels = append(channels, channel)
channels_mutex.Unlock()
for {
for {
sync_stat := <- channel // 在这里阻塞
if sync_stat == 0 { // 如果同步完成
break
}
}
// 做一些工作
if (一些条件) {
return
}
}
}
func sync() {
channels_mutex.Lock()
// 做一些同步工作
for int i := 0; i != len(channels); i++ {
channels[i] <- 0
}
channels_mutex.Unlock()
}
现在的问题是,由于<-
在读取时总是阻塞的,每次进入sync_stat := <- channel
都会被阻塞。我知道如果通道被关闭,它就不会被阻塞,但是由于我必须在work()
退出之前使用这个通道,并且我没有找到重新打开关闭的通道的方法。
我怀疑自己走错了路,所以任何帮助都将不胜感激。有没有一种“优雅”的方法来暂停和恢复任何其他goroutine?
英文:
In my case, I have thousands of goroutines working simultaneously as work()
. I also had a sync()
goroutine. When sync
starts, I need any other goroutine to pause for a while after sync job is done. Here is my code:
var channels []chan int
var channels_mutex sync.Mutex
func work() {
channel := make(chan int, 1)
channels_mutex.Lock()
channels = append(channels, channel)
channels_mutex.Unlock()
for {
for {
sync_stat := <- channel // blocked here
if sync_stat == 0 { // if sync complete
break
}
}
// Do some jobs
if (some condition) {
return
}
}
}
func sync() {
channels_mutex.Lock()
// do some sync
for int i := 0; i != len(channels); i++ {
channels[i] <- 0
}
channels_mutex.Unlock()
}
Now the problem is, since <-
is always blocking on read, every time goes to sync_stat := <- channel
is blocking. I know if the channel was closed it won't be blocked, but since I have to use this channel until work()
exits, and I didn't find any way to reopen a closed channel.
I suspect myself on a wrong way, so any help is appreciated. Is there some "elegant" way to pause and resume any other goroutine?
答案1
得分: 27
如果我理解正确,您想要N个工作者和一个控制器,可以随意暂停、恢复和停止工作者。以下代码将实现这一功能。
package main
import (
"fmt"
"runtime"
"sync"
)
// 可能的工作者状态。
const (
Stopped = 0
Paused = 1
Running = 2
)
// 最大工作者数量。
const WorkerCount = 1000
func main() {
// 启动工作者。
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int, 1)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// 启动控制器例程。
go func() {
controller(workers)
wg.Done()
}()
// 等待所有goroutine完成。
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // 开始处于暂停状态。
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// 我们使用runtime.Gosched()来防止在这种情况下发生死锁。
// 如果在这里执行了工作并且让出给调度器,则不需要它。
runtime.Gosched()
if state == Paused {
break
}
// 在这里执行实际的工作。
}
}
}
// controller处理所有工作者的当前状态。它们可以被指示为运行、暂停或完全停止。
func controller(workers []chan int) {
// 启动工作者
setState(workers, Running)
// 暂停工作者。
setState(workers, Paused)
// 恢复工作者。
setState(workers, Running)
// 关闭工作者。
setState(workers, Stopped)
}
// setState更改所有给定工作者的状态。
func setState(workers []chan int, state int) {
for _, w := range workers {
w <- state
}
}
英文:
If I understand you correctly, you want N number of workers and one controller, which can pause, resume and stop the workers at will. The following code will do just that.
package main
import (
"fmt"
"runtime"
"sync"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int, 1)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
setState(workers, Running)
// Pause workers.
setState(workers, Paused)
// Unpause workers.
setState(workers, Running)
// Shutdown workers.
setState(workers, Stopped)
}
// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
for _, w := range workers {
w <- state
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论