如何在同一个结构体中使用多个通道?

huangapple go评论87阅读模式
英文:

How can I use multiple channels in the same struct

问题

在我的代码中,我想要做以下操作:

  1. 从输入中接收数据,作为eventmessage
  2. 根据event对接收到的数据进行格式化

我考虑使用类似面向对象编程(OOP)的方法,但似乎出现了问题。

我写的代码如下:

// 定义包含通道的结构体
type sseData struct {
	event, message string
}
type DataPasser struct {
	data       chan sseData
	logs       chan string
	connection chan struct{} // 用于控制最大允许的客户端连接数
}

// 定义结构体的接收者,根据输入数据进行格式化
func (p *DataPasser) Format() {
	data := <-p.data
	switch {
	case len(data.event) > 0:
		p.logs <- fmt.Sprintf("event: %v\ndata: %v\n\n", data.event, data.message)
	case len(data.event) == 0:
		p.logs <- fmt.Sprintf("data: %v\n\n", data.message)
	}

}

然后我有以下代码:

func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	setupCORS(&w, r)

	fmt.Println("Client connected from IP:", r.RemoteAddr)

	p.connection <- struct{}{}
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Internal error", 500)
		return
	}

	fmt.Fprint(w, "event: notification\ndata: Connection to WhatsApp server ...\n\n")
	flusher.Flush()

	// 连接到WhatsApp客户端
	go Connect()

	// 准备数据解析器`p`通过其sseData通道接收数据
	go p.Format()


	for {
		select {
		case c := <-p.logs:
			fmt.Fprint(w, c)
			flusher.Flush()
		case <-r.Context().Done():
			<-p.connection
			fmt.Println("Connection closed")
			return
		}
	}
}

func setupCORS(w *http.ResponseWriter, req *http.Request) {
	(*w).Header().Set("Cache-Control", "no-cache")
	(*w).Header().Set("Access-Control-Allow-Origin", "*")
	(*w).Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
	(*w).Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
}

在connect函数中,我有以下代码:

package main

import (
	"context"
	"fmt"
)

var err error

func Connect() {
	fmt.Println("Connected")
	if client.IsConnected() {
		client.Disconnect()
		passer.data <- sseData{
			event:   "notification",
			message: "Reconnecting to WhatsApp server ...",
		}
	}

	if client.Store.ID == nil {
		// 未存储ID,新登录
	GetQR:
		qrChan, _ := client.GetQRChannel(context.Background())
		err = client.Connect()
		if err != nil {
			//	panic(err)
			//passer.logs <- "Can not connect with WhatApp server, try again later"
			passer.data <- sseData{
				event:   "notification",
				message: "Can not connect with WhatApp server, try again later",
			}
			fmt.Println("Sorry", err)
		}

		for evt := range qrChan {
			switch evt.Event {
			case "success":
				{
					//passer.logs <- "success"
					passer.data <- sseData{
						event:   "notification",
						message: "success",
					}
					fmt.Println("Login event: success")
				}
			case "timeout":
				{
					//passer.logs <- "timeout/Refreshing"
					passer.data <- sseData{
						event:   "notification",
						message: "timeout/Refreshing",
					}
					fmt.Println("Login event: timeout")
					goto GetQR
				}
			case "code":
				{
					fmt.Println("new code recieved")
					fmt.Println(evt.Code)
					//passer.logs <- evt.Code
					passer.data <- sseData{
						event:   "qrCode",
						message: evt.Code,
					}
				}
			}
		}
	} else {
		// 已登录,只需连接
		//passer.logs <- "Already logged"
		passer.data <- sseData{
			event:   "notification",
			message: "Already logged in",
		}
		fmt.Println("Already logged")
		err = client.Connect()
		if err != nil {
			panic(err)
		}
	}
	/*
		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM)

		<-c
		passer.data <- sseData{
			event:   "notification",
			message: "Server got shut down",
		}
	*/
}

在主文件中,我有以下代码:

var passer *DataPasser

const maxClients = 1

func init() {
	passer = &DataPasser{
		data:       make(chan sseData),
		logs:       make(chan string),
		connection: make(chan struct{}, maxClients),
	}
}

func main() {

	http.HandleFunc("/sse", passer.HandleSignal)
	go http.ListenAndServe(":1234", nil)

	// 监听Ctrl+C(您也可以执行其他操作以防止程序退出)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	<-c
	if client.IsConnected() {
		client.Disconnect()
	}
}

