Go: 负的 WaitGroup 计数器

huangapple go评论84阅读模式
英文:

Go: negative WaitGroup counter

问题

我对Go语言还比较陌生,正在把从别处找到的代码改写成符合自己需求的版本。因此,虽然我了解大致思路,但并不完全清楚这段代码的细节。我使用goroutine运行了几个WebSocket客户端,但遇到了一个导致程序崩溃的意外错误。当从WebSocket读取消息出错时(请查看readHandler函数中的conn.ReadMessage()调用),程序似乎多关闭了一个线程(如果这个术语不正确,请见谅)。有什么办法可以解决这个问题吗?非常感谢愿意花时间查看代码的各位。提前谢谢!

package main

import (
	"context"
	"fmt"
	"os"
	"time"
	"os/signal"
	"syscall"
	"sync"
	"net/url"
	"github.com/gorilla/websocket"
	"strconv"
	"encoding/json"
	"log"
	"bytes"
	"compress/gzip"
	"io/ioutil"
)

// 结构体

// Ping is the heartbeat message decoded in readHandler; when its value is
// positive, the same value is sent back in a Pong.
type Ping struct {
	Ping int64 `json:"ping"` // value of the "ping" JSON field
}

// Pong is the reply written by readHandler in response to a Ping; it carries
// the value taken from that Ping.
type Pong struct {
	Pong int64 `json:"pong"` // value of the "pong" JSON field
}

// SubParams is the JSON subscription request that InitSub builds.
type SubParams struct {
	Sub string `json:"sub"` // topic, formatted by InitSub as "market.<pair>.<subType>"
	ID  string `json:"id"`  // request id, formatted by InitSub as "id<number>"
}

// InitSub returns the indented JSON encoding of a SubParams request whose
// topic is "market.<pair>.<subType>" and whose id is "id" followed by i.
//
// A marshalling error is logged and the (then nil) slice is still returned.
func InitSub(subType string, pair string, i int) []byte {
	payload, marshalErr := json.MarshalIndent(&SubParams{
		Sub: "market." + pair + "." + subType,
		ID:  "id" + strconv.Itoa(i),
	}, "", " ")
	if marshalErr != nil {
		log.Println(marshalErr)
	}
	return payload
}

// 主函数

// main starts one control goroutine per trading pair against api.huobi.pro,
// blocks until SIGINT or SIGTERM arrives, then cancels the shared context and
// waits for every control goroutine to finish.
func main() {
	const server = "api.huobi.pro"

	// Buffered so a signal delivered before the receive below is not dropped.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for idx, pair := range []string{"btcusdt", "ethusdt", "ltcusdt"} {
		wg.Add(1)
		// Subscription ids are 1-based.
		go control(server, "ws", pair, ctx, &wg, idx+1)
	}

	<-sigCh
	cancel()
	wg.Wait()
}

// control dials wss://<server>/<path>, sends the subscription for pair, and
// runs readHandler on the connection until ctx is cancelled.
//
// wg is the caller's WaitGroup; it is released exactly once when control
// exits, whichever path it takes. i is the numeric suffix of the subscription
// id. A dial failure panics here and a subscribe failure panics inside
// subscribe, as before.
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
	// Deferring Done ties the decrement to function exit rather than to one
	// particular return statement.
	defer wg.Done()

	fmt.Printf("Started control for %s\n", server)
	// Named endpoint rather than url so the net/url package is not shadowed.
	endpoint := url.URL{
		Scheme: "wss",
		Host:   server,
		Path:   path,
	}
	fmt.Println(endpoint.String())

	conn, _, err := websocket.DefaultDialer.Dial(endpoint.String(), nil)
	if err != nil {
		panic(err)
	}
	// Register the close before subscribing so the connection is released
	// even if subscribe panics.
	defer conn.Close()
	subscribe(conn, pair, i)

	var localwg sync.WaitGroup
	localwg.Add(1)
	go readHandler(ctx, conn, &localwg, server)

	<-ctx.Done()
	localwg.Wait()
}

// readHandler reads messages from conn until ctx is cancelled or a read
// fails. Each message is gunzipped and printed; if it decodes to a Ping with
// a positive value, a matching Pong is written back on conn.
//
// wg.Done is called exactly once, when the function returns. The previous
// version called Done on a read error and kept looping, so the next failed
// read (or the ctx.Done branch) called Done again and panicked with
// "sync: negative WaitGroup counter".
//
// Cancellation is only checked between reads, so a pending ReadMessage call
// delays shutdown until it returns. server is currently unused.
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		_, p, err := conn.ReadMessage()
		if err != nil {
			// Stop instead of retrying: looping on a failed read is what
			// caused the repeated Done calls.
			fmt.Println(err)
			return
		}

		r, err := gzip.NewReader(bytes.NewReader(p))
		if err != nil {
			// Frames that are not gzip data are skipped, as before.
			continue
		}
		result, err := ioutil.ReadAll(r)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(string(result))

		var ping Ping
		if err := json.Unmarshal(result, &ping); err != nil || ping.Ping <= 0 {
			// Not a heartbeat; nothing to answer.
			continue
		}
		msg, err := json.Marshal(Pong{Pong: ping.Ping})
		if err != nil {
			continue
		}
		fmt.Println(string(msg))
		if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
			fmt.Println(err)
		}
	}
}

// subscribe sends the "trade.detail" subscription request for pair on conn
// as a text message, using id as the request number. It panics if the write
// fails.
func subscribe(conn *websocket.Conn, pair string, id int) {
	request := InitSub("trade.detail", pair, id)
	if err := conn.WriteMessage(websocket.TextMessage, request); err != nil {
		panic(err)
	}
}

