Goroutine实现的疑问

huangapple go评论96阅读模式
英文:

Goroutine implementation doubts

问题

我需要一些关于如何在这个问题中使用goroutines的帮助。我只会发布一些代码片段,但如果你想深入了解,可以在这里查看完整代码。

基本上,我有一个distributor函数,它会多次调用并接收一个请求切片,每次调用该函数时,它必须将这个请求分发给其他函数来实际解决请求。我试图创建一个通道并在新的goroutine上启动这个函数来解决请求,以便程序可以并发处理请求。

distributor函数的调用方式如下:

// Run触发系统开始接收请求
func Run() {

    // 从这里开始,让我们创建一个通道来接收请求
    requestCh := make(chan []string)
    idCh := make(chan string)

    // 如果你想参与,你需要在这里注册你的Sender
    go publisher.Sender(requestCh)
    go makeID(idCh)
    // 我们的请求池
    for request := range requestCh {

        // 添加ID
        request = append(request, <-idCh)

        // 分发
        distributor(request)
    }
    
    // 问题所在
    for result := range resultCh {
        fmt.Println(result)
    }
}

distributor函数本身如下:

// 将请求分发到各自的通道。
// 没有排队等待。每个人都有自己的goroutine!
func distributor(request []string) {

    switch request[0] {

    case "sum":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "sub":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "mult":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "div":
        arithCh := make(chan []string)
        go arithmetic.Exec(arithCh, resultCh)
        arithCh <- request
    case "fibonacci":
        fibCh := make(chan []string)
        go fibonacci.Exec(fibCh, resultCh)
        fibCh <- request
    case "reverse":
        revCh := make(chan []string)
        go reverse.Exec(revCh, resultCh)
        revCh <- request
    case "encode":
        encCh := make(chan []string)
        go encode.Exec(encCh, resultCh)
        encCh <- request
    }
}

fibonacci.Exec函数用于说明我如何根据在fibCh上接收到的请求计算斐波那契数,并通过resultCh发送结果值。

func Exec(fibCh chan []string, result chan map[string]string) {

    fib := parse(<-fibCh)
    nthFibonacci(fib)

    result <- fib
}

到目前为止,在Run函数中,当我遍历resultCh时,我得到了结果,但也出现了死锁。为什么会这样?此外,我想象我应该使用waitGroup函数来等待goroutine完成,但我不确定如何实现,因为我希望接收连续的请求流。我希望能够帮助你理解我在这里做错了什么,并找到解决方法。

英文:

I need some help on understanding how to use goroutines in this problem. I will post only some snippets of code but if you want to take a deep look you can check it out here

Basically, I have a distributor function which receives a request slice being called many times, and each time the function is called it must distribute this request among other functions to actually resolve the request. And what I'm trying to create a channel and launch this function to resolve the request on a new goroutine, so the program can handle requests concurrently.

How the distribute function is called:

// Run trigger the system to start receiving requests
func Run() {

	// Since the programs starts here, let&#39;s make a channel to receive requests
	requestCh := make(chan []string)
	idCh := make(chan string)

	// If you want to play with us you need to register your Sender here
	go publisher.Sender(requestCh)
	go makeID(idCh)
	// Our request pool
	for request := range requestCh {

		// add ID
		request = append(request, &lt;-idCh)

		// distribute
		distributor(request)
	}
    
	// PROBLEM
    for result := range resultCh {
	    fmt.Println(result)
	}
}

The distribute function itself:

