Can we have multiple channels with multiple goroutines in Go?

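Question

  1. Can multiple goroutines listen on multiple channels? I am having trouble getting everything printed.

  2. I am not able to print all the numbers. How can I improve this piece of code?

  3. If possible, could anyone provide an example? I am struggling with this one.

  4. Is time.Sleep needed after starting each goroutine?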


package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

var count string

func worker3(var3 chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for ch := range var3 {
		count += ch + " "
	}
}

func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for ch := range var2 {
		var3 <- ch
	}
}

func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for ch := range var1 {
		var2 <- ch
	}
}

func main() {
	var1 := make(chan string, 1500)
	var2 := make(chan string, 1500)
	var3 := make(chan string, 1500)

	var wg sync.WaitGroup
	count = ""
	for i := 0; i < 15; i++ {
		time.Sleep(time.Second)
		wg.Add(1)
		go worker1(var1, var2, var3, &wg)
	}
	for i := 0; i < 15; i++ {
		time.Sleep(time.Second)
		wg.Add(1)
		go worker2(var2, var3, &wg)
	}
	for i := 0; i < 15; i++ {
		time.Sleep(time.Second)
		wg.Add(1)
		go worker3(var3, &wg)
	}

	for i := 0; i <= 100000; i++ {
		var1 <- strconv.Itoa(i)
	}
	time.Sleep(time.Second)
	wg.Wait()
	fmt.Println(count)
}

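Answer 1

Score: 1

Yes, it's complicated, but a couple of rules of thumb should make things feel much more straightforward.

  • Prefer passing channels to goroutines as formal parameters instead of accessing channels in global scope. You get more compiler checking this way, and better modularity too.
  • Avoid both reading from and writing to the same channel in a particular goroutine (including the main one). Otherwise, deadlock is a much greater risk.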
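
As an illustration of the first rule, here is a minimal sketch (not part of the original answer) of how directional channel types in the parameter list let the compiler enforce who sends and who receives. The function names produce and sum are made up for this example.

```go
package main

import "fmt"

// produce sends 0..n-1 on out and then closes it.
// The parameter type chan<- int makes out send-only inside this function,
// so an accidental receive from it would not compile.
func produce(n int, out chan<- int) {
	defer close(out)
	for i := 0; i < n; i++ {
		out <- i
	}
}

// sum only receives from in (type <-chan int) and returns the total.
// It returns once in has been closed and drained.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional here, narrowed at each call site
	go produce(5, ch)
	fmt.Println(sum(ch)) // prints 10
}
```

Each goroutine touches the channel in one direction only, which also follows the second rule: the sender owns the close, and the receiver just ranges until the channel is closed.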
Answer 2

Score: 1

Let's look at what your program is doing.

First, you initialize three buffered channels: var1, var2, and var3.

var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)

Then you initialize a WaitGroup (wg).

var wg sync.WaitGroup

Next, you set the global variable count to an empty string.

count = ""

The next part is a loop that runs 15 times (i from 0 to 14) and starts 15 worker1 goroutines.

for i := 0; i < 15; i++ {
	time.Sleep(time.Second)
	wg.Add(1)
	go worker1(var1, var2, var3, &wg)
}

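Each iteration starts one worker1 goroutine and passes it the channels and a pointer to the WaitGroup (wg).

So what does worker1 do?

func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for ch := range var1 {
		var2 <- ch
	}
}

worker1 waits for data on channel var1 and forwards each value to channel var2. (Its var3 parameter is never used.)

That part is fine. You definitely don't need the time.Sleep(time.Second), though.

Let's move on.

Next, a second loop starts another 15 goroutines, this time running worker2.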

for i := 0; i < 15; i++ {
    time.Sleep(time.Second)
    wg.Add(1)
    go worker2(var2, var3, &wg)
}

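worker2 takes everything from channel var2 and forwards it to channel var3.

func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for ch := range var2 {
		var3 <- ch
	}
}

Then you start another 15 goroutines for worker3.

for i := 0; i < 15; i++ {
	time.Sleep(time.Second)
	wg.Add(1)
	go worker3(var3, &wg)
}

worker3 takes everything from channel var3 and processes it by appending it to the count string.

The last piece of code seeds the pipeline with data. That loop runs from 0 to 100000 inclusive, converts each number to a string, and sends it to channel var1.

After that, the program waits for all goroutines to finish and prints the result.

OK, there are a few problems with this code.

  1. You definitely don't need time.Sleep(time.Second) before starting each goroutine, and you don't need the time.Sleep before wg.Wait() either.
  2. Buffered channels are not needed for this type of workload. This is a simple pipeline, so you can use unbuffered channels, and those channels then also synchronize the stages.

Even after you switch to unbuffered channels and remove the time.Sleep calls, one problem remains. The Go runtime reports this error: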

fatal error: all goroutines are asleep - deadlock!

and terminates the program.

But why does this happen? We have a sync.WaitGroup, and everything looks fine. Let's look at a simpler program that fails with the same error.

package main

import (
	"log"
	"strconv"
	"sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for e := range var1 {
		log.Printf("Element e %s ", e)
	}

}
func main() {
	var1 := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(var1, &wg)
	}

	for i := 0; i < 15; i++ {
		var1 <- strconv.Itoa(i)
	}

	wg.Wait()
}

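This code produces the same error as yours. The channel is never closed, so the worker goroutines wait forever for new data on it while main blocks in wg.Wait(). The Go runtime detects that no goroutine can make progress and kills the process.

To prevent this type of error, we need some mechanism to tell each goroutine that we are done, so that it can stop listening on the channel and finish cleanly.

The easiest way to send that signal is to close the channel the goroutine reads from. Here is the code with that fix.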

package main

import (
	"log"
	"strconv"
	"sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for e := range var1 {
		log.Printf("Element e %s ", e)
	}

}
func main() {
	var1 := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(var1, &wg)
	}

	for i := 0; i < 15; i++ {
		var1 <- strconv.Itoa(i)
	}
	close(var1)
	wg.Wait()
}

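This code does not produce the error, and it terminates correctly.

But there is a catch. How do you do the same in your code? There are 15 goroutines reading from channel var1, 15 reading from var2, and 15 reading from var3.

It is hard to know when each channel can be closed. Channel var1 is the easy case: it sits at the start of the pipeline and only the producer sends on it, so we can close it as soon as the producer has finished sending. Because the channel is unbuffered, a send cannot complete until a worker has received the value, so once the producer has sent everything we know that every value on var1 has been picked up, and closing the channel is safe. But what about var2 and var3?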
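
A related detail, shown here as a small sketch that is not part of the original answer: close does not discard values that are still sitting in a buffered channel. Receivers get the queued values first and only then see the channel as closed, so the sender closing after its last send is also safe when the channel is buffered. The helper name drain is made up for this example.

```go
package main

import "fmt"

// drain receives until ch is closed and empty, and returns everything it saw.
func drain(ch <-chan string) []string {
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	ch <- "c"
	close(ch) // no receiver has run yet; the three values stay queued

	fmt.Println(drain(ch)) // prints [a b c]

	// A receive on a closed, empty channel returns the zero value and ok == false.
	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok) // prints "" false
}
```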

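Fifteen goroutines read from var2 and fifteen from var3, and each of those channels is also fed by fifteen senders. So we need a way to close var2 only once every worker1 goroutine has finished sending on it, and likewise to close var3 once every worker2 goroutine has finished. This can be done with two additional goroutines, called wg1 and wg2 here, that spawn the worker1 and worker2 goroutines. They act as orchestrators: each one creates its own sync.WaitGroup just for its workers, so it knows when all of its child goroutines have finished. When all the worker1 goroutines are done, wg1 can safely close var2. The same goes for wg2 and var3.

Here are the wg1 and wg2 goroutines. Note that worker1 is called with only var1 and var2 here, so its unused var3 parameter has to be dropped from the signature.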

// wg1
wg.Add(1)
go func() {
	log.Printf("Starting WG1 master go routine")
	var wg1 sync.WaitGroup
	defer func() {
		close(var2)
		wg.Done()
	}()
	for i := 0; i < 15; i++ {
		wg1.Add(1)
		go worker1(var1, var2, &wg1)
	}
	wg1.Wait()
}()

// wg2
wg.Add(1)
go func() {
	log.Printf("Starting WG2 routine for second stage")
	defer func() {
		close(var3)
		wg.Done()
	}()
	var wg2 sync.WaitGroup
	for i := 0; i < 15; i++ {
		wg2.Add(1)
		go worker2(var2, var3, &wg2)
	}
	wg2.Wait()
}()

You can find the complete working code at the following link:
Working example
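
For readers who cannot reach the linked code, the following is one possible self-contained arrangement of the same idea. It is a sketch under assumptions, not the answer's linked program: the names forward, stage, and runPipeline are made up, and the 15 worker3 goroutines are replaced by a single collector, because several goroutines appending to the shared count string without synchronization is a data race and can itself lose values.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// forward copies every value from in to out until in is closed.
func forward(in <-chan string, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v
	}
}

// stage starts n forwarders and closes out once all of them have returned.
// This plays the role of the wg1/wg2 orchestrator goroutines.
func stage(n int, in <-chan string, out chan<- string) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go forward(in, out, &wg)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
}

// runPipeline pushes the numbers 0..total-1 through two forwarding stages
// and collects the results in the calling goroutine only.
func runPipeline(total, workers int) []string {
	var1 := make(chan string)
	var2 := make(chan string)
	var3 := make(chan string)

	stage(workers, var1, var2)
	stage(workers, var2, var3)

	// The producer is the only sender on var1, so it owns the close.
	go func() {
		defer close(var1)
		for i := 0; i < total; i++ {
			var1 <- strconv.Itoa(i)
		}
	}()

	var results []string
	for v := range var3 { // ends when the second stage closes var3
		results = append(results, v)
	}
	return results
}

func main() {
	results := runPipeline(1000, 15)
	fmt.Println(len(results)) // prints 1000; the order of the values is not guaranteed
}
```

No time.Sleep and no outer WaitGroup are needed in this version: the closes cascade from var1 to var3, and the range over var3 in the collector ends only after every value has passed through both stages.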

huangapple
  • Published on December 31, 2021, 20:02:29
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70542122.html