Go concurrent TCP server hangs when JSON is sent as the response but works with plain string

huangapple go评论77阅读模式
英文:

Go concurrent TCP server hangs when JSON is sent as the response but works with plain string

问题

我正在尝试在Go语言中实现一个并发的TCP服务器,并在linode上找到了一篇很好的解释性文章,其中清楚地解释了使用示例代码。下面是客户端和服务器的示例代码片段。

并发的TCP服务器,为每个TCP客户端创建一个新的go协程。

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strconv"
        "strings"
)

var count = 0

func handleConnection(c net.Conn) {
        fmt.Print(".")
        for {
                netData, err := bufio.NewReader(c).ReadString('\n')
                if err != nil {
                        fmt.Println(err)
                        return
                }

                temp := strings.TrimSpace(string(netData))
                if temp == "STOP" {
                        break
                }
                fmt.Println(temp)
                counter := strconv.Itoa(count) + "\n"
                c.Write([]byte(string(counter)))
        }
        c.Close()
}

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide a port number!")
                return
        }

        PORT := ":" + arguments[1]
        l, err := net.Listen("tcp4", PORT)
        if err != nil {
                fmt.Println(err)
                return
        }
        defer l.Close()

        for {
                c, err := l.Accept()
                if err != nil {
                        fmt.Println(err)
                        return
                }
                go handleConnection(c)
                count++
        }
}

TCP客户端代码片段

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strings"
)

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide host:port.")
                return
        }

        CONNECT := arguments[1]
        c, err := net.Dial("tcp", CONNECT)
        if err != nil {
                fmt.Println(err)
                return
        }

        for {
                reader := bufio.NewReader(os.Stdin)
                fmt.Print(">> ")
                text, _ := reader.ReadString('\n')
                fmt.Fprintf(c, text+"\n")

                message, _ := bufio.NewReader(c).ReadString('\n')
                fmt.Print("->: " + message)
                if strings.TrimSpace(string(text)) == "STOP" {
                        fmt.Println("TCP client exiting...")
                        return
                }
        }
}

上述并发的TCP服务器和客户端可以正常工作。问题出现在当我将TCP服务器更改为发送JSON响应而不是文本响应时。当我将以下行更改为:

counter := strconv.Itoa(count) + "\n"
c.Write([]byte(string(counter)))

改为

res, err := json.Marshal(IdentitySuccess{MessageType: "newidentity", Approved: "approved"})
if err != nil {
        fmt.Printf("Error: %v", err)
}
c.Write(res)

服务器会挂起,并且不会向客户端发送任何响应。奇怪的是,当我使用Ctrl+C强制关闭服务器时,服务器会向客户端发送响应。对于这种奇怪的行为有什么想法吗?就像服务器保留响应并在退出时发送一样。

英文:

I am trying to implement a concurrent TCP server in Go and found this great explanatory article in linode where it clearly explains with a sample code. The sample code snippets for the client and server and included below.

Concurrent TCP server where for each TCP client a new go-routine is created.

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strconv"
        "strings"
)

var count = 0

func handleConnection(c net.Conn) {
        fmt.Print(".")
        for {
                netData, err := bufio.NewReader(c).ReadString('\n')
                if err != nil {
                        fmt.Println(err)
                        return
                }

                temp := strings.TrimSpace(string(netData))
                if temp == "STOP" {
                        break
                }
                fmt.Println(temp)
                counter := strconv.Itoa(count) + "\n"
                c.Write([]byte(string(counter)))
        }
        c.Close()
}

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide a port number!")
                return
        }

        PORT := ":" + arguments[1]
        l, err := net.Listen("tcp4", PORT)
        if err != nil {
                fmt.Println(err)
                return
        }
        defer l.Close()

        for {
                c, err := l.Accept()
                if err != nil {
                        fmt.Println(err)
                        return
                }
                go handleConnection(c)
                count++
        }
}

TCP client code snippet

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strings"
)

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide host:port.")
                return
        }

        CONNECT := arguments[1]
        c, err := net.Dial("tcp", CONNECT)
        if err != nil {
                fmt.Println(err)
                return
        }

        for {
                reader := bufio.NewReader(os.Stdin)
                fmt.Print(">> ")
                text, _ := reader.ReadString('\n')
                fmt.Fprintf(c, text+"\n")

                message, _ := bufio.NewReader(c).ReadString('\n')
                fmt.Print("->: " + message)
                if strings.TrimSpace(string(text)) == "STOP" {
                        fmt.Println("TCP client exiting...")
                        return
                }
        }
}