// Distribute requests to respective channels.
// No waiting in line. Everybody gets its own goroutine!
func distributor(request []string) {

	switch request[0] {

	case &quot;sum&quot;:
		arithCh := make(chan []string)
		go arithmetic.Exec(arithCh, resultCh)
		arithCh &lt;- request
	case &quot;sub&quot;:
		arithCh := make(chan []string)
		go arithmetic.Exec(arithCh, resultCh)
		arithCh &lt;- request
	case &quot;mult&quot;:
		arithCh := make(chan []string)
		go arithmetic.Exec(arithCh, resultCh)
		arithCh &lt;- request
	case &quot;div&quot;:
		arithCh := make(chan []string)
		go arithmetic.Exec(arithCh, resultCh)
		arithCh &lt;- request
	case &quot;fibonacci&quot;:
		fibCh := make(chan []string)
		go fibonacci.Exec(fibCh, resultCh)
		fibCh &lt;- request
	case &quot;reverse&quot;:
		revCh := make(chan []string)
		go reverse.Exec(revCh, resultCh)
		revCh &lt;- request
	case &quot;encode&quot;:
		encCh := make(chan []string)
		go encode.Exec(encCh, resultCh)
		encCh &lt;- request
	}
}

And the fibonacci.Exec function to illustrate how I'm trying to calculate the Fibonacci given a request received on the fibCh and sending the result value through the resultCh.

func Exec(fibCh chan []string, result chan map[string]string) {

	fib := parse(&lt;-fibCh)
	nthFibonacci(fib)

	result &lt;- fib
}

So far, at the Run function when I range over the resultCh I get the results but also a deadlock. But why? Also, I imagine that I should use the waitGroup function to wait the goroutines to finish but I'm not sure of how implement that since I'm expecting receive a continuous stream of requests. I would appreciate some help on understanding what I'm doing wrong here and a way to solve it.

答案1

得分: 1

我不会深入研究你的应用程序的实现细节,但基本上听起来你可以使用"workers"模式。

使用"workers"模式,多个goroutine可以从一个单一的通道中读取,将工作量分配给CPU核心,因此得名"workers"。在Go中,这种模式很容易实现 - 只需启动一些带有通道参数的goroutine,并将值发送到该通道 - 分发和多路复用将由Go运行时自动完成。

这是一个"workers"模式的简单实现:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

另一个有用的资源是如何使用"WaitGroup"等待所有goroutine执行完毕后再继续(以避免陷入死锁)的文章:

http://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

以下是一个非常基本的实现:

Go playground

如果你不想改变实现以使用"worker"模式,也许使用另一个通道来表示goroutine执行结束会是一个好主意,因为当无缓冲通道没有接收者接收发送的消息时,会发生死锁。

done := make(chan bool)
//.....
done <- true //告诉主函数一切都完成了。

因此,当你接收到消息时,通过将通道值设置为true来标记执行完成。

英文:

I'm not digging into the implementation details of your application, but basically as it sounds to me, you can use the workers pattern.

Using the workers pattern multiple goroutines can read from a single channel, distributing an amount of work between CPU cores, hence the workers name. In Go, this pattern is easy to implement - just start a number of goroutines with channel as parameter, and just send values to that channel - distributing and multiplexing will be done by Go runtime, automagically.

Here is a simple implementation of the workers pattern:

package main

import (
    &quot;fmt&quot;
    &quot;sync&quot;
    &quot;time&quot;
)

func worker(tasksCh &lt;-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := &lt;-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println(&quot;processing task&quot;, task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i &lt; workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i &lt; tasks; i++ {
        tasksCh &lt;- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&amp;wg, 36, 50)
    wg.Wait()
}

Another useful resource how you can use the WaitGroup to wait for all the goroutines to finish the execution before to continue (hence to not trap into deadlock) is this nice article:

http://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

And a very basic implementation of it:

<kbd>Go playground</kbd>

If you do not want to change the implementation to use the worker pattern maybe would be a good idea to use another channel to signify the end of goroutine execution, because deadlock happens when there is no receiver to accept the sent message through unbuffered channel.

done := make(chan bool)
//.....
done &lt;- true //Tell the main function everything is done.

So when you receive the message you mark the execution as completed by setting the channel value to true.

huangapple
  • 本文由 发表于 2016年2月18日 15:06:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/35474949.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定