英文:
GO language: fatal error: all goroutines are asleep - deadlock
问题
以下是翻译好的内容:
下面的代码在使用硬编码的JSON数据时运行良好,但在从文件中读取JSON数据时不起作用。在使用sync.WaitGroup
时,我遇到了fatal error: all goroutines are asleep - deadlock
错误。
使用硬编码的JSON数据的工作示例:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and finally prints a DONE line.
func connect(host string) {
	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener loops forever: it takes the next host from c and starts a connect
// goroutine for it.
func listener(c chan string) {
	for {
		// The receive is evaluated here, before the new goroutine starts.
		go connect(<-c)
	}
}
// main feeds two hard-coded hosts to the listener goroutine and then blocks on
// a line of input from stdin.
func main() {
	targets := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
	queue := make(chan string)
	go listener(queue)
	for _, target := range targets {
		queue <- target
	}
	// Reading from stdin is the only thing keeping main from returning here.
	var line string
	fmt.Scanln(&line)
}
输出:
user@user-VirtualBox:~/go$ go run channel.go
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
user1@111.79.154.111: DONE
user2@111.79.190.222: DONE
不起作用 - 使用读取JSON数据文件的示例:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"encoding/json"
"os"
"sync"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and finally prints a DONE line.
func connect(host string) {
	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener loops forever: it takes the next host from c and starts a connect
// goroutine for it.
func listener(c chan string) {
	for {
		// The receive is evaluated here, before the new goroutine starts.
		go connect(<-c)
	}
}
// Content is one element of the JSON array that main decodes; main joins the
// two fields into the "username@ip" string it sends to the listener.
type Content struct {
// Username is filled from the "username" JSON key.
Username string `json:"username"`
// Ip is filled from the "ip" JSON key.
Ip string `json:"ip"`
}
// main decodes a JSON array of Content values from stdin, builds one
// "username@ip" string per entry and sends each one to the listener goroutine.
//
// NOTE: this is the failing variant from the question. Every wg.Add(1) is
// paired with a *deferred* wg.Done(), and deferred calls only run when main
// returns. The wg.Wait() at the end of main therefore waits on a counter that
// can no longer be decremented, which the runtime reports as
// "all goroutines are asleep - deadlock!".
func main() {
var wg sync.WaitGroup
var source []Content
var hosts []string
data := json.NewDecoder(os.Stdin)
// The error returned by Decode is discarded, so a decoding failure goes unnoticed.
data.Decode(&source)
for _, value := range source {
hosts = append(hosts, value.Username + "@" + value.Ip)
}
var c chan string = make(chan string)
go listener(c)
for i := 0; i < len(hosts); i++ {
wg.Add(1)
// Unbuffered send: blocks until listener has received the host.
c <- hosts[i]
// Runs when main returns, not at the end of this iteration, and nothing
// else in this program calls Done.
defer wg.Done()
}
var input string
// NOTE(review): with stdin redirected from a file this presumably returns
// right away instead of waiting for a key press; confirm.
fmt.Scanln(&input)
// Blocks forever when hosts is non-empty: the counter equals len(hosts) and
// the deferred Done calls cannot run until main returns.
wg.Wait()
}
输出:
user@user-VirtualBox:~/go$ go run deploy.go < hosts.txt
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
user1@111.79.154.111: DONE
user2@111.79.190.222: DONE
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc210000068)
/usr/lib/go/src/pkg/runtime/sema.goc:199 +0x30
sync.(*WaitGroup).Wait(0xc210047020)
/usr/lib/go/src/pkg/sync/waitgroup.go:127 +0x14b
main.main()
/home/user/go/deploy.go:64 +0x45a
goroutine 3 [chan receive]:
main.listener(0xc210038060)
/home/user/go/deploy.go:28 +0x30
created by main.main
/home/user/go/deploy.go:53 +0x30b
exit status 2
user@user-VirtualBox:~/go$
HOSTS.TXT:
[
{
"username":"user1",
"ip":"111.79.154.111"
},
{
"username":"user2",
"ip":"111.79.190.222"
}
]
英文:
The code below works fine with hard-coded JSON data, but it doesn't work when I read the JSON data from a file. I'm getting a fatal error: all goroutines are asleep - deadlock
error when using sync.WaitGroup
.
WORKING EXAMPLE WITH HARD-CODED JSON DATA:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and finally prints a DONE line.
func connect(host string) {
	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener loops forever: it takes the next host from c and starts a connect
// goroutine for it.
func listener(c chan string) {
	for {
		// The receive is evaluated here, before the new goroutine starts.
		go connect(<-c)
	}
}
// main feeds two hard-coded hosts to the listener goroutine and then blocks on
// a line of input from stdin.
func main() {
	targets := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
	queue := make(chan string)
	go listener(queue)
	for _, target := range targets {
		queue <- target
	}
	// Reading from stdin is the only thing keeping main from returning here.
	var line string
	fmt.Scanln(&line)
}
OUTPUT:
user@user-VirtualBox:~/go$ go run channel.go
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
user1@111.79.154.111: DONE
user2@111.79.190.222: DONE
NOT WORKING - EXAMPLE WITH READING JSON DATA FILE:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"encoding/json"
"os"
"sync"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and finally prints a DONE line.
func connect(host string) {
	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener loops forever: it takes the next host from c and starts a connect
// goroutine for it.
func listener(c chan string) {
	for {
		// The receive is evaluated here, before the new goroutine starts.
		go connect(<-c)
	}
}
// Content is one element of the JSON array that main decodes; main joins the
// two fields into the "username@ip" string it sends to the listener.
type Content struct {
// Username is filled from the "username" JSON key.
Username string `json:"username"`
// Ip is filled from the "ip" JSON key.
Ip string `json:"ip"`
}
// main decodes a JSON array of Content values from stdin, builds one
// "username@ip" string per entry and sends each one to the listener goroutine.
//
// NOTE: this is the failing variant from the question. Every wg.Add(1) is
// paired with a *deferred* wg.Done(), and deferred calls only run when main
// returns. The wg.Wait() at the end of main therefore waits on a counter that
// can no longer be decremented, which the runtime reports as
// "all goroutines are asleep - deadlock!".
func main() {
var wg sync.WaitGroup
var source []Content
var hosts []string
data := json.NewDecoder(os.Stdin)
// The error returned by Decode is discarded, so a decoding failure goes unnoticed.
data.Decode(&source)
for _, value := range source {
hosts = append(hosts, value.Username + "@" + value.Ip)
}
var c chan string = make(chan string)
go listener(c)
for i := 0; i < len(hosts); i++ {
wg.Add(1)
// Unbuffered send: blocks until listener has received the host.
c <- hosts[i]
// Runs when main returns, not at the end of this iteration, and nothing
// else in this program calls Done.
defer wg.Done()
}
var input string
// NOTE(review): with stdin redirected from a file this presumably returns
// right away instead of waiting for a key press; confirm.
fmt.Scanln(&input)
// Blocks forever when hosts is non-empty: the counter equals len(hosts) and
// the deferred Done calls cannot run until main returns.
wg.Wait()
}
OUTPUT
user@user-VirtualBox:~/go$ go run deploy.go < hosts.txt
user1@111.79.154.111: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
user2@111.79.190.222: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
user1@111.79.154.111: DONE
user2@111.79.190.222: DONE
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc210000068)
/usr/lib/go/src/pkg/runtime/sema.goc:199 +0x30
sync.(*WaitGroup).Wait(0xc210047020)
/usr/lib/go/src/pkg/sync/waitgroup.go:127 +0x14b
main.main()
/home/user/go/deploy.go:64 +0x45a
goroutine 3 [chan receive]:
main.listener(0xc210038060)
/home/user/go/deploy.go:28 +0x30
created by main.main
/home/user/go/deploy.go:53 +0x30b
exit status 2
user@user-VirtualBox:~/go$
HOSTS.TXT
[
{
"username":"user1",
"ip":"111.79.154.111"
},
{
"username":"user2",
"ip":"111.79.190.222"
}
]
答案1
得分: 64
Go程序在主函数结束时结束。
根据语言规范:
程序的执行从初始化主包开始,然后调用main函数。当该函数调用返回时,程序退出。它不会等待其他(非主)goroutine完成。
因此,你需要等待goroutine完成。常见的解决方案是使用sync.WaitGroup对象。
同步goroutine的最简单的代码:
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
// routine prints a message and, through the deferred call, marks itself as
// finished on the package-level WaitGroup when it returns.
func routine() {
defer wg.Done() // 3
fmt.Println("routine finished")
}
// main increments the WaitGroup counter, starts routine in its own goroutine
// and blocks in Wait until routine has called Done, then prints its own message.
func main() {
wg.Add(1) // 2
go routine() // *
wg.Wait() // 4
fmt.Println("main finished")
}
用于同步多个goroutine的代码:
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
// routine prints its number i and, through the deferred call, marks itself as
// finished on the package-level WaitGroup when it returns.
func routine(i int) {
defer wg.Done() // 3
fmt.Printf("routine %v finished\n", i)
}
// main starts ten routine goroutines, incrementing the WaitGroup counter
// before each start, then blocks in Wait until all of them have called Done.
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 2
go routine(i) // *
}
wg.Wait() // 4
fmt.Println("main finished")
}
WaitGroup的使用顺序如下:
- 声明全局变量。将其声明为全局变量是使其对所有函数和方法可见的最简单方法。
- 增加计数器。这必须在主goroutine中完成,因为不能保证新启动的goroutine会在4之前执行,这是由于内存模型的保证。
- 减少计数器。这必须在goroutine退出时完成。使用延迟调用,确保它将在函数结束时被调用,无论如何结束。
- 等待计数器达到0。这必须在主goroutine中完成,以防止程序退出。
* 实际参数在启动新的goroutine之前求值。因此,需要在wg.Add(1)
之前显式地对它们求值,这样即使求值的代码发生panic,也不会留下已经增加的计数器。
使用:
param := f(x)
wg.Add(1)
go g(param)
而不是:
wg.Add(1)
go g(f(x))
英文:
Go program ends when the main function ends.
From the language specification
> Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.
Therefore, you need to wait for your goroutines to finish. The common solution for this is to use sync.WaitGroup object.
The simplest possible code to synchronize goroutine:
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
// routine prints a message and, through the deferred call, marks itself as
// finished on the package-level WaitGroup when it returns.
func routine() {
defer wg.Done() // 3
fmt.Println("routine finished")
}
// main increments the WaitGroup counter, starts routine in its own goroutine
// and blocks in Wait until routine has called Done, then prints its own message.
func main() {
wg.Add(1) // 2
go routine() // *
wg.Wait() // 4
fmt.Println("main finished")
}
And for synchronizing multiple goroutines
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
// routine prints its number i and, through the deferred call, marks itself as
// finished on the package-level WaitGroup when it returns.
func routine(i int) {
defer wg.Done() // 3
fmt.Printf("routine %v finished\n", i)
}
// main starts ten routine goroutines, incrementing the WaitGroup counter
// before each start, then blocks in Wait until all of them have called Done.
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 2
go routine(i) // *
}
wg.Wait() // 4
fmt.Println("main finished")
}
WaitGroup usage in order of execution.
- Declaration of global variable. Making it global is the easiest way to make it visible to all functions and methods.
- Increasing the counter. This must be done in main goroutine because there is no guarantee that newly started goroutine will execute before 4 due to memory model guarantees.
- Decreasing the counter. This must be done at the exit of the goroutine. By using a deferred call, we make sure that it will be called when the function ends, no matter how it ends.
- Waiting for the counter to reach 0. This must be done in main goroutine to prevent program exit.
* The actual parameters are evaluated before starting the new goroutine. Thus they need to be evaluated explicitly before wg.Add(1)
so that possibly panicking code does not leave the counter increased.
Use
param := f(x)
wg.Add(1)
go g(param)
instead of
wg.Add(1)
go g(f(x))
答案2
得分: 5
感谢Grzegorz Żur提供的非常好的详细解释。
我想指出的一件事是,通常需要进行线程处理的函数不会在main()
中,所以我们会有类似这样的代码:
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"reflect"
"regexp"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // 非常重要的全局声明,否则会出现“fatal error: all goroutines are asleep - deadlock!”的错误
// doSomething is the placeholder worker that main starts in its own goroutine.
//
// It reports completion on the package-level WaitGroup so that main's
// wg.Wait() can return. Without the Done call the counter set by wg.Add(1)
// never reaches zero and the runtime aborts with
// "all goroutines are asleep - deadlock!".
func doSomething(arg1 arg1Type) {
	// Deferred so the counter is decremented however the function exits.
	defer wg.Done()
	// cured cancer
}
// main draws a random value in [0, 10) from a time-seeded source, hands it to
// doSomething running in a separate goroutine, waits on the WaitGroup and then
// prints a message.
func main() {
	seed := time.Now().UnixNano()
	delay := rand.New(rand.NewSource(seed)).Intn(10)

	wg.Add(1)
	go doSomething(delay)
	wg.Wait()

	fmt.Println("等待所有线程完成")
}
我想指出的是,全局声明的wg
对于所有线程在main()
之前完成非常关键。
英文:
Thanks for the very nice and detailed explanation Grzegorz Żur.
One thing I want to point out is that typically the func that needs to be threaded won't be in main()
, so we would have something like this:
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"reflect"
"regexp"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // VERY IMP to declare this globally, other wise one //would hit "fatal error: all goroutines are asleep - deadlock!"
// doSomething is the placeholder worker that main starts in its own goroutine.
//
// It reports completion on the package-level WaitGroup so that main's
// wg.Wait() can return. Without the Done call the counter set by wg.Add(1)
// never reaches zero and the runtime aborts with
// "all goroutines are asleep - deadlock!".
func doSomething(arg1 arg1Type) {
	// Deferred so the counter is decremented however the function exits.
	defer wg.Done()
	// cured cancer
}
// main draws a random value in [0, 10) from a time-seeded source, hands it to
// doSomething running in a separate goroutine, waits on the WaitGroup and then
// prints a message.
func main() {
	seed := time.Now().UnixNano()
	delay := rand.New(rand.NewSource(seed)).Intn(10)

	wg.Add(1)
	go doSomething(delay)
	wg.Wait()

	fmt.Println("Waiting for all threads to finish")
}
The thing I want to point out is that the global declaration of wg
is crucial for all threads to finish before main() exits.
答案3
得分: 1
package main
/*
模拟从线程发送消息进行处理,
并将响应(处理结果)发送回线程
*/
import (
"fmt"
"math/rand"
"time"
)
type (
// TChans is a pool of message channels; NewChanIndex treats a nil element
// as a free slot.
TChans []chan TMsgRec
// TMsgRec is the message passed between ping, Receiver and worker.
TMsgRec struct {
name string // sender name (ping fills in the thread name)
rid int // -1, or the index of the response channel in TChans
msg string // message text
note string // free-form comment
}
// TThRec bundles the parameters of one ping goroutine ("thread").
TThRec struct {
name string
rid int // index of the response channel in TChans (or -1)
job chan TMsgRec // channel for sending messages to Receiver
resp chan TMsgRec // response channel back to the thread
}
)
// main allocates a pool of 100 channel slots, claims one slot as the shared
// job channel, starts the "1th" and "2th" ping goroutines with a response
// channel each, and then runs Receiver on the calling goroutine.
func main() {
	pool := make(TChans, 100)

	jobSlot := NewChanIndex(&pool)
	jobs := pool[jobSlot] // carries messages from the threads to Receiver

	for _, name := range []string{"1th", "2th"} {
		respSlot := NewChanIndex(&pool) // response channel index for this thread
		go ping(TThRec{name: name, job: jobs, rid: respSlot, resp: pool[respSlot]})
	}

	Receiver(jobs, pool)
}
// Receiver polls c in an endless loop. Each received message that names a
// response channel (rid > -1) is handed to a new worker goroutine together
// with that channel; other messages are dropped. When nothing is pending it
// sleeps for 2 ms before polling again.
func Receiver(c chan TMsgRec, pChans TChans) {
	for {
		select {
		case msg := <-c:
			if msg.rid < 0 {
				// No response channel requested: nothing to do.
				continue
			}
			go worker(msg, pChans[msg.rid])
		default:
			SleepM(2)
		}
	}
}
// worker stands in for an SQL query or other processing: it sleeps for a
// random 0-49 ms, appends ":worker" to the message text and sends the message
// on r.
func worker(v TMsgRec, r chan TMsgRec) {
	delay := rand.Intn(50)
	SleepM(delay)
	v.msg += ":worker"
	r <- v
}
// waitResponse blocks until a message arrives on d or the timeout elapses.
//
// pTimeout is the maximum wait in seconds. The result is (true, message) when
// a message was received and (false, zero TMsgRec) when the wait timed out.
func waitResponse(d chan TMsgRec, pTimeout int) (bool, TMsgRec) {
	// The wait is bounded by the caller's pTimeout rather than a fixed value;
	// the timer is stopped on return so it does not linger after a reply.
	timer := time.NewTimer(time.Duration(pTimeout) * time.Second)
	defer timer.Stop()

	select {
	case v := <-d:
		return true, v
	case <-timer.C:
		return false, TMsgRec{}
	}
}
// ping simulates a client thread. After a 10 ms start-up delay it sends 500
// numbered messages on pParam.job.
//
// When pParam.rid names a response channel (rid > -1), each message is sent
// only after the previous one was answered: a timed-out wait is reported and
// repeated without sending again. When rid is -1 no reply is expected, so the
// next message is sent after a 1 ms pause.
func ping(pParam TThRec) {
	SleepM(10)
	ok := true // true when the next message may be sent
	i := 0
	for i < 500 {
		if ok {
			ok = false
			pParam.job <- TMsgRec{name: pParam.name, rid: pParam.rid, msg: fmt.Sprint(i), note: ""}
			i++
		}
		if pParam.rid < 0 {
			// Fire-and-forget: re-arm the send flag, otherwise the loop would
			// spin forever after the first message.
			SleepM(1)
			ok = true
			continue
		}
		var v TMsgRec
		ok, v = waitResponse(pParam.resp, 10)
		if ok {
			fmt.Println(v.name, v.msg)
			SleepM(1)
		} else {
			fmt.Println(pParam.name, "响应超时")
		}
	}
	// Use the thread's own name: the last response may be empty after a timeout.
	fmt.Println(pParam.name, "-- 结束 --")
}
// NewChanIndex claims the first free (nil) slot of *pC by storing a new
// unbuffered channel in it and returns that slot's index, or -1 when every
// slot is already in use.
func NewChanIndex(pC *TChans) int {
	slots := *pC // shares the backing array with *pC
	for idx := 0; idx < len(slots); idx++ {
		if slots[idx] != nil {
			continue
		}
		slots[idx] = make(chan TMsgRec)
		return idx
	}
	return -1
}
// FreeRespChan closes the channel stored at pIndex in *pC and resets the slot
// to nil; a slot that is already nil is left untouched.
func FreeRespChan(pC *TChans, pIndex int) {
	ch := (*pC)[pIndex]
	if ch == nil {
		return
	}
	close(ch)
	(*pC)[pIndex] = nil
}
// SleepM pauses the calling goroutine for pMilliSec milliseconds.
func SleepM(pMilliSec int) {
	pause := time.Millisecond * time.Duration(pMilliSec)
	time.Sleep(pause)
}
英文:
package main
/*
Simulation for sending messages from threads for processing,
and getting a response (processing result) to the thread
*/
import (
"fmt"
"math/rand"
"time"
)
type (
// TChans is a pool of message channels; NewChanIndex treats a nil element
// as a free slot.
TChans []chan TMsgRec
// TMsgRec is the message passed between ping, Receiver and worker.
TMsgRec struct {
name string // sender name (ping fills in the thread name)
rid int // -1, or the index of the response channel in TChans
msg string // message text
note string // free-form comment
}
// TThRec bundles the parameters of one ping goroutine ("thread").
TThRec struct {
name string
rid int // index of the response channel in TChans (or -1)
job chan TMsgRec // channel for sending messages to Receiver
resp chan TMsgRec // response channel back to the thread
}
)
// main allocates a pool of 100 channel slots, claims one slot as the shared
// job channel, starts the "1th" and "2th" ping goroutines with a response
// channel each, and then runs Receiver on the calling goroutine.
func main() {
	pool := make(TChans, 100)

	jobSlot := NewChanIndex(&pool)
	jobs := pool[jobSlot] // carries messages from the threads to Receiver

	for _, name := range []string{"1th", "2th"} {
		respSlot := NewChanIndex(&pool) // response channel index for this thread
		go ping(TThRec{name: name, job: jobs, rid: respSlot, resp: pool[respSlot]})
	}

	Receiver(jobs, pool)
}
// Receiver polls c in an endless loop. Each received message that names a
// response channel (rid > -1) is handed to a new worker goroutine together
// with that channel; other messages are dropped. When nothing is pending it
// sleeps for 2 ms before polling again.
func Receiver(c chan TMsgRec, pChans TChans) {
	for {
		select {
		case msg := <-c:
			if msg.rid < 0 {
				// No response channel requested: nothing to do.
				continue
			}
			go worker(msg, pChans[msg.rid])
		default:
			SleepM(2)
		}
	}
}
// worker stands in for an SQL query or other processing: it sleeps for a
// random 0-49 ms, appends ":worker" to the message text and sends the message
// on r.
func worker(v TMsgRec, r chan TMsgRec) {
	delay := rand.Intn(50)
	SleepM(delay)
	v.msg += ":worker"
	r <- v
}
// waitResponse blocks until a message arrives on d or the timeout elapses.
//
// pTimeout is the maximum wait in seconds. The result is (true, message) when
// a message was received and (false, zero TMsgRec) when the wait timed out.
func waitResponse(d chan TMsgRec, pTimeout int) (bool, TMsgRec) {
	// The wait is bounded by the caller's pTimeout rather than a fixed value;
	// the timer is stopped on return so it does not linger after a reply.
	timer := time.NewTimer(time.Duration(pTimeout) * time.Second)
	defer timer.Stop()

	select {
	case v := <-d:
		return true, v
	case <-timer.C:
		return false, TMsgRec{}
	}
}
// ping simulates a client thread. After a 10 ms start-up delay it sends 500
// numbered messages on pParam.job.
//
// When pParam.rid names a response channel (rid > -1), each message is sent
// only after the previous one was answered: a timed-out wait is reported and
// repeated without sending again. When rid is -1 no reply is expected, so the
// next message is sent after a 1 ms pause.
func ping(pParam TThRec) {
	SleepM(10)
	ok := true // true when the next message may be sent
	i := 0
	for i < 500 {
		if ok {
			ok = false
			pParam.job <- TMsgRec{name: pParam.name, rid: pParam.rid, msg: fmt.Sprint(i), note: ""}
			i++
		}
		if pParam.rid < 0 {
			// Fire-and-forget: re-arm the send flag, otherwise the loop would
			// spin forever after the first message.
			SleepM(1)
			ok = true
			continue
		}
		var v TMsgRec
		ok, v = waitResponse(pParam.resp, 10)
		if ok {
			fmt.Println(v.name, v.msg)
			SleepM(1)
		} else {
			fmt.Println(pParam.name, "response timeout")
		}
	}
	// Use the thread's own name: the last response may be empty after a timeout.
	fmt.Println(pParam.name, "-- end --")
}
// NewChanIndex claims the first free (nil) slot of *pC by storing a new
// unbuffered channel in it and returns that slot's index, or -1 when every
// slot is already in use.
func NewChanIndex(pC *TChans) int {
	slots := *pC // shares the backing array with *pC
	for idx := 0; idx < len(slots); idx++ {
		if slots[idx] != nil {
			continue
		}
		slots[idx] = make(chan TMsgRec)
		return idx
	}
	return -1
}
// FreeRespChan closes the channel stored at pIndex in *pC and resets the slot
// to nil; a slot that is already nil is left untouched.
func FreeRespChan(pC *TChans, pIndex int) {
	ch := (*pC)[pIndex]
	if ch == nil {
		return
	}
	close(ch)
	(*pC)[pIndex] = nil
}
// SleepM pauses the calling goroutine for pMilliSec milliseconds.
func SleepM(pMilliSec int) {
	pause := time.Millisecond * time.Duration(pMilliSec)
	time.Sleep(pause)
}
答案4
得分: -2
请尝试以下代码片段:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"sync"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and prints a DONE line. wg.Done is deferred,
// so the WaitGroup is notified whenever the function returns.
func connect(host string, wg *sync.WaitGroup) {
	defer wg.Done()

	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener starts one connect goroutine per host received from c, passing the
// shared WaitGroup along, and returns once c has been closed and drained.
func listener(c chan string, wg *sync.WaitGroup) {
	// Ranging over the channel ends the loop when the channel is closed.
	for host := range c {
		go connect(host, wg)
	}
}
// main sends two hard-coded hosts to the listener goroutine, incrementing the
// WaitGroup counter before each send, closes the channel, reads one line from
// stdin and finally waits for every connect call to finish.
func main() {
	var wg sync.WaitGroup
	targets := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
	queue := make(chan string)
	go listener(queue, &wg)
	for _, target := range targets {
		wg.Add(1)
		queue <- target
	}
	// Closing the channel lets listener leave its receive loop.
	close(queue)
	var line string
	fmt.Scanln(&line)
	wg.Wait()
}
希望对你有帮助!
英文:
Try this code snippet:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"sync"
)
// connect runs "uptime" on host through ssh, prints the captured standard
// output, pauses for two seconds and prints a DONE line. wg.Done is deferred,
// so the WaitGroup is notified whenever the function returns.
func connect(host string, wg *sync.WaitGroup) {
	defer wg.Done()

	var stdout bytes.Buffer
	uptime := exec.Command("ssh", host, "uptime")
	uptime.Stdout = &stdout
	// A failing command is reported but does not stop the function.
	if runErr := uptime.Run(); runErr != nil {
		fmt.Println(runErr)
	}
	fmt.Printf("%s: %q\n", host, stdout.String())
	time.Sleep(2 * time.Second)
	fmt.Printf("%s: DONE\n", host)
}
// listener starts one connect goroutine per host received from c and returns
// once c has been closed and drained.
//
// wg is the WaitGroup shared with main; it is forwarded to every connect call
// so that each one can signal completion with Done.
func listener(c chan string, wg *sync.WaitGroup) {
	// Ranging over the channel ends the loop when the channel is closed.
	for host := range c {
		// connect takes the WaitGroup as its second argument; calling it with
		// the host alone does not compile.
		go connect(host, wg)
	}
}
// main sends two hard-coded hosts to the listener goroutine, incrementing the
// WaitGroup counter before each send, closes the channel, reads one line from
// stdin and finally waits for every connect call to finish.
func main() {
	var wg sync.WaitGroup
	hosts := [2]string{"user1@111.79.154.111", "user2@111.79.190.222"}
	c := make(chan string)
	// listener needs the WaitGroup to hand on to connect; pass a pointer so
	// that all goroutines share the one counter.
	go listener(c, &wg)
	for _, host := range hosts {
		// Add before the send so the counter is raised before connect can
		// possibly call Done.
		wg.Add(1)
		c <- host
	}
	// Closing the channel lets listener leave its receive loop.
	close(c)
	var input string
	fmt.Scanln(&input)
	wg.Wait()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论