英文:
Go: negative WaitGroup counter
问题
我对Go语言还不太熟悉,但我可以帮你翻译这段代码。这段代码是一个使用goroutine运行几个WebSocket客户端的程序。当从WebSocket读取消息时出现错误时(请查看readHandler函数中的conn.ReadMessage()函数),程序似乎关闭了一个多余的线程(如果这个术语不正确,请原谅我),导致程序崩溃。你有什么解决这个问题的想法吗?非常感谢你花时间查看代码。提前谢谢!
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
"strconv"
"encoding/json"
"log"
"bytes"
"compress/gzip"
"io/ioutil"
)
// 结构体
type Ping struct {
Ping int64 `json:"ping"`
}
type Pong struct {
Pong int64 `json:"pong"`
}
type SubParams struct {
Sub string `json:"sub"`
ID string `json:"id"`
}
func InitSub(subType string, pair string, i int) []byte {
var idInt string = "id" + strconv.Itoa(i)
subStr := "market." + pair + "." + subType
sub := &SubParams{
Sub: subStr,
ID: idInt,
}
out, err := json.MarshalIndent(sub, "", " ")
if err != nil {
log.Println(err);
}
//log.Println(string(out))
return out
}
// 主函数
func main() {
var server string = "api.huobi.pro"
pairs := []string{"btcusdt", "ethusdt", "ltcusdt"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for x, pair := range pairs {
wg.Add(1)
go control(server, "ws", pair, ctx, &wg, x+1)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
fmt.Printf("Started control for %s\n", server)
url := url.URL{
Scheme: "wss",
Host: server,
Path: path,
}
fmt.Println(url.String())
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
subscribe(conn, pair, i)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<-ctx.Done()
localwg.Wait()
wg.Done()
return
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <-ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
}
r, err := gzip.NewReader(bytes.NewReader(p))
if err == nil {
result, err := ioutil.ReadAll(r)
if err != nil {
fmt.Println(err)
}
d := string(result)
fmt.Println(d)
var ping Ping
json.Unmarshal([]byte(d), &ping)
if ping.Ping > 0 {
str := Pong{Pong: ping.Ping}
msg, err := json.Marshal(str)
if err == nil {
fmt.Println(string(msg))
conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}
}
}
}
func subscribe(conn *websocket.Conn, pair string, id int) {
sub := string(InitSub("trade.detail", pair, id))
err := conn.WriteMessage(websocket.TextMessage, []byte(sub))
if err != nil {
panic(err)
}
}
希望对你有帮助!如果你有任何其他问题,请随时问我。
英文:
I'm somewhat new to go and am reworking code that I found somewhere else to fit my needs. Because of that, I don't totally understand what is happening here, although I get the general idea.
I'm running a few websocket clients using go routines, but I'm getting an unexpected error that causes the program to crash. My program seems to close one too many threads (excuse me if this is the wrong terminology) when there is an error reading a message from the websocket (check the conn.ReadMessage() func in the readHandler func). Any ideas on how would I work around this issue? I would really appreciate anyone taking the time to look through it. Thanks in advance!
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
"strconv"
"encoding/json"
"log"
"bytes"
"compress/gzip"
"io/ioutil"
)
// Structs
type Ping struct {
Ping int64 `json:"ping"`
}
type Pong struct {
Pong int64 `json:"pong"`
}
type SubParams struct {
Sub string `json:"sub"`
ID string `json:"id"`
}
func InitSub(subType string, pair string, i int) []byte {
var idInt string = "id" + strconv.Itoa(i)
subStr := "market." + pair + "." + subType
sub := &SubParams{
Sub: subStr,
ID: idInt,
}
out, err := json.MarshalIndent(sub, "", " ")
if err != nil {
log.Println(err);
}
//log.Println(string(out))
return out
}
// main func
func main() {
var server string = "api.huobi.pro"
pairs := []string{"btcusdt", "ethusdt", "ltcusdt"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for x, pair := range pairs {
wg.Add(1)
go control(server, "ws", pair, ctx, &wg, x+1)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
fmt.Printf("Started control for %s\n", server)
url := url.URL {
Scheme: "wss",
Host: server,
Path: path,
}
fmt.Println(url.String())
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
subscribe(conn, pair, i)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
}
r, err := gzip.NewReader(bytes.NewReader(p))
if(err == nil) {
result, err := ioutil.ReadAll(r)
if(err != nil) {
fmt.Println(err)
}
d := string(result)
fmt.Println(d)
var ping Ping
json.Unmarshal([]byte(d), &ping)
if (ping.Ping > 0) {
str := Pong{Pong: ping.Ping}
msg, err := json.Marshal(str)
if (err == nil) {
fmt.Println(string(msg))
conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}
}
}
}
func subscribe(conn *websocket.Conn, pair string, id int) {
sub := string(InitSub("trade.detail", pair, id))
err := conn.WriteMessage(websocket.TextMessage, []byte(sub))
if err != nil {
panic(err)
}
}
答案1
得分: 1
-
当连接失败时,跳出
readHandler
循环:_, p, err := conn.ReadMessage() if err != nil { wg.Done() fmt.Println(err) return // <--- 添加这行代码 }
如果没有
return
语句,函数将在一个紧密的循环中读取错误,直到发生panic。 -
在goroutine的开头使用
defer wg.Done()
,确保Done只被调用一次。func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) { defer wg.Done() for { select { case <-ctx.Done(): return default: _, p, err := conn.ReadMessage() if err != nil { fmt.Println(err) return } ...
同样更新
control
函数。 -
因为调用者不会与
readHandler
并发执行任何代码,所以在goroutine中运行readHandler
没有任何价值。从readHandler
中删除所有对wait group的引用,并直接调用该函数:将go readHandler(ctx, conn, &localwg, server)
改为readHandler(ctx, conn, server)
。
还有更多问题,但这些修改应该能让你继续前进。
英文:
-
Break out of the
readHandler
loop when the connection fails:_, p, err := conn.ReadMessage() if err != nil { wg.Done() fmt.Println(err) return // <--- add this line }
Without the return, the function spins in a tight loop reading errors until the panic.
-
Use
defer wg.Done()
at the beginning of the goroutine to ensure that Done is called exactly once.func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) { defer wg.Done() for { select { case <-ctx.Done(): return default: _, p, err := conn.ReadMessage() if err != nil { fmt.Println(err) return } ...
Update the
control
function also. -
Because the caller does not execute any code concurrently with
readHander
, there's no value in runningreadHandler
is a goroutine. Remove all references to wait groups fromreadHandler
and call the function directly: changego readHandler(ctx, conn, &localwg, server)
toreadHandler(ctx, conn, server)
.
There are more issues, but this should move you further along.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论