Proper handling of reading and writing with a net.Conn connection

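Question

I am trying to read and write over a network connection. Some data appears to be discarded or lost when it arrives at the server too quickly. The client connects and negotiates a connection, then sends 3 commands in quick succession to update a status page. Each message is a JSON string that is unmarshaled into a struct and decrypted with a stored key.

If I click the request on the client several times (each click generates 3 \n-terminated JSON payloads), the server eventually reports the error: invalid character X after top-level value. I dumped what the client sends, and it is 3 properly formed JSON entries. On the server side, one of the JSON payloads is missing completely and another is missing its first 515 characters. Because those 515 characters are missing, the JSON is malformed and unmarshaling fails.

My question is: how can I prevent data from being lost when reading from the connection? Am I hitting some kind of race condition, or am I mishandling how I read from and write to the connection?

Below is essentially what I'm using for the client connection handler. The client and server negotiate an encrypted connection, so there are several references to mode and RSA state; mode 4 is used once the keys are properly set, so that the server and client can exchange commands and results. At a high level, the handler starts a goroutine that reads from the connection and sends each line to a channel. That string is then read from the channel and converted to a struct. The first part of the session is a "handshake" that negotiates an encryption key and saves the session information. Once stage 4 is reached, the struct carries encrypted commands from the client, and the server sends results back until the connection errors or is closed.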

func HandleClientConnection(conClient net.Conn) {
	defer conClient.Close()

	chnLogging <- "Connection from " + conClient.RemoteAddr().String()

	tmTimeout := time.NewTimer(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)
	chnCloseConn := make(chan bool)

	chnDataFromClient := make(chan string, 1000)
	go func(chnData chan string) {

		for {
			netData, err := bufio.NewReader(conClient).ReadString('\n')
			if err != nil {
				if !strings.Contains(err.Error(), "EOF") {
					chnLogging <- "Error from client " + conClient.RemoteAddr().String() + ": " + err.Error()
				} else {
					chnLogging <- "Client " + conClient.RemoteAddr().String() + " disconnected"
				}
				chnCloseConn <- true
				return
			}

			tmTimeout.Stop()
			tmTimeout.Reset(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)

			chnData <- netData
		}
	}(chnDataFromClient)

	for {
		select {
		case <-chnCloseConn:
			chnLogging <- "Connection listener exiting for " + conClient.RemoteAddr().String()
			return
		case <-tmTimeout.C:
			chnLogging <- "Connection Timeout for " + conClient.RemoteAddr().String()
			return
		case strNetData := <-chnDataFromClient:

			var strctNetEncrypted stctNetEncrypted
			err := json.Unmarshal([]byte(strNetData), &strctNetEncrypted)
			CheckErr(err)

			switch strctNetEncrypted.IntMode {
			case 1:

				keyPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
				CheckErr(err)

				btServerPrivateKey, err := json.Marshal(keyPrivateKey)
				CheckErr(err)

				strctClientPubKeys.SetClientPubkey(strctNetEncrypted.BtPubKey, btServerPrivateKey)

				defer strctClientPubKeys.DelClientPubkey(strctNetEncrypted.BtPubKey)

				strctConnections.SetConnection(strctNetEncrypted.BtPubKey, conClient)

				defer strctConnections.DelConnection(strctNetEncrypted.BtPubKey)

				strctNetResponse := CreateStctNetEncryptedToClient("", strctNetEncrypted.BtPubKey, 2)
				if strctNetResponse.BtPubKey == nil ||
					strctNetResponse.BtRemotePubKey == nil {
					chnLogging <- "Error generating stage two response struct"
					chnCloseConn <- true
					return
				}

				btJSON, err := json.Marshal(strctNetResponse)
				CheckErr(err)
				chnLogging <- "Sending stage 2 negotation response"
				conClient.Write(btJSON)
				conClient.Write([]byte("\n"))

			case 2:

				chnLogging <- "WARNING: Received mode 2 network communication even though I shouldn't have"

			case 3:

				chnLogging <- "Received stage 3 negotiation response"

				strMessage, err := strctNetEncrypted.RSADecrypt()
				CheckErr(err)

				if len(strMessage) != 32 {
					chnLogging <- "Unexpected shared key length; Aborting"
					chnCloseConn <- true
					return
				}

				strctClientPubKeys.SetClientSharedKey(strMessage, strctNetEncrypted.BtPubKey, conClient.RemoteAddr().String())

			case 4:

				strMessageDecrypted := DecryptPayloadFromClient(strctNetEncrypted)

				if strMessageDecrypted != "" {

					if strings.ToLower(strMessageDecrypted) == "close" {
						chnLogging <- "Client requests disconnection"
						chnCloseConn <- true
						return
					}

					// Keepalive message; disregard
					if strMessageDecrypted == "PING" {
						continue
					}

					btResult := InterpretClientCommand(strMessageDecrypted)

					strctResponse := CreateStctNetEncryptedToClient(string(btResult), strctNetEncrypted.BtPubKey, 4)

					btJSON, err := json.Marshal(strctResponse)
					CheckErr(err)
					conClient.Write(btJSON)
					conClient.Write([]byte("\n"))

				} else {
					chnLogging <- "Invalid command \"" + strMessageDecrypted + "\""
				}

			default:

				chnLogging <- "ERROR: Message received without mode set"
			}
		}
	}
}


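Answer 1

Score: 1

The application reads data into a buffered reader on every loop iteration, then discards that reader together with any data it buffered past the first line.

Retain the buffered reader for the lifetime of the connection: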

rdr := bufio.NewReader(conClient)
for {
    netData, err := rdr.ReadString('\n')
    ...

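You can simplify the code (and fix other issues unrelated to the buffering problem) by eliminating the goroutine. Use a read deadline to handle an unresponsive client.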

func HandleClientConnection(conClient net.Conn) {
    defer conClient.Close()
    chnLogging <- "Connection from " + conClient.RemoteAddr().String()
    // SetReadDeadline takes an absolute time.Time, not a duration.
    conClient.SetReadDeadline(time.Now().Add(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES))
    scanner := bufio.NewScanner(conClient)
    for scanner.Scan() {
        var strctNetEncrypted stctNetEncrypted
        err := json.Unmarshal(scanner.Bytes(), &strctNetEncrypted)
        CheckErr(err)
        switch strctNetEncrypted.IntMode {
            // Insert the contents of the switch statement from the
            // question here, with references to chnCloseConn removed.
        }
        // Push the deadline forward after each message is handled.
        conClient.SetReadDeadline(time.Now().Add(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES))
    }
    // scanner.Err() is nil when the client closed the connection (EOF).
    if err := scanner.Err(); err != nil {
        chnLogging <- "Error from client " + conClient.RemoteAddr().String() + ": " + err.Error()
    } else {
        chnLogging <- "Client " + conClient.RemoteAddr().String() + " disconnected"
    }
}

huangapple
  • Posted on 2021-11-10 07:35:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69906348.html