Golang goroutine不会在通道内运行

huangapple go评论79阅读模式
英文:

Golang goroutine doesn't run with channel inside

问题

我正在尝试实现一个单词计数程序,但在第一步遇到了一些问题。

这是我的代码:

package main

import (
    "fmt"
    "os"
    "bufio"
    "sync"
)

// 将数据加载到通道中
func laodData(arr []string,channel chan string,wg sync.WaitGroup) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println("开始加载数据 ", path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
            line,err := reader.ReadString('\n')
            channel <- line
            if err != nil {
                break
            }
            i++
            if i%200 == 0 {
                fmt.Println(i, "行已解析")
            }
        }
        fmt.Println("完成数据加载 ", path)
    }
    wg.Done()
}

// 将数据行分发给不同的映射器
func dispatcher(channel chan string,wg sync.WaitGroup){
    fmt.Println("拉取数据 11")
    line,ok := <- channel
    fmt.Println(ok)
    for ok {
        fmt.Println(line)
        line,ok = <- channel
    }
    fmt.Println("拉取数据 22")
    wg.Done()
}

func main() {
    path := os.Args
    if len(path) < 2 {
        fmt.Println("需要输入文件")
        os.Exit(0)
    }
    var wg sync.WaitGroup
    wg.Add(2)

    channel := make(chan string)
    defer close(channel)
    
    fmt.Println("在分发器之前")
    go laodData(path[1:],channel,wg)
    go dispatcher(channel,wg)
    wg.Wait()

    fmt.Println("在分发器之后")
}

这是我的输出:

...

完成数据加载  result.txt

抛出异常:所有的goroutine都处于休眠状态 - 死锁!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x42154100, 0x42154100)
    /usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
sync.(*WaitGroup).Wait(0x4213b440, 0x0)
    /usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
main.main()
    /Users/kuankuan/go/src/mreasy/main.go:66 +0x238

goroutine 2 [syscall]:
created by runtime.main
    /usr/local/go/src/pkg/runtime/proc.c:221

goroutine 4 [chan receive]:
main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
    /Users/kuankuan/go/src/mreasy/main.go:45 +0x223
created by main.main
    /Users/kuankuan/go/src/mreasy/main.go:65 +0x228
退出状态 2

谢谢!

英文:

I'm trying to implement a word count program, but with the first step i got some problem.

Here's my code:

package main

import (
    &quot;fmt&quot;
    &quot;os&quot;
    &quot;bufio&quot;
    &quot;sync&quot;
)

// Load data into channel
func laodData(arr []string,channel chan string,wg sync.WaitGroup) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println(&quot;begin to laodData &quot;, path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
            line,err := reader.ReadString(&#39;\n&#39;)
            channel &lt;- line
            if err != nil {
                break
            }
            i++
            if i%200 == 0 {
                fmt.Println(i,&quot; lines parsed&quot;)
            }
        }
        fmt.Println(&quot;finish laodData &quot;, path)
    }
    wg.Done()
}

// dispatch data lines into different mappers
func dispatcher(channel chan string,wg sync.WaitGroup){
    fmt.Println(&quot;pull data 11&quot;)
    line,ok := &lt;- channel
    fmt.Println(ok)
    for ok {
        fmt.Println(line)
        line,ok = &lt;- channel
    }
    fmt.Println(&quot;pull data 22&quot;)
    wg.Done()
}

func main() {
    path := os.Args
    if len(path) &lt; 2 {
        fmt.Println(&quot;Need Input Files&quot;)
        os.Exit(0)
    }
    var wg sync.WaitGroup
    wg.Add(2)

    channel := make(chan string)
    defer close(channel)
    
    fmt.Println(&quot;before dispatcher&quot;)
    go laodData(path[1:],channel,wg)
    go dispatcher(channel,wg)
    wg.Wait()

    fmt.Println(&quot;after dispatcher&quot;)
}

And here's my output:

...

finish laodData  result.txt

throw: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x42154100, 0x42154100)
	/usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
sync.(*WaitGroup).Wait(0x4213b440, 0x0)
	/usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
main.main()
	/Users/kuankuan/go/src/mreasy/main.go:66 +0x238

goroutine 2 [syscall]:
created by runtime.main
	/usr/local/go/src/pkg/runtime/proc.c:221

goroutine 4 [chan receive]:
main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
	/Users/kuankuan/go/src/mreasy/main.go:45 +0x223
created by main.main
	/Users/kuankuan/go/src/mreasy/main.go:65 +0x228
