有没有一种优雅的方法来暂停和恢复其他的goroutine?

huangapple go评论110阅读模式
英文:

Is there some elegant way to pause and resume any other goroutine?

问题

在我的情况下,我有成千上万个同时工作的goroutine作为work()。我还有一个sync() goroutine。当sync开始时,我需要任何其他goroutine在同步工作完成后暂停一段时间。这是我的代码:

  1. var channels []chan int
  2. var channels_mutex sync.Mutex
  3. func work() {
  4. channel := make(chan int, 1)
  5. channels_mutex.Lock()
  6. channels = append(channels, channel)
  7. channels_mutex.Unlock()
  8. for {
  9. for {
  10. sync_stat := <- channel // 在这里阻塞
  11. if sync_stat == 0 { // 如果同步完成
  12. break
  13. }
  14. }
  15. // 做一些工作
  16. if (一些条件) {
  17. return
  18. }
  19. }
  20. }
  21. func sync() {
  22. channels_mutex.Lock()
  23. // 做一些同步工作
  24. for int i := 0; i != len(channels); i++ {
  25. channels[i] <- 0
  26. }
  27. channels_mutex.Unlock()
  28. }

现在的问题是,由于<-在读取时总是阻塞的,每次进入sync_stat := <- channel都会被阻塞。我知道如果通道被关闭,它就不会被阻塞,但是由于我必须在work()退出之前使用这个通道,并且我没有找到重新打开关闭的通道的方法。

我怀疑自己走错了路,所以任何帮助都将不胜感激。有没有一种“优雅”的方法来暂停和恢复任何其他goroutine?

英文:

In my case, I have thousands of goroutines working simultaneously as work(). I also had a sync() goroutine. When sync starts, I need any other goroutine to pause for a while after sync job is done. Here is my code:

  1. var channels []chan int
  2. var channels_mutex sync.Mutex
  3. func work() {
  4. channel := make(chan int, 1)
  5. channels_mutex.Lock()
  6. channels = append(channels, channel)
  7. channels_mutex.Unlock()
  8. for {
  9. for {
  10. sync_stat := &lt;- channel // blocked here
  11. if sync_stat == 0 { // if sync complete
  12. break
  13. }
  14. }
  15. // Do some jobs
  16. if (some condition) {
  17. return
  18. }
  19. }
  20. }
  21. func sync() {
  22. channels_mutex.Lock()
  23. // do some sync
  24. for int i := 0; i != len(channels); i++ {
  25. channels[i] &lt;- 0
  26. }
  27. channels_mutex.Unlock()
  28. }

Now the problem is, since &lt;- is always blocking on read, every time goes to sync_stat := &lt;- channel is blocking. I know if the channel was closed it won't be blocked, but since I have to use this channel until work() exits, and I didn't find any way to reopen a closed channel.

I suspect myself on a wrong way, so any help is appreciated. Is there some "elegant" way to pause and resume any other goroutine?

答案1

得分: 27