希望对你有帮助!如果你有任何其他问题,请随时问我。

英文:

I'm somewhat new to go and am reworking code that I found somewhere else to fit my needs. Because of that, I don't totally understand what is happening here, although I get the general idea.

I'm running a few websocket clients using go routines, but I'm getting an unexpected error that causes the program to crash. My program seems to close one too many threads (excuse me if this is the wrong terminology) when there is an error reading a message from the websocket (check the conn.ReadMessage() func in the readHandler func). Any ideas on how would I work around this issue? I would really appreciate anyone taking the time to look through it. Thanks in advance!

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;time&quot;
&quot;os/signal&quot;
&quot;syscall&quot;
&quot;sync&quot;
&quot;net/url&quot;
&quot;github.com/gorilla/websocket&quot;
&quot;strconv&quot;
&quot;encoding/json&quot;
&quot;log&quot;
&quot;bytes&quot;
&quot;compress/gzip&quot;
&quot;io/ioutil&quot;
)
// Structs
// Ping is the heartbeat message decoded in readHandler; when its value is
// positive, the same value is sent back in a Pong.
type Ping struct {
	Ping int64 `json:"ping"` // value of the "ping" JSON field
}
// Pong is the reply written by readHandler in response to a Ping; it carries
// the value taken from that Ping.
type Pong struct {
	Pong int64 `json:"pong"` // value of the "pong" JSON field
}
// SubParams is the JSON subscription request that InitSub builds.
type SubParams struct {
	Sub string `json:"sub"` // topic, formatted by InitSub as "market.<pair>.<subType>"
	ID  string `json:"id"`  // request id, formatted by InitSub as "id<number>"
}
func InitSub(subType string, pair string, i int) []byte {
var idInt string = &quot;id&quot; + strconv.Itoa(i)
subStr := &quot;market.&quot; + pair + &quot;.&quot; + subType
sub := &amp;SubParams{
Sub: subStr,
ID: idInt,
}
out, err := json.MarshalIndent(sub, &quot;&quot;, &quot; &quot;)
if err != nil {
log.Println(err);
}
//log.Println(string(out))
return out
}
// main func
func main() {
var server string = &quot;api.huobi.pro&quot;
pairs := []string{&quot;btcusdt&quot;, &quot;ethusdt&quot;, &quot;ltcusdt&quot;}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for x, pair := range pairs {
wg.Add(1)
go control(server, &quot;ws&quot;, pair, ctx, &amp;wg, x+1)
}
&lt;-comms
cancel()
wg.Wait()
}
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
fmt.Printf(&quot;Started control for %s\n&quot;, server)
url := url.URL {
Scheme: &quot;wss&quot;,
Host: server,
Path: path,
}
fmt.Println(url.String())
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
subscribe(conn, pair, i)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go readHandler(ctx, conn, &amp;localwg, server)
&lt;- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case &lt;- ctx.Done():
wg.Done()
return
default:
_, p, err :=  conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
}
r, err := gzip.NewReader(bytes.NewReader(p))
if(err == nil) {
result, err := ioutil.ReadAll(r)
if(err != nil) {
fmt.Println(err)
}
d := string(result)
fmt.Println(d)
var ping Ping
json.Unmarshal([]byte(d), &amp;ping)
if (ping.Ping &gt; 0) {
str := Pong{Pong: ping.Ping}
msg, err := json.Marshal(str)
if (err == nil) {
fmt.Println(string(msg))
conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}
}
}
}
func subscribe(conn *websocket.Conn, pair string, id int) {
sub := string(InitSub(&quot;trade.detail&quot;, pair, id))
err := conn.WriteMessage(websocket.TextMessage, []byte(sub))
if err != nil {
panic(err)
}
}

答案1

得分: 1

  • 当连接失败时,跳出readHandler循环:

      _, p, err :=  conn.ReadMessage()
    if err != nil {
    wg.Done()
    fmt.Println(err)
    return // &lt;--- 添加这行代码
    }
    

    如果没有return语句,函数将在一个紧密的循环中读取错误,直到发生panic。

  • 在goroutine的开头使用defer wg.Done(),确保Done只被调用一次。

    func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    defer wg.Done()
    for {
    select {
    case &lt;-ctx.Done():
    return
    default:
    _, p, err := conn.ReadMessage()
    if err != nil {
    fmt.Println(err)
    return
    }
    ...
    

    同样更新control函数。

  • 因为调用者不会与readHandler并发执行任何代码,所以在goroutine中运行readHandler没有任何价值。从readHandler中删除所有对wait group的引用,并直接调用该函数:将go readHandler(ctx, conn, &localwg, server)改为readHandler(ctx, conn, server)

还有更多问题,但这些修改应该能让你继续前进。

英文:
  • Break out of the readHandler loop when the connection fails:

      _, p, err :=  conn.ReadMessage()
    if err != nil {
    wg.Done()
    fmt.Println(err)
    return // &lt;--- add this line
    }
    

    Without the return, the function spins in a tight loop reading errors until the panic.

  • Use defer wg.Done() at the beginning of the goroutine to ensure that Done is called exactly once.

    func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
    defer wg.Done()
    for {
    select {
    case &lt;-ctx.Done():
    return
    default:
    _, p, err := conn.ReadMessage()
    if err != nil {
    fmt.Println(err)
    return
    }
    ...
    

    Update the control function also.

  • Because the caller does not execute any code concurrently with readHandler, there's no value in running readHandler in a goroutine. Remove all references to wait groups from readHandler and call the function directly: change go readHandler(ctx, conn, &localwg, server) to readHandler(ctx, conn, server).

There are more issues, but this should move you further along.

huangapple
  • 本文由 发表于 2021年12月4日 17:04:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/70224300.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定