exit status 2

Thanks !

答案1

得分: 9

程序在主goroutine退出时终止,所以dispatcher()没有时间执行任何操作。你需要在main()中阻塞,直到dispatcher()完成。可以使用通道来实现这一点:

package main

import (
    "fmt"
    "os"
    "bufio"
)

var done = make(chan bool)             // 创建通道

// 加载文件并将它们发送到通道中供mappers读取。
func dispatcher(arr []string,channel chan string) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println("开始分发 ", path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
            line,_ := reader.ReadString('\n')
            channel <- line
            i++
            if i%200 == 0 {
                fmt.Println(i, " 行已解析")
            }
        }
        fmt.Println("完成分发 ", path)
    }
	done <- true                 // 通知main()完成
}

func main() {
    path := os.Args
    if len(path) < 2 {
        fmt.Println("需要输入文件")
        os.Exit(0)
    }
    channel := make(chan string)
    fmt.Println("在dispatcher之前")
    go dispatcher(path[1:],channel)
	<-done                 // 等待dispatcher()
    fmt.Println("在dispatcher之后")
}
英文:

Program terminates when main goroutine exits, so that dispatcher() has no time to do anything. You need to block in main() until dispatcher() completes. Channel can be used for this:

package main

import (
    &quot;fmt&quot;
    &quot;os&quot;
    &quot;bufio&quot;
)

var done = make(chan bool)             // create channel

// Load files and send them into a channel for mappers reading.
func dispatcher(arr []string,channel chan string) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println(&quot;begin to dispatch &quot;, path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
            line,_ := reader.ReadString(&#39;\n&#39;)
            channel &lt;- line
            i++
            if i%200 == 0 {
                fmt.Println(i,&quot; lines parsed&quot;)
            }
        }
        fmt.Println(&quot;finish dispatch &quot;, path)
    }
	done &lt;- true                 // notify main() of completion
}

func main() {
    path := os.Args
    if len(path) &lt; 2 {
        fmt.Println(&quot;Need Input Files&quot;)
        os.Exit(0)
    }
    channel := make(chan string)
    fmt.Println(&quot;before dispatcher&quot;)
    go dispatcher(path[1:],channel)
	&lt;-done                 // wait for dispatcher()
    fmt.Println(&quot;after dispatcher&quot;)
}

答案2

得分: 2

我修改了你的示例,使其在Go playground上运行,因为那里没有文件I/O;它会在通道上发送随机数。

Victor Deryagin的解释和他建议使用“done”通道是正确的。你遇到死锁的原因是你的goroutine在通道上发送数据,但没有人从中读取,所以程序在这一点上被卡住了。在上面的链接中,我添加了一个消费者goroutine。然后程序按预期并发运行。

请注意,要等待多个goroutine,使用sync.WaitGroup更清晰和更容易。

英文:

I modified your example to run on the Go playground where there's no file I/O; it sends random numbers on the channel instead.

@Victor Deryagin's explanation and his suggestion of using a "done" channel is correct. The reason you get a deadlock is that your goroutine sends on channel, but no one reads from it, so the program is stuck at this point. In the above link I added a consumer goroutine. The program then runs concurrently as intended.

Note that to wait for several goroutines, it's clearer and easier to use sync.WaitGroup.

答案3

得分: 1

在原始问题中有两个问题需要解决。

  1. 在发送完所有数据后,必须关闭通道。在laodData函数中,请在发送完所有数据后使用close(channel)
  2. sync.Waitgroup作为引用传递。你在以下函数的参数中将wg作为值发送... laodData和dispatcher函数。

修复这两个问题将解决死锁问题。你的代码死锁的原因如下:

  • 未关闭发送通道将导致下游通道等待时间过长。
  • sync.Waitgroup的参数作为值发送。应该将其作为引用发送,否则将创建一个新的对象副本。
英文:

Two issues needs to be fixed in the original question.

  1. You have to close the channel once you're done sending all the data. In func laodData, please use close(channel) post sending all data.
  2. Pass the sync.Waitgroup as a reference.you are sending wg as a value in the argument to the following functions... laodData and dispatcher functions.

Fixing these two issues will fix your problem of deadlock. The reasons for the deadlock in your code follow:

  • Leaving the sending channel unclosed will cause the downstream channel to wait for prolonged time.
  • sending the argument of sync.Waitgroup as a value . It should be sent as a reference otherwise it will create a new copy of the object which you are sending.

huangapple
  • 本文由 发表于 2012年11月22日 23:28:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/13515846.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定