How to make Go channel workers produce a different number of results than jobs?

Question

I made some edits to an example from gobyexample:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type DemoResult struct {
	Name string
	Rate int
}

func random(min, max int) int {
	rand.Seed(time.Now().UTC().UnixNano())
	return rand.Intn(max-min) + min
}

func worker(id int, jobs <-chan int, results chan<- DemoResult) {
	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		myrand := random(1, 4)
		if myrand == 2 {
			results <- DemoResult{Name: "succ", Rate: j}
		}
		//  else {
		// 	results <- DemoResult{Name: "failed", Rate: 999}
		// }
	}
}

func main() {
	const numJobs = 5
	jobs := make(chan int, numJobs)
	results := make(chan DemoResult)
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	for a := 1; a <= numJobs; a++ {
		out := <-results
		if out.Name == "succ" {
			fmt.Printf("%v\n", out)
		}
	}
}

I intentionally commented out the following code, which makes the program hang forever:

		//  else {
		// 	results <- DemoResult{Name: "failed", Rate: 999}
		// }

It seems like the number of results has to be the same as the number of jobs. I was wondering if we could make it have a different length?

Answer 1

Score: 5

Use a wait group to detect when the workers are done. Close the results channel when the workers are done. Receive results until the channel is closed.

func worker(wg *sync.WaitGroup, id int,
	jobs <-chan int,
	results chan<- DemoResult) {
	// Decrement wait group counter on return from
	// function.
	defer wg.Done()
	⋮
}

func main() {
	⋮
	// Declare wait group and increment counter for
	// each worker.
	var wg sync.WaitGroup
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go worker(&wg, w, jobs, results)
	}
	⋮
	// Wait for workers to decrement wait group
	// counter to zero and close channel.
	// Execute in goroutine so we can continue on
	// to receiving values from results in main.
	go func() {
		wg.Wait()
		close(results)
	}()
	⋮
	// Loop until results is closed.
	for out := range results {
		⋮
	}
}

https://go.dev/play/p/FOQwybMl7tM
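
For reference, here is one way the elided parts of that outline might be filled in. This is only a sketch under a few assumptions: the random check from the question is replaced with "even job numbers succeed" so the output is predictable, and `runPool` is a hypothetical helper name that appears in neither the question nor the linked playground.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// DemoResult mirrors the struct from the question.
type DemoResult struct {
	Name string
	Rate int
}

// worker sends a result only for even job numbers (an assumption made so
// the output is predictable), so it produces fewer results than jobs.
func worker(wg *sync.WaitGroup, jobs <-chan int, results chan<- DemoResult) {
	defer wg.Done() // signal completion when the jobs channel is drained
	for j := range jobs {
		if j%2 == 0 {
			results <- DemoResult{Name: "succ", Rate: j}
		}
	}
}

// runPool is a hypothetical helper: it feeds numJobs jobs to numWorkers
// workers and returns the Rate of every result received, sorted.
func runPool(numWorkers, numJobs int) []int {
	jobs := make(chan int, numJobs)
	results := make(chan DemoResult) // unbuffered; capacity is unrelated to numJobs

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go worker(&wg, jobs, results)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Close results once every worker has returned, so the range loop
	// below terminates no matter how many results were sent.
	go func() {
		wg.Wait()
		close(results)
	}()

	var rates []int
	for out := range results {
		rates = append(rates, out.Rate)
	}
	sort.Ints(rates) // arrival order depends on scheduling
	return rates
}

func main() {
	fmt.Println(runPool(3, 5)) // prints [2 4]: five jobs, two results
}
```

The receiver never needs to know how many results to expect; closing the channel is the only end-of-stream signal.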

Answer 2

Score: 1

> I was wondering if we could make it have different length?

Absolutely, but you need some way of determining when you have reached the end of the results. This is the reason your example fails: the receive loop currently assumes there will be numJobs results (one per job) and waits for that many.

An alternative would be to use the channel's closure to indicate this, i.e.:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type DemoResult struct {
	Name string
	Rate int
}

func random(min, max int) int {
	rand.Seed(time.Now().UTC().UnixNano())
	return rand.Intn(max-min) + min
}

func worker(id int, jobs <-chan int, results chan<- DemoResult) {
	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		myrand := random(1, 4)
		if myrand == 2 {
			results <- DemoResult{Name: "succ", Rate: j}
		} // else {
		//	results <- DemoResult{Name: "failed", Rate: 999}
		//}
	}
}

func main() {
	const numWorkers = 3
	const numJobs = 5
	jobs := make(chan int, numJobs)
	results := make(chan DemoResult)

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 1; w <= numWorkers; w++ {
		// Pass w as an argument so each goroutine gets its own copy;
		// before Go 1.22 the closure would otherwise share one loop variable.
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(w)
	}
	go func() {
		wg.Wait() // Wait for goroutines to complete, then close the results channel
		close(results)
	}()

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	for out := range results {
		if out.Name == "succ" {
			fmt.Printf("%v\n", out)
		}
	}
}
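
For contrast, the counting loop from the question does work if every job yields exactly one result, which is what the commented-out else branch would restore. The sketch below illustrates that, with two assumptions that are not in the original: success is decided by "even job number" instead of the random check, and `countSuccesses` is a hypothetical helper name.

```go
package main

import "fmt"

// DemoResult mirrors the struct from the question.
type DemoResult struct {
	Name string
	Rate int
}

// worker reports every job, successful or not, so the number of
// results always equals the number of jobs.
func worker(jobs <-chan int, results chan<- DemoResult) {
	for j := range jobs {
		if j%2 == 0 { // assumption: even jobs succeed, for predictable output
			results <- DemoResult{Name: "succ", Rate: j}
		} else {
			results <- DemoResult{Name: "failed", Rate: j}
		}
	}
}

// countSuccesses is a hypothetical helper: it receives exactly numJobs
// results and counts the successful ones.
func countSuccesses(numWorkers, numJobs int) int {
	jobs := make(chan int, numJobs)
	results := make(chan DemoResult, numJobs)
	for w := 0; w < numWorkers; w++ {
		go worker(jobs, results)
	}
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	succ := 0
	// Safe only because each job sends exactly one result.
	for a := 0; a < numJobs; a++ {
		if out := <-results; out.Name == "succ" {
			succ++
		}
	}
	return succ
}

func main() {
	fmt.Println(countSuccesses(3, 5)) // prints 2 (jobs 2 and 4)
}
```

The trade-off is that the receiver has to filter out the failures itself, whereas the close-based version above only ever sees the results the workers chose to send.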

huangapple
  • Posted on July 17, 2022, 11:05:56
  • Please keep this link when reposting: https://go.coder-hub.com/73009006.html