The above concurrent TCP server and client works without any issue. The issue comes when I change the TCP server to send a JSON response instead of the text response. When I change the line:

counter := strconv.Itoa(count) + "\n"
c.Write([]byte(string(counter)))

to

res, err := json.Marshal(IdentitySuccess{MessageType: "newidentity", Approved: "approved"})
if err != nil {
 	fmt.Printf("Error: %v", err)
}
c.Write(res)

the server hangs and does not send any response back to client. The strange thing is that when I shut down the server forcefully with Ctrl+C, the server sends the response to the client. Any idea about this strange behavior? It's like the server holds the response and sends it when it exists.

答案1

得分: 4

那个套接字教程,就像许多其他设计有缺陷的套接字教程一样,根本没有解释什么是应用协议,以及为什么需要它。它只是说:

在这个例子中,你实现了一个基于TCP的非官方协议。

这个"非官方"协议非常基础:消息是由换行符(\n)分隔的。

除了学习套接字的基础知识之外,在任何环境中都不应该像这样使用套接字。

你需要一个应用协议来对消息进行分帧(这样你的客户端和服务器可以识别部分和连接的消息)。

所以简短的答案是:在你的JSON之后发送一个\n。长答案是:不要使用这样的裸套接字,而是使用一个应用协议,比如HTTP。

英文:

That socket tutorial, just as so many other broken-by-design socket tutorials, doesn't explain at all what an application protocol is or why you need it. All it says is:

> In this example, you have implemented an unofficial protocol that is based on TCP.

This "unofficial" protocol is as rudimentary as it gets: messages are separated by newline characters (\n).

You should not be using sockets like that in any environment, apart from learning the basics about sockets.

You need an application protocol to frame messages (so your client and server can recognise partial and concatenated messages).

So the short answer: send a \n after your JSON. The long answer: don't use barebones sockets like this, use an application protocol, such as HTTP.

答案2

得分: 2

请注意数据竞争问题。你在不同的程序中写入和读取counter变量,没有使用同步机制。数据竞争是不可预测的。

你的实现目前还没有问题,因为你还没有测试同时查询的客户端。

通过使用-race标志构建你的程序来启用竞争检测器,像这样:go run -race . / go build -race .

我已经使用atomic包的函数修复了数据竞争问题。

在下面的代码中,我调整了你的代码,使用bufio.Scanner代替bufio.Reader,仅用于演示目的。

input := bufio.NewScanner(src)
output := bufio.NewScanner(c)
for input.Scan() {
    text := input.Text()
    fmt.Fprintf(c, "%v\n", text)
    isEOT := text == "STOP"

    if !output.Scan() {
        fmt.Fprintln(os.Stderr, output.Err())
        return
    }
    message := output.Text()
    fmt.Print("->: " + message)
    if isEOT {
        fmt.Println("All messages sent...")
        return
    }
}

我还调整了main序列,模拟了连续的两个客户端,使用预定义的输入缓冲区,并在过程中重置它。

input := `hello
world!
STOP
nopnop`
test := strings.NewReader(input)

go serve(arguments[1])

test.Reset(input)
query(arguments[1], test)

test.Reset(input)
query(arguments[1], test)

我在你的客户端中添加了一个简单的重试器,它可以帮助我们编写简单的代码。

c, err := net.Dial("tcp", addr)
for {
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        <-time.After(time.Second)
        c, err = net.Dial("tcp", addr)
        continue
    }
    break
}

整个程序被组装成一个文件,不太容易阅读输出,但更容易传输和执行。

https://play.golang.org/p/keKQsKA3fAw

在下面的示例中,我演示了如何使用JSON编组器/解组器交换结构化数据。

input := bufio.NewScanner(src)
dst := json.NewEncoder(c)
output := json.NewDecoder(c)
for input.Scan() {
    text := input.Text()
    isEOT := text == "STOP"

    err = dst.Encode(text)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }

    var tmp interface{}
    err = output.Decode(&tmp)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        return
    }
    fmt.Printf("->: %v\n", tmp)
    if isEOT {
        fmt.Println("All messages sent...")
        return
    }
}

