并发问题 – 在两个通道上发送 INT 流 – 从一个通道读取

huangapple go评论91阅读模式
英文:

A problem of concurrency - sending streams of INT on two channels - reading from one

问题

我应该实现以下内容:

  1. 一个名为A的Go协程,它生成随机整数并将其放入一个通道,在每次通道推送后暂停1秒。
  2. 第二个Go协程(B)执行相同的操作。将随机整数放入通道B,并在每次推送后暂停2秒。
  3. 现在,我需要从这两个通道中读取数据,并创建一个总和。例如,从通道A中获取的第一个元素与从通道B中获取的第一个元素相加,然后将结果放入通道C(以此类推,每次加1),直到创建了100个总和。
  4. 当完成100个总和(放入通道C并读取)后,关闭通道A、通道B和通道C。

到目前为止,我已经有了以下代码:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {

	a := make(chan int, 100)
	b := make(chan int, 100)
	c := make(chan string, 100)

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			a <- rand.Intn(101)
			time.Sleep(time.Millisecond * 100)
		}
	}()

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			b <- rand.Intn(101)
			time.Sleep(time.Millisecond * 300)
		}
	}()

	go func() {
		for {
			select {
			case ai := <-a:
				bi := <-b
				sum := ai + bi
				c <- fmt.Sprintf("%d + %d = %d", ai, bi, sum)

			}
		}
	}()

	sums := 0
	for val := range c {
		if sums == 100 {
			close(c)
			close(b)
			close(a)
		}
		println(val)
		sums++
	}

}

为了测试目的,我将秒更改为毫秒,并且验证了10个总和,但你可以理解这个思路。

额外信息:通道A和通道B必须缓冲100个项目。为了测试目的,我在这里只放了10个。

我不时遇到死锁问题,我知道原因。我的问题是,我不明白如何从接收通道关闭两个发送通道。有人可以解决这个问题并给我解释一下吗?

谢谢!

英文:

What I should implement:

  1. a go routine (let's call it A) that generates random INT's and puts them on a channel and pauses after each channel push, 1 second.
  2. a second go routine (B) that does the same. Puts random INT's to channel B and pauses for 2 seconds.
  3. Now, I have to read from both channels, and create a SUM. For example. First element that comes from channel A with first element that comes from channel B - make a sum and put it on a channel C (and so on +1) until there are 100 sums created.
  4. When 100 sums are done (put in channel C and read) - close channel A , channel B and channel C.

What I have until now:

package main

import (
	&quot;fmt&quot;
	&quot;math/rand&quot;
	&quot;time&quot;
)

func main() {

	a := make(chan int, 10)
	b := make(chan int, 10)
	c := make(chan string, 10)

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			a &lt;- rand.Intn(101)
			time.Sleep(time.Millisecond * 100)
		}
	}()

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			b &lt;- rand.Intn(101)
			time.Sleep(time.Millisecond * 300)
		}
	}()

	go func() {
		for {
			select {
			case ai := &lt;-a:
				bi := &lt;-b
				sum := ai + bi
				c &lt;- fmt.Sprintf(&quot;%d + %d = %d&quot;, ai, bi, sum)

			}
		}
	}()

	sums := 0
	for val := range c {
		if sums == 10 {
			close(c)
			close(b)
			close(a)
		}
		println(val)
		sums++
	}

}

For testing purposes I changed seconds to milliseconds and instead of 100 sums, I verify for 10 but you get the idea.

Extra info: channel A and channel B have to be buffered at 100 items. As well, for testing purposes I only put 10 here.

I keep receiving deadlocks every now and then and I get why. My problem is that, I don't understand how can I close two sending channels from a receiver channel. Can anyone solve this mistery and explain a bit to me.

Thank you!

答案1

得分: 1

不是死锁,但是当你向关闭的通道 ab 写入时,你应该会收到 panic 错误。

你需要使用另一个通道来让 goroutine 知道处理已经完成。

done := make(chan struct{})

将 goroutine 修改为检测 done

go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            select {
               case b <- rand.Intn(101):
                  time.Sleep(time.Millisecond * 300)
               case <-done:
                   return
             }
        }
    }()

go func() {
        for {
            select {
            case ai := <-a:
                bi := <-b
                sum := ai + bi
                c <- fmt.Sprintf("%d + %d = %d", ai, bi, sum)
             case <-done:
                close(c) // 告诉监听者我们已经完成
                return
            }
        }
    }()

完成后,关闭 done 通道。这也会导致 c 被关闭:

for val := range c {
        if sums == 10 {
            close(done)
        }
        println(val)
        sums++
    }

c 被关闭时,for 循环将终止。

英文:

Not a deadlock, but you should receive a panic when you write to the closed channels a and b.

You have to use another channel to let the goroutines know that processing is finished.

done := make(chan struct{})

Change the goroutines to test for done:

go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            select {
               case b &lt;- rand.Intn(101):
                  time.Sleep(time.Millisecond * 300)
               case &lt;-done:
                   return
             }
        }
    }()

