英文:
Goroutine implementation doubts
问题
我需要一些关于如何在这个问题中使用goroutines的帮助。我只会发布一些代码片段,但如果你想深入了解,可以在这里查看完整代码。
基本上,我有一个distributor函数,它会多次调用并接收一个请求切片,每次调用该函数时,它必须将这个请求分发给其他函数来实际解决请求。我试图创建一个通道并在新的goroutine上启动这个函数来解决请求,以便程序可以并发处理请求。
distributor函数的调用方式如下:
// Run触发系统开始接收请求
func Run() {
// 从这里开始,让我们创建一个通道来接收请求
requestCh := make(chan []string)
idCh := make(chan string)
// 如果你想参与,你需要在这里注册你的Sender
go publisher.Sender(requestCh)
go makeID(idCh)
// 我们的请求池
for request := range requestCh {
// 添加ID
request = append(request, <-idCh)
// 分发
distributor(request)
}
// 问题所在
for result := range resultCh {
fmt.Println(result)
}
}
distributor函数本身如下:
// 将请求分发到各自的通道。
// 没有排队等待。每个人都有自己的goroutine!
func distributor(request []string) {
switch request[0] {
case "sum":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "sub":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "mult":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "div":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "fibonacci":
fibCh := make(chan []string)
go fibonacci.Exec(fibCh, resultCh)
fibCh <- request
case "reverse":
revCh := make(chan []string)
go reverse.Exec(revCh, resultCh)
revCh <- request
case "encode":
encCh := make(chan []string)
go encode.Exec(encCh, resultCh)
encCh <- request
}
}
fibonacci.Exec函数用于说明我如何根据在fibCh上接收到的请求计算斐波那契数,并通过resultCh发送结果值。
func Exec(fibCh chan []string, result chan map[string]string) {
fib := parse(<-fibCh)
nthFibonacci(fib)
result <- fib
}
到目前为止,在Run函数中,当我遍历resultCh时,我得到了结果,但也出现了死锁。为什么会这样?此外,我想象我应该使用waitGroup函数来等待goroutine完成,但我不确定如何实现,因为我希望接收连续的请求流。我希望能够帮助你理解我在这里做错了什么,并找到解决方法。
英文:
I need some help on understanding how to use goroutines in this problem. I will post only some snippets of code but if you want to take a deep look you can check it out here
Basically, I have a distributor function which receives a request slice being called many times, and each time the function is called it must distribute this request among other functions to actually resolve the request. And what I'm trying to create a channel and launch this function to resolve the request on a new goroutine, so the program can handle requests concurrently.
How the distribute function is called:
// Run trigger the system to start receiving requests
func Run() {
// Since the programs starts here, let's make a channel to receive requests
requestCh := make(chan []string)
idCh := make(chan string)
// If you want to play with us you need to register your Sender here
go publisher.Sender(requestCh)
go makeID(idCh)
// Our request pool
for request := range requestCh {
// add ID
request = append(request, <-idCh)
// distribute
distributor(request)
}
// PROBLEM
for result := range resultCh {
fmt.Println(result)
}
}
The distribute function itself:
// Distribute requests to respective channels.
// No waiting in line. Everybody gets its own goroutine!
func distributor(request []string) {
switch request[0] {
case "sum":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "sub":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "mult":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "div":
arithCh := make(chan []string)
go arithmetic.Exec(arithCh, resultCh)
arithCh <- request
case "fibonacci":
fibCh := make(chan []string)
go fibonacci.Exec(fibCh, resultCh)
fibCh <- request
case "reverse":
revCh := make(chan []string)
go reverse.Exec(revCh, resultCh)
revCh <- request
case "encode":
encCh := make(chan []string)
go encode.Exec(encCh, resultCh)
encCh <- request
}
}
And the fibonacci.Exec function to illustrate how I'm trying to calculate the Fibonacci given a request received on the fibCh and sending the result value through the resultCh.
func Exec(fibCh chan []string, result chan map[string]string) {
fib := parse(<-fibCh)
nthFibonacci(fib)
result <- fib
}
So far, at the Run function when I range over the resultCh I get the results but also a deadlock. But why? Also, I imagine that I should use the waitGroup function to wait the goroutines to finish but I'm not sure of how implement that since I'm expecting receive a continuous stream of requests. I would appreciate some help on understanding what I'm doing wrong here and a way to solve it.
答案1
得分: 1
我不会深入研究你的应用程序的实现细节,但基本上听起来你可以使用"workers"模式。
使用"workers"模式,多个goroutine可以从一个单一的通道中读取,将工作量分配给CPU核心,因此得名"workers"。在Go中,这种模式很容易实现 - 只需启动一些带有通道参数的goroutine,并将值发送到该通道 - 分发和多路复用将由Go运行时自动完成。
这是一个"workers"模式的简单实现:
package main
import (
"fmt"
"sync"
"time"
)
func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasksCh
if !ok {
return
}
d := time.Duration(task) * time.Millisecond
time.Sleep(d)
fmt.Println("processing task", task)
}
}
func pool(wg *sync.WaitGroup, workers, tasks int) {
tasksCh := make(chan int)
for i := 0; i < workers; i++ {
go worker(tasksCh, wg)
}
for i := 0; i < tasks; i++ {
tasksCh <- i
}
close(tasksCh)
}
func main() {
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 50)
wg.Wait()
}
另一个有用的资源是如何使用"WaitGroup"等待所有goroutine执行完毕后再继续(以避免陷入死锁)的文章:
以下是一个非常基本的实现:
如果你不想改变实现以使用"worker"模式,也许使用另一个通道来表示goroutine执行结束会是一个好主意,因为当无缓冲通道没有接收者接收发送的消息时,会发生死锁。
done := make(chan bool)
//.....
done <- true //告诉主函数一切都完成了。
因此,当你接收到消息时,通过将通道值设置为true来标记执行完成。
英文:
I'm not digging into the implementation details of your application, but basically as it sounds to me, you can use the workers
pattern.
Using the workers
pattern multiple goroutines can read from a single channel, distributing an amount of work between CPU cores, hence the workers name. In Go, this pattern is easy to implement - just start a number of goroutines with channel as parameter, and just send values to that channel - distributing and multiplexing will be done by Go runtime, automagically.
Here is a simple implementation of the workers pattern:
package main
import (
"fmt"
"sync"
"time"
)
func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-tasksCh
if !ok {
return
}
d := time.Duration(task) * time.Millisecond
time.Sleep(d)
fmt.Println("processing task", task)
}
}
func pool(wg *sync.WaitGroup, workers, tasks int) {
tasksCh := make(chan int)
for i := 0; i < workers; i++ {
go worker(tasksCh, wg)
}
for i := 0; i < tasks; i++ {
tasksCh <- i
}
close(tasksCh)
}
func main() {
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 50)
wg.Wait()
}
Another useful resource how you can use the WaitGroup
to wait for all the goroutines to finish the execution before to continue (hence to not trap into deadlock) is this nice article:
And a very basic implementation of it:
<kbd>Go playground</kbd>
If you do not want to change the implementation to use the worker
pattern maybe would be a good idea to use another channel to signify the end of goroutine execution, because deadlock happens when there is no receiver to accept the sent message through unbuffered channel.
done := make(chan bool)
//.....
done <- true //Tell the main function everything is done.
So when you receive the message you mark the execution as completed by setting the channel value to true.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论