但是!请注意,这个最后的版本对恶意用户是敏感的。与bufio.Scannerbufio.Reader不同,它不会检查从网络中读取的数据量。因此,它可能会累积数据直到OOM(内存耗尽)。

这对于服务器端尤其如此,在以下代码中:

defer c.Close()
defer atomic.AddUint64(&count, ^uint64(0))
input := json.NewDecoder(c)
output := json.NewEncoder(c)
fmt.Print(".")
for {
    var netData interface{}
    input.Decode(&netData)
    fmt.Printf("%v", netData)
    count := atomic.LoadUint64(&count)
    output.Encode(count)
    if x, ok := netData.(string); ok && x == "STOP" {
        break
    }
}

https://play.golang.org/p/LpIu4ofpm9e

在你最后的代码片段中,正如CodeCaster所回答的那样,不要忘记使用适当的分隔符对消息进行分帧。

英文:

take care to data races. You are writing and reading the counter variable from different routines without synchronization mechanisms. There is no benign data races.

Your implementation wont hit yet, because you are not testing simultaneous clients queries.

Enable the race detector by building your program using the -race flag, like this go run -race . / go build -race .

I have fixed the data race using the atomic package functions.

In below code, i have adjusted your code to use a bufio.Scanner instead of bufio.Reader, only for demonstration purposes.


	input := bufio.NewScanner(src)
	output := bufio.NewScanner(c)
	for input.Scan() {
		text := input.Text()
		fmt.Fprintf(c, &quot;%v\n&quot;, text)
		isEOT := text == &quot;STOP&quot;

		if !output.Scan() {
			fmt.Fprintln(os.Stderr, output.Err())
			return
		}
		message := output.Text()
		fmt.Print(&quot;-&gt;: &quot; + message)
		if isEOT {
			fmt.Println(&quot;All messages sent...&quot;)
			return
		}
	}

I also have adjusted the main sequence to simulate 2 consecutive clients, using a predefined buffer input that I reset along the way.


	input := `hello
world!
STOP
nopnop`
	test := strings.NewReader(input)

	go serve(arguments[1])
  
	test.Reset(input)
	query(arguments[1], test)

    test.Reset(input)
	query(arguments[1], test)

I added a simple retrier into your client, it helps us writing simple code.

	c, err := net.Dial(&quot;tcp&quot;, addr)
	for {
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			&lt;-time.After(time.Second)
			c, err = net.Dial(&quot;tcp&quot;, addr)
			continue
		}
		break
	}

The overall program is assembled into one file, not really good to read the output, but easier to transport around and execute.

https://play.golang.org/p/keKQsKA3fAw

In below example I demonstrate how you can use a json marshaller / unmarshaller to exchange structured data.

	input := bufio.NewScanner(src)
	dst := json.NewEncoder(c)
	output := json.NewDecoder(c)
	for input.Scan() {
		text := input.Text()
		isEOT := text == &quot;STOP&quot;

		err = dst.Encode(text)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}

		var tmp interface{}
		err = output.Decode(&amp;tmp)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		fmt.Printf(&quot;-&gt;: %v\n&quot;, tmp)
		if isEOT {
			fmt.Println(&quot;All messages sent...&quot;)
			return
		}
	}

But ! Beware, this last version is sensible to malicious users. Unlike bufio.Scanner or bufio.Reader it does not check the amount of data read on the wire. So it can possibly accumulate data until OOM.

This is particularly true for the server side of the thing, in

defer c.Close()
		defer atomic.AddUint64(&amp;count, ^uint64(0))
		input := json.NewDecoder(c)
		output := json.NewEncoder(c)
		fmt.Print(&quot;.&quot;)
		for {
			var netData interface{}
			input.Decode(&amp;netData)
			fmt.Printf(&quot;%v&quot;, netData)
			count := atomic.LoadUint64(&amp;count)
			output.Encode(count)
			if x, ok := netData.(string); ok &amp;&amp; x == &quot;STOP&quot; {
				break
			}
		}

https://play.golang.org/p/LpIu4ofpm9e

In your last piece of code, as answered by CodeCaster, don't forget to frame your messages using the appropriate delimiter.

huangapple
  • 本文由 发表于 2021年9月10日 15:54:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/69128988.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定