How to properly delay between executing a pool of workers

Question

Good day,

I'm trying to implement the correct delay between executions of a pool of workers. For example, the workers need to complete 30 tasks and then sleep for 5 seconds. How can I track in the code that exactly 30 tasks have been completed, and only then sleep for 5 seconds?

Below is the code that creates a pool of 30 workers, which process the tasks 30 at a time in no particular order:


package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Job struct {
	id       int
	randomno int
}
type Result struct {
	job         Job
	sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

// digits returns the sum of the decimal digits of number,
// sleeping 2 seconds to simulate slow work.
func digits(number int) int {
	sum := 0
	no := number
	for no != 0 {
		digit := no % 10
		sum += digit
		no /= 10
	}
	time.Sleep(2 * time.Second)
	return sum
}

// worker processes jobs until the jobs channel is closed.
func worker(wg *sync.WaitGroup) {
	for job := range jobs {
		output := Result{job, digits(job.randomno)}
		results <- output
	}
	wg.Done()
}

// createWorkerPool starts the workers and closes results once all have exited.
func createWorkerPool(noOfWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go worker(&wg)
	}

	wg.Wait()
	close(results)
}

// allocate sends the jobs, pausing after every 30 jobs have been sent
// (sent, not necessarily completed).
func allocate(noOfJobs int) {
	for i := 0; i < noOfJobs; i++ {
		if i != 0 && i%30 == 0 {
			// NOTE: the message says 5 sec, but the sleep below is 10 seconds.
			fmt.Printf("SLEEPAGE 5 sec...")
			time.Sleep(10 * time.Second)

		}
		randomno := rand.Intn(999)
		job := Job{i, randomno}
		jobs <- job
	}
	close(jobs)
}

// result prints every result and signals done when results is closed.
func result(done chan bool) {
	for result := range results {
		fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
	}
	done <- true
}
func main() {
	startTime := time.Now()
	noOfJobs := 100
	go allocate(noOfJobs)
	done := make(chan bool)
	go result(done)
	noOfWorkers := 30
	createWorkerPool(noOfWorkers)
	<-done
	endTime := time.Now()
	diff := endTime.Sub(startTime)
	fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

play: https://go.dev/play/p/lehl7hoo-kp

It's not clear to me how to make sure that exactly 30 tasks have been completed, or where to insert the delay. I would be grateful for any help.

Answer 1

Score: 1

Okay, so let's start with this working example:

package main

import (
	"sync"
	"testing"
	"time"
)

func Test_t(t *testing.T) {

	// just a publisher: it publishes a result on a chan
	publish := func(s int, ch chan int, wg *sync.WaitGroup) {
		ch <- s // this is blocking!!!
		wg.Done()
	}

	wg := &sync.WaitGroup{}
	wg.Add(100)

	// we'll use the done channel to notify that the work is done
	res := make(chan int)
	done := make(chan struct{})
	// create a goroutine that will notify that all results were published
	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	// let's create the jobs that publish on our res chan
	// please note all goroutines are created immediately
	for i := 0; i < 100; i++ {
		go publish(i, res, wg)
	}

	// let's get 30 results and then wait
	var resCounter int
forloop:
	for {
		select {
		case ss := <-res:
			println(ss)
			resCounter += 1
			// pause after every 30th result
			if resCounter%30 == 0 {
				// after receiving 30 results we are blocking this goroutine
				// no more results will be taken from the channel for 5 seconds
				println("received 30 results, waiting...")
				time.Sleep(5 * time.Second)
			}
		case <-done:
			// we are done here, let's break this infinite loop
			break forloop
		}
	}
}

I hope this shows more clearly how it can be done.

So, what's the problem with your code?
To be honest, it looks fine (I mean 30 results are published, then the code waits, then another 30 results, etc.), but the question is: where would you like to wait?

I guess there are a few possibilities:

  • creating workers (this is how your code works now; as far as I can see, it publishes jobs in packs of 30. Please note that the 2-second delay in the digits function applies only to the goroutine in which that code is executed)

  • triggering workers (the "wait" code would then live in the worker function and prevent more workers from running, so it has to watch how many results have been published)

  • handling results (this is how my code works; the synchronization happens in the forloop)

huangapple
  • Posted on 2021-12-17 00:18:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70382337.html