问题是服务器只能正确发送第一个SSE,然后似乎在通道通信的某个地方停顿了。

有什么想法吗?

英文:

In my code I want to do the following:

  1. Recieve data from inputs as event and message
  2. Format the received data based on the event

I thought to use something close to the method in the OOP, but it looks I meesed things up.

What I wrote is:

// Define the structs that contains the channels
type sseData struct {
	event, message string
}
type DataPasser struct {
	data       chan sseData
	logs       chan string
	connection chan struct{} // To control maximum allowed clients connections
}

// DEfine the struct&#39;s reciever that do the formating based on the input date
func (p *DataPasser) Format() {
	data := &lt;-p.data
	switch {
	case len(data.event) &gt; 0:
		p.logs &lt;- fmt.Sprintf(&quot;event: %v\ndata: %v\n\n&quot;, data.event, data.message)
	case len(data.event) == 0:
		p.logs &lt;- fmt.Sprintf(&quot;data: %v\n\n&quot;, data.message)
	}

}

Then I've the below:

func (p *DataPasser) HandleSignal(w http.ResponseWriter, r *http.Request) {
	w.Header().Set(&quot;Content-Type&quot;, &quot;text/event-stream; charset=utf-8&quot;)
	w.Header().Set(&quot;Cache-Control&quot;, &quot;no-cache&quot;)
	w.Header().Set(&quot;Connection&quot;, &quot;keep-alive&quot;)
	setupCORS(&amp;w, r)

	fmt.Println(&quot;Client connected from IP:&quot;, r.RemoteAddr)

	p.connection &lt;- struct{}{}
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, &quot;Internal error&quot;, 500)
		return
	}

	fmt.Fprint(w, &quot;event: notification\ndata: Connection to WhatsApp server ...\n\n&quot;)
	flusher.Flush()

	// Connect to the WhatsApp client
	go Connect()

	// Prepare dataParser `p` to recieve data through its sseData channel
	go p.Format()


	for {
		select {
		case c := &lt;-p.logs:
			fmt.Fprint(w, c)
			flusher.Flush()
		case &lt;-r.Context().Done():
			&lt;-p.connection
			fmt.Println(&quot;Connection closed&quot;)
			return
		}
	}
}

func setupCORS(w *http.ResponseWriter, req *http.Request) {
	(*w).Header().Set(&quot;Cache-Control&quot;, &quot;no-cache&quot;)
	(*w).Header().Set(&quot;Access-Control-Allow-Origin&quot;, &quot;*&quot;)
	(*w).Header().Set(&quot;Access-Control-Allow-Methods&quot;, &quot;POST, GET, OPTIONS, PUT, DELETE&quot;)
	(*w).Header().Set(&quot;Access-Control-Allow-Headers&quot;, &quot;Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization&quot;)
}

Anf in the connect function, I've the:

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
)

var err error

func Connect() {
	fmt.Println(&quot;Connected&quot;)
	if client.IsConnected() {
		client.Disconnect()
		passer.data &lt;- sseData{
			event:   &quot;notification&quot;,
			message: &quot;Reconnecting to WhatsApp server ...&quot;,
		}
	}

	if client.Store.ID == nil {
		// No ID stored, new login
	GetQR:
		qrChan, _ := client.GetQRChannel(context.Background())
		err = client.Connect()
		if err != nil {
			//	panic(err)
			//passer.logs &lt;- &quot;Can not connect with WhatApp server, try again later&quot;
			passer.data &lt;- sseData{
				event:   &quot;notification&quot;,
				message: &quot;Can not connect with WhatApp server, try again later&quot;,
			}
			fmt.Println(&quot;Sorry&quot;, err)
		}

		for evt := range qrChan {
			switch evt.Event {
			case &quot;success&quot;:
				{
					//passer.logs &lt;- &quot;success&quot;
					passer.data &lt;- sseData{
						event:   &quot;notification&quot;,
						message: &quot;success&quot;,
					}
					fmt.Println(&quot;Login event: success&quot;)
				}
			case &quot;timeout&quot;:
				{
					//passer.logs &lt;- &quot;timeout/Refreshing&quot;
					passer.data &lt;- sseData{
						event:   &quot;notification&quot;,
						message: &quot;timeout/Refreshing&quot;,
					}
					fmt.Println(&quot;Login event: timeout&quot;)
					goto GetQR
				}
			case &quot;code&quot;:
				{
					fmt.Println(&quot;new code recieved&quot;)
					fmt.Println(evt.Code)
					//passer.logs &lt;- evt.Code
					passer.data &lt;- sseData{
						event:   &quot;qrCode&quot;,
						message: evt.Code,
					}
				}
			}
		}
	} else {
		// Already logged in, just connect
		//passer.logs &lt;- &quot;Already logged&quot;
		passer.data &lt;- sseData{
			event:   &quot;notification&quot;,
			message: &quot;Already logged in&quot;,
		}
		fmt.Println(&quot;Already logged&quot;)
		err = client.Connect()
		if err != nil {
			panic(err)
		}
	}
	/*
		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM)

		&lt;-c
		passer.data &lt;- sseData{
			event:   &quot;notification&quot;,
			message: &quot;Server got shut down&quot;,
		}
	*/
}

