golang read numerous websockets

Question

Over the last few weeks I have been lurking around Stack Overflow looking for information related to reading numerous websockets. Basically, I have numerous hosts that all emit messages over a websocket and I need to aggregate them.

I've accomplished this with Golang thus far for a single websocket connection. I've accomplished what I am looking for using Python as well, but I would really like to do this in Go!

I have used gorilla's websocket example as well as a few others and can successfully read a socket in Go. However, it seems the websocket server does not completely conform to typical development practices, as using a method such as .forEach or .Each in JS causes handshake failures.

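Original Version
<pre><code>
package main

import (
	"fmt"
	"golang.org/x/net/websocket"
	"log"
)

// WebSocket endpoint to read from.
var url = "ws://10.0.1.19:5000/data/websocket"

// Origin sent with the handshake. Declared here so the snippet compiles;
// the value is the one used in the later version of this program.
var origin = "http://localhost"

func main() {
	// Dial the server; the empty string means no subprotocol is requested.
	ws, err := websocket.Dial(url, "", origin)
	if err != nil {
		log.Fatal(err)
	}

	// Read a single chunk of up to 512 bytes and print it.
	var msg = make([]byte, 512)
	_, err = ws.Read(msg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Receive: %s\n", msg)
}
</code></pre>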

I don't actually need to send any data to the socket. I simply need to connect and keep reading from it, and then I'll aggregate that data into a single stream for later operations.


Update (2016-03-19)

After continued changes and testing with both gorilla and the old x/net/websocket library, I found that, unfortunately, the websocket server(s) I am connecting to do not seem to adhere to the standard that gorilla expects for the handshake. Either that, or I am not telling gorilla how to connect properly. x/net/websocket connects just fine; I just specify localhost/ as the origin and it seems to work. I am unsure how to tell gorilla to do the same to see if it works the same way. DefaultDialer.Dial() has some configuration options, but with my modest Go knowledge I have not yet found a way to use them to do what I am trying to do.

Current Version (2016-03-19)

package main

import (
	"fmt"
	"golang.org/x/net/websocket"
	// "log"
	"time"
)

var origin = "http://localhost"

type url struct {
	host string
}

func processUrl(host string, messages chan []byte) {
	client, err := websocket.Dial(host, "", origin)
	if err != nil {
		// log.Printf("dial:", err)
	}
	// Clean up on exit from this goroutine
	defer client.Close()
	// Loop reading messages. Send each message to the channel.
	for {
		var msg = make([]byte, 512)
		_, err = client.Read(msg)
		if err != nil {
			// log.Fatal("read:", err)
			return
		}
		messages <- msg
	}
}

func main() {
	// Create an array of hosts to read websockets from
	urls := []string{
		"ws://10.0.1.90:3000/data/websocket",
		"ws://10.0.2.90:3000/data/websocket",
		"ws://10.0.3.90:3000/data/websocket",
	}
	// Create channel to receive messages from all connections
	messages := make(chan []byte)
	// Run a goroutine for each URL that you want to dial.
	for _, host := range urls {
		go processUrl(host, messages)
	}
	// Print all messages received from the goroutines.
	for msg := range messages {
		fmt.Printf("%d %s\n", time.Now().Unix(), msg)
	}
}

RESPONSE (message from ws):
<pre><code>
{
"src_city":"Wayne",
"dest_city":"Amsterdam",
"src_country":"US",
"dest_country":"NL",
"type":"view"
}
</code></pre>

IOWait Issue(s)

One issue I am running into is IOWait errors. I ran the binary overnight against 10 websockets without any issue. I then ran it against all 488 that I need to run it against, and it hit IOWait after 2 minutes. Some of the goroutine errors I see:

<pre><code>
goroutine 72 [IO wait]:
net.runtime_pollWait(0x7f356149b208, 0x72, 0x0)
	/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e610, 0x72, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e610, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e5b0, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
	/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a150, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc208005140)
	/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc208005140, 0xc2080f22d0, 0x0, 0x0)
	/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005140, 0x7f356149b908, 0xc2080f22d0, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc2080d7050, 0xc2080f4c00, 0x200, 0x200, 0x0, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x705010, 0x26, 0xc208004180)
	/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
	/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

goroutine 73 [IO wait, 2 minutes]:
net.runtime_pollWait(0x7f356149b158, 0x72, 0x0)
	/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e760, 0x72, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e760, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e700, 0xc208015000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
	/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a018, 0xc208015000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc2080042a0)
	/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc2080042a0, 0x67d6e0, 0x0, 0x0)
	/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc2080042a0, 0x7f356149b908, 0xc2080196d0, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc208024240, 0xc208080000, 0x200, 0x200, 0x0, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x705190, 0x25, 0xc208004180)
	/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
	/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

goroutine 74 [IO wait]:
net.runtime_pollWait(0x7f356149b0a8, 0x72, 0x0)
	/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e8b0, 0x72, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e8b0, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e850, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
	/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a160, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
	/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc208005200)
	/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc208005200, 0xc2080f2320, 0x0, 0x0)
	/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005200, 0x7f356149b908, 0xc2080f2320, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc2080d70e0, 0xc2080f4e00, 0x200, 0x200, 0x0, 0x0, 0x0)
	/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x7052d0, 0x27, 0xc208004180)
	/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
	/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126
</code></pre>

I have another binary that attempts an initial connection to each websocket address so that I can make sure it is reachable; however, that is another issue altogether. My outstanding questions are:

  1. How can I use Gorilla's websocket implementation to read from the websocket being served by SockJS, as I am doing with x/net/websocket's implementation?
  2. What is the likely cause of the IOWait issue(s) I am seeing?
  3. Because I am using []byte as the response, my log file (I'm just piping stdout to a file) contains lines that have binary data appended to the end. How should I convert the byte slice to text/string before writing it to stdout, so as to avoid that?
  4. Using the chan, what is the best way to pass an additional value from my process function, so that the channel carries both the message and the host of the websocket it came from and I can log both? Should I use a struct as the channel's element type and have it contain what I want: timestamp, host, message?

For the issue of the IOWait errors, I can only imagine that (a) a connection cannot be established and the goroutine is holding it open and eventually throwing an error, or (b) I have too many goroutines running. I have tried with 10, 20, 50, and all 400+. Even a version limited to 10 works only as long as all 10 hosts are responding; it fails when one host is not responding.

I will likely have follow-up questions, but I appreciate the insight and help. The channel suggestion definitely got me going. I've used channels once before but don't always understand how best to implement them. My other project uses channels and wait groups (wg), but I honestly don't understand the point of one over the other.

Thanks again, your thoughts and suggestions have been wonderful!

Apologies for the weird syntax in this post; I cannot seem to get the editor to remove some of the empty lines around my code elements.

Answer 1

Score: 9

Start a goroutine to read each connection. Send received messages to a channel. Receive from that channel to get messages from all connections.

// Fragment: assumes urls and origin are defined as in the question, and that
// fmt, log, net/http and github.com/gorilla/websocket are imported.

// Create channel to receive messages from all connections
messages := make(chan []byte)

// Run a goroutine for each URL that you want to dial.
for _, u := range urls {
	go func(u string) {
		// Dial with Gorilla package. The x/net/websocket package has issues.
		c, _, err := websocket.DefaultDialer.Dial(u, http.Header{"Origin": {origin}})
		if err != nil {
			// Log and give up on this host only; log.Fatal would exit the
			// whole process and stop reading from every other host.
			log.Println("dial:", err)
			return
		}
		// Clean up on exit from this goroutine
		defer c.Close()
		// Loop reading messages. Send each message to the channel.
		for {
			_, m, err := c.ReadMessage()
			if err != nil {
				log.Println("read:", err)
				return
			}
			messages <- m
		}
	}(u)
}

// Print all messages received from the goroutines.
for m := range messages {
	fmt.Printf("%s\n", m)
}

  • Posted by huangapple on 2016-03-17 22:04:17
  • Please keep this link when reposting: https://go.coder-hub.com/36062878.html