有没有一种优雅的方法来暂停和恢复其他的goroutine?

huangapple go评论80阅读模式
英文:

Is there some elegant way to pause and resume any other goroutine?

问题

在我的情况下,我有成千上万个同时工作的goroutine作为work()。我还有一个sync() goroutine。当sync开始时,我需要任何其他goroutine在同步工作完成后暂停一段时间。这是我的代码:

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := <- channel // 在这里阻塞
      if sync_stat == 0 { // 如果同步完成
        break  
      }
    }
    // 做一些工作
    if (一些条件) {
      return
    }
  }
}

func sync() {
  channels_mutex.Lock()
  // 做一些同步工作

  for int i := 0; i != len(channels); i++ {
    channels[i] <- 0
  }
  channels_mutex.Unlock()
}

现在的问题是,由于<-在读取时总是阻塞的,每次进入sync_stat := <- channel都会被阻塞。我知道如果通道被关闭,它就不会被阻塞,但是由于我必须在work()退出之前使用这个通道,并且我没有找到重新打开关闭的通道的方法。

我怀疑自己走错了路,所以任何帮助都将不胜感激。有没有一种“优雅”的方法来暂停和恢复任何其他goroutine?

英文:

In my case, I have thousands of goroutines working simultaneously as work(). I also had a sync() goroutine. When sync starts, I need any other goroutine to pause for a while after sync job is done. Here is my code:

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := &lt;- channel // blocked here
      if sync_stat == 0 { // if sync complete
        break  
      }
    }
    // Do some jobs
    if (some condition) {
      return
    }
  }
}

func sync() {
  channels_mutex.Lock()
  // do some sync

  for int i := 0; i != len(channels); i++ {
    channels[i] &lt;- 0
  }
  channels_mutex.Unlock()
}

Now the problem is, since &lt;- is always blocking on read, every time goes to sync_stat := &lt;- channel is blocking. I know if the channel was closed it won't be blocked, but since I have to use this channel until work() exits, and I didn't find any way to reopen a closed channel.

I suspect myself on a wrong way, so any help is appreciated. Is there some "elegant" way to pause and resume any other goroutine?

答案1

得分: 27

如果我理解正确,您想要N个工作者和一个控制器,可以随意暂停、恢复和停止工作者。以下代码将实现这一功能。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// 可能的工作者状态。
const (
	Stopped = 0
	Paused  = 1
	Running = 2
)

// 最大工作者数量。
const WorkerCount = 1000

func main() {
	// 启动工作者。
	var wg sync.WaitGroup
	wg.Add(WorkerCount + 1)

	workers := make([]chan int, WorkerCount)
	for i := range workers {
		workers[i] = make(chan int, 1)

		go func(i int) {
			worker(i, workers[i])
			wg.Done()
		}(i)
	}

	// 启动控制器例程。
	go func() {
		controller(workers)
		wg.Done()
	}()

	// 等待所有goroutine完成。
	wg.Wait()
}

func worker(id int, ws <-chan int) {
	state := Paused // 开始处于暂停状态。

	for {
		select {
		case state = <-ws:
			switch state {
			case Stopped:
				fmt.Printf("Worker %d: Stopped\n", id)
				return
			case Running:
				fmt.Printf("Worker %d: Running\n", id)
			case Paused:
				fmt.Printf("Worker %d: Paused\n", id)
			}

		default:
			// 我们使用runtime.Gosched()来防止在这种情况下发生死锁。
			// 如果在这里执行了工作并且让出给调度器,则不需要它。
			runtime.Gosched()

			if state == Paused {
				break
			}

			// 在这里执行实际的工作。
		}
	}
}

// controller处理所有工作者的当前状态。它们可以被指示为运行、暂停或完全停止。
func controller(workers []chan int) {
	// 启动工作者
	setState(workers, Running)

	// 暂停工作者。
	setState(workers, Paused)

	// 恢复工作者。
	setState(workers, Running)

	// 关闭工作者。
	setState(workers, Stopped)
}

// setState更改所有给定工作者的状态。
func setState(workers []chan int, state int) {
	for _, w := range workers {
		w <- state
	}
}
英文:

If I understand you correctly, you want N number of workers and one controller, which can pause, resume and stop the workers at will. The following code will do just that.

package main

import (
	&quot;fmt&quot;
	&quot;runtime&quot;
	&quot;sync&quot;
)

// Possible worker states.
const (
	Stopped = 0
	Paused  = 1
	Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
	// Launch workers.
	var wg sync.WaitGroup
	wg.Add(WorkerCount + 1)

	workers := make([]chan int, WorkerCount)
	for i := range workers {
		workers[i] = make(chan int, 1)

		go func(i int) {
			worker(i, workers[i])
			wg.Done()
		}(i)
	}

	// Launch controller routine.
	go func() {
		controller(workers)
		wg.Done()
	}()

	// Wait for all goroutines to finish.
	wg.Wait()
}

func worker(id int, ws &lt;-chan int) {
	state := Paused // Begin in the paused state.

	for {
		select {
		case state = &lt;-ws:
			switch state {
			case Stopped:
				fmt.Printf(&quot;Worker %d: Stopped\n&quot;, id)
				return
			case Running:
				fmt.Printf(&quot;Worker %d: Running\n&quot;, id)
			case Paused:
				fmt.Printf(&quot;Worker %d: Paused\n&quot;, id)
			}

		default:
			// We use runtime.Gosched() to prevent a deadlock in this case.
			// It will not be needed of work is performed here which yields
			// to the scheduler.
			runtime.Gosched()

			if state == Paused {
				break
			}

			// Do actual work here.
		}
	}
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
	// Start workers
	setState(workers, Running)

	// Pause workers.
	setState(workers, Paused)

	// Unpause workers.
	setState(workers, Running)

	// Shutdown workers.
	setState(workers, Stopped)
}

// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
	for _, w := range workers {
		w &lt;- state
	}
}

huangapple
  • 本文由 发表于 2013年4月19日 17:26:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/16101409.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定