In the main file, I do have:

var passer *DataPasser

const maxClients = 1

func init() {
	passer = &amp;DataPasser{
		data:       make(chan sseData),
		logs:       make(chan string),
		connection: make(chan struct{}, maxClients),
	}
}

func main() {

	http.HandleFunc(&quot;/sse&quot;, passer.HandleSignal)
	go http.ListenAndServe(&quot;:1234&quot;, nil)

	// Listen to Ctrl+C (you can also do something else that prevents the program from exiting)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	&lt;-c
	if client.IsConnected() {
		client.Disconnect()
	}
}

What is happeneing is that the server sending the first SSE correctly only, and it looks it hangs somewhere in the channel communication.

Any thought?

答案1

得分: 0

我通过编写以下代码解决了这个问题:

	// 连接到 WhatsApp 客户端
	go Connect()

	for {
		select {
		case data := <-p.data:
			fmt.Println("接收到数据")

			switch {
			case len(data.event) > 0:
				fmt.Fprintf(w, "事件: %v\n数据: %v\n\n", data.event, data.message)
			case len(data.event) == 0:
				fmt.Fprintf(w, "数据: %v\n\n", data.message)
			}
			flusher.Flush()
		case <-r.Context().Done():
			<-p.connection
			fmt.Println("连接已关闭")
			return
		}
	}

但是我仍然对将操作拆分并使用接收器感兴趣,我不能将其作为答案接受,因为这只是解决问题的方法,而不是对问题的回答。你有什么想法吗?

英文:

I solved it by writting:

	// Connect to the WhatsApp client
	go Connect()

	for {
		select {
		case data := &lt;-p.data:
			fmt.Println(&quot;recieved&quot;)

			switch {
			case len(data.event) &gt; 0:
				fmt.Fprintf(w, &quot;event: %v\ndata: %v\n\n&quot;, data.event, data.message)
			case len(data.event) == 0:
				fmt.Fprintf(w, &quot;data: %v\n\n&quot;, data.message)
			}
			flusher.Flush()
		case &lt;-r.Context().Done():
			&lt;-p.connection
			fmt.Println(&quot;Connection closed&quot;)
			return
		}
	}

But I still interested in splitting the action and using the reciever, I can not accept this as an answer, as it is a solution for the problem, but not an answer to the question.
Any thought?

答案2

得分: 0

当你从go connect()例程发送数据到passer.data时,例程go p.Format()没有在监听。因为你在parser.data上使用了一个无缓冲的通道,但没有接收者在监听,所以你的代码被阻塞了。要么为parser.data使用一个带缓冲的通道,要么确保在实际发送数据到通道之前,你的例程已经开始并在监听来自数据通道的传入消息。在你的情况下,我猜在Connect例程之前启动Format例程应该就足够了。

英文:

When you are sending data to passer.data from the go connect() routine, the routine go p.Format() is not listening. Because you are using an unbuffered channel for parser.data but no receiver is listening, your code is stuck. Either use a buffered channel for parser.data or make sure that your routine listening for incoming message from the data chan is started and listening before actually sending data to the channel. In your case, I guess starting the Format routine before the Connect routine should be sufficient.

huangapple
  • 本文由 发表于 2022年7月13日 15:04:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/72962158.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定