go func() {
        for {
            select {
            case ai := &lt;-a:
                bi := &lt;-b
                sum := ai + bi
                c &lt;- fmt.Sprintf(&quot;%d + %d = %d&quot;, ai, bi, sum)
             case &lt;-done:
                close(c) // tell listeners that we are done
                return
            }
        }
    }()

When you're done, close the done channel. This will also cause c to be closed:

for val := range c {
        if sums == 10 {
            close(done)
        }
        println(val)
        sums++
    }

When c is closed, the for loop will terminate.

答案2

得分: 0

设置通道为nil

func main() {
    a := make(chan int, 10)
    b := make(chan int, 10)
    c := make(chan string, 10)

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            if a == nil {
                break
            } else {
                a <- rand.Intn(101)
            }
            time.Sleep(time.Millisecond * 100)
        }
        println("a loop is quit")
    }()

    go func() {
        for {
            rand.Seed(time.Now().UnixNano())
            if b == nil {
                break
            } else {
                b <- rand.Intn(101)
            }
            time.Sleep(time.Millisecond * 300)
        }
        println("b loop is quit")
    }()

    go func() {
        for {
            ai := <-a
            bi := <-b
            sum := ai + bi
            if a == nil || b == nil || c == nil {
                break
            } else {
                c <- fmt.Sprintf("%d + %d = %d", ai, bi, sum)
            }
        }
        println("sum loop is quit")
    }()

    sums := 0
    for val := range c {
        println(val)
        sums++
        if sums == 10 {
            close(a)
            close(b)
            close(c)
            a, b, c = nil, nil, nil
        }
    }
    time.Sleep(time.Second)
}

playground: https://go.dev/play/p/-Im_vCcessJ

英文:

set channel to nil

func main() {
	a := make(chan int, 10)
	b := make(chan int, 10)
	c := make(chan string, 10)

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			if a == nil {
				break
			} else {
				a &lt;- rand.Intn(101)
			}
			time.Sleep(time.Millisecond * 100)
		}
		println(&quot;a loop is quit&quot;)
	}()

	go func() {
		for {
			rand.Seed(time.Now().UnixNano())
			if b == nil {
				break
			} else {
				b &lt;- rand.Intn(101)
			}
			time.Sleep(time.Millisecond * 300)
		}
		println(&quot;b loop is quit&quot;)
	}()

	go func() {
		for {
			ai := &lt;-a
			bi := &lt;-b
			sum := ai + bi
			if a == nil || b == nil || c == nil {
				break
			} else {
				c &lt;- fmt.Sprintf(&quot;%d + %d = %d&quot;, ai, bi, sum)
			}
		}
		println(&quot;sum loop is quit&quot;)
	}()

	sums := 0
	for val := range c {
		println(val)
		sums++
		if sums == 10 {
			close(a)
			close(b)
			close(c)
			a, b, c = nil, nil, nil
		}
	}
	time.Sleep(time.Second)
}

playground : https://go.dev/play/p/-Im_vCcessJ

答案3

得分: 0

这是你可以实现它的方法:

创建非缓冲通道。

一个Go协程:

将100个整数通过睡眠的方式发送到通道a,并在所有100个整数都发送完毕后关闭通道a。

另一个Go协程:

将200个整数通过睡眠的方式发送到通道b,并在所有200个整数都发送完毕后关闭通道b。

第三个Go协程:

  1. 从通道a和b接收数据,直到它们都关闭。
  2. 将从a和b接收到的值放入两个专用的队列中。我们需要使用中间数据存储来解除通道的阻塞。
  3. 在将数据添加到任何一个队列后,检查第二个队列是否有数据。如果两个队列都包含数据,则从每个队列中取出一个元素,并将它们的和发送到通道c。
  4. 当a和b都关闭且至少一个队列为空时,关闭通道c。

通过这种设计,Go协程a和b控制数据流,而监听器不需要对数据量和延迟比率做任何假设。

如果你事先知道要发送到a和b的项目数量,可以使用缓冲通道。但这不是内存高效的,因为你必须为最坏情况分配内存。使用动态队列,你只需在内存中保留尚未发送到c的数据。

英文:

This is how you can implement it:

Create not buffered channels.

a go routine:

Post 100 ints with sleep to channel a and CLOSE channel a when all 100 posted

b go routine:

Post 200 ints with sleep to channel b and CLOSE channel b when all 200 posted

c goroutine:

  1. Listen from channels a and b until they both closed
  2. Put values from a and b into two dedicated queues We need to use intermediate data storage to unblock channels.
  3. After adding data into any of the queues, check if there is a data in second one. If both queues contain data, get an element from each and send the sum to channel c.
  4. Close c when a and b closed and at least one queue is empty.

With this design, go-routines a and b controls data flow, and you can implement any number of items and delays ratio without listener making any assumptions about that.

Buffered channels can be used if you know in advance a number of items you are going to send into a and b. It is not memory efficient though because you have to allocate memory for worst case scenario. With dynamic queues, you only keep in memory data that was not sent to c.

huangapple
  • 本文由 发表于 2022年7月10日 22:03:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/72929080.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定