如果我理解正确,您想要N个工作者和一个控制器,可以随意暂停、恢复和停止工作者。以下代码将实现这一功能。

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. // 可能的工作者状态。
  8. const (
  9. Stopped = 0
  10. Paused = 1
  11. Running = 2
  12. )
  13. // 最大工作者数量。
  14. const WorkerCount = 1000
  15. func main() {
  16. // 启动工作者。
  17. var wg sync.WaitGroup
  18. wg.Add(WorkerCount + 1)
  19. workers := make([]chan int, WorkerCount)
  20. for i := range workers {
  21. workers[i] = make(chan int, 1)
  22. go func(i int) {
  23. worker(i, workers[i])
  24. wg.Done()
  25. }(i)
  26. }
  27. // 启动控制器例程。
  28. go func() {
  29. controller(workers)
  30. wg.Done()
  31. }()
  32. // 等待所有goroutine完成。
  33. wg.Wait()
  34. }
  35. func worker(id int, ws <-chan int) {
  36. state := Paused // 开始处于暂停状态。
  37. for {
  38. select {
  39. case state = <-ws:
  40. switch state {
  41. case Stopped:
  42. fmt.Printf("Worker %d: Stopped\n", id)
  43. return
  44. case Running:
  45. fmt.Printf("Worker %d: Running\n", id)
  46. case Paused:
  47. fmt.Printf("Worker %d: Paused\n", id)
  48. }
  49. default:
  50. // 我们使用runtime.Gosched()来防止在这种情况下发生死锁。
  51. // 如果在这里执行了工作并且让出给调度器,则不需要它。
  52. runtime.Gosched()
  53. if state == Paused {
  54. break
  55. }
  56. // 在这里执行实际的工作。
  57. }
  58. }
  59. }
  60. // controller处理所有工作者的当前状态。它们可以被指示为运行、暂停或完全停止。
  61. func controller(workers []chan int) {
  62. // 启动工作者
  63. setState(workers, Running)
  64. // 暂停工作者。
  65. setState(workers, Paused)
  66. // 恢复工作者。
  67. setState(workers, Running)
  68. // 关闭工作者。
  69. setState(workers, Stopped)
  70. }
  71. // setState更改所有给定工作者的状态。
  72. func setState(workers []chan int, state int) {
  73. for _, w := range workers {
  74. w <- state
  75. }
  76. }
英文:

If I understand you correctly, you want N number of workers and one controller, which can pause, resume and stop the workers at will. The following code will do just that.

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;runtime&quot;
  5. &quot;sync&quot;
  6. )
  7. // Possible worker states.
  8. const (
  9. Stopped = 0
  10. Paused = 1
  11. Running = 2
  12. )
  13. // Maximum number of workers.
  14. const WorkerCount = 1000
  15. func main() {
  16. // Launch workers.
  17. var wg sync.WaitGroup
  18. wg.Add(WorkerCount + 1)
  19. workers := make([]chan int, WorkerCount)
  20. for i := range workers {
  21. workers[i] = make(chan int, 1)
  22. go func(i int) {
  23. worker(i, workers[i])
  24. wg.Done()
  25. }(i)
  26. }
  27. // Launch controller routine.
  28. go func() {
  29. controller(workers)
  30. wg.Done()
  31. }()
  32. // Wait for all goroutines to finish.
  33. wg.Wait()
  34. }
  35. func worker(id int, ws &lt;-chan int) {
  36. state := Paused // Begin in the paused state.
  37. for {
  38. select {
  39. case state = &lt;-ws:
  40. switch state {
  41. case Stopped:
  42. fmt.Printf(&quot;Worker %d: Stopped\n&quot;, id)
  43. return
  44. case Running:
  45. fmt.Printf(&quot;Worker %d: Running\n&quot;, id)
  46. case Paused:
  47. fmt.Printf(&quot;Worker %d: Paused\n&quot;, id)
  48. }
  49. default:
  50. // We use runtime.Gosched() to prevent a deadlock in this case.
  51. // It will not be needed of work is performed here which yields
  52. // to the scheduler.
  53. runtime.Gosched()
  54. if state == Paused {
  55. break
  56. }
  57. // Do actual work here.
  58. }
  59. }
  60. }
  61. // controller handles the current state of all workers. They can be
  62. // instructed to be either running, paused or stopped entirely.
  63. func controller(workers []chan int) {
  64. // Start workers
  65. setState(workers, Running)
  66. // Pause workers.
  67. setState(workers, Paused)
  68. // Unpause workers.
  69. setState(workers, Running)
  70. // Shutdown workers.
  71. setState(workers, Stopped)
  72. }
  73. // setState changes the state of all given workers.
  74. func setState(workers []chan int, state int) {
  75. for _, w := range workers {
  76. w &lt;- state
  77. }
  78. }

huangapple
  • 本文由 发表于 2013年4月19日 17:26:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/16101409.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定