Getting EOF from server as client in Go

Question

I have a Go client for a custom protocol. The protocol is lz4-compressed JSON-RPC with a four-byte header giving the length of the compressed JSON.

func ReceiveMessage(conn net.Conn) ([]byte, error) {
    start := time.Now()
    bodyLen := 0
    body := make([]byte, 0, 4096)
    buf := make([]byte, 0, 256)

    for bodyLen == 0 || len(body) < bodyLen {
        if len(body) > 4 {
            header := body[:4]
            body = body[:4]
            bodyLen = int(unpack(header))
        }

        n, err := conn.Read(buf[:])
        if err != nil {
            if err != io.EOF {
                return body, err
            }
        }
        body = append(body, buf[0:n]...)

        now := time.Now()
        if now.Sub(start) > time.Duration(readTimeout) * time.Millisecond {
            return body, fmt.Errorf("Timed-out while reading from socket.")
        }
        time.Sleep(time.Duration(1) * time.Millisecond)
    }

    return lz4.Decode(nil, body)
}

The client:

func main() {
    address := os.Args[1]
    msg := []byte(os.Args[2])

    fmt.Printf("Sending %s to %s\n", msg, address)

    conn, err := net.Dial(address)
    if err != nil {
        fmt.Printf("%v\n", err)
        return
    }

    // Another library call
    _, err = SendMessage(conn, []byte(msg))
    if err != nil {
        fmt.Printf("%v\n", err)
        return
    }

    response, err := ReceiveMessage(conn)
    conn.Close()

    if err != nil {
        fmt.Printf("%v\n", err)
        return
    }

    fmt.Printf("Response: %s\n", response)
}

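When I call it, I get no response and it just times out. (If I do not explicitly ignore the EOF, it returns at that point with an io.EOF error.) I have another library for this protocol, written in Python, that works against the same endpoint with the same payload. Do you see anything immediately wrong?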

Answer 1

Score: 1

[JimB just beat me to an answer, but here goes anyway.]

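The root issue is that you wrote body = body[:4]
when you wanted body = body[4:].
The former keeps only the first four header bytes,
while the latter discards the four header bytes just decoded.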
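To make the difference between the two slice expressions concrete, here is a minimal sketch. The frame contents and the helper name splitHeader are made up for illustration and are not part of the original code.

```go
package main

import "fmt"

// splitHeader separates a length-prefixed frame into its 4-byte header
// and the bytes that follow it. It assumes len(frame) >= 4.
func splitHeader(frame []byte) (header, rest []byte) {
	// frame[:4] keeps only the header; frame[4:] drops it.
	return frame[:4], frame[4:]
}

func main() {
	// Hypothetical frame: 4 header bytes followed by a 3-byte payload.
	frame := []byte{3, 0, 0, 0, 'a', 'b', 'c'}

	header, rest := splitHeader(frame)
	fmt.Println(len(header), len(rest)) // 4 3
	fmt.Printf("%q\n", rest)            // "abc"
}
```

With body = body[:4], every pass through the loop throws away whatever payload has arrived, which matches the timeout described in the question.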

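Here is a self-contained version, with some debug logging, that works.
It includes some of the other changes I mentioned.
(I guessed at various things you didn't include, such as the lz4 package used, the timeout, unpack, etc.)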

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"time"

	"github.com/bkaradzic/go-lz4"
)

const readTimeout = 30 * time.Second // XXX guess

func ReceiveMessage(conn net.Conn) ([]byte, error) {
	bodyLen := 0
	body := make([]byte, 0, 4096)
	var buf [256]byte

	conn.SetDeadline(time.Now().Add(readTimeout))
	defer conn.SetDeadline(time.Time{}) // disable deadline
	for bodyLen == 0 || len(body) < bodyLen {
		if bodyLen == 0 && len(body) >= 4 {
			bodyLen = int(unpack(body[:4]))
			body = body[4:]
			if bodyLen <= 0 {
				return nil, errors.New("invalid body length")
			}
			log.Println("read bodyLen:", bodyLen)
			continue
		}

		n, err := conn.Read(buf[:])
		body = append(body, buf[:n]...)
		log.Printf("appended %d bytes, len(body) now %d", n, len(body))
		// Note, this is checked *after* handling any n bytes.
		// An io.Reader is allowed to return data with an error.
		if err != nil {
			if err != io.EOF {
				return nil, err
			}
			break
		}
	}
	if len(body) != bodyLen {
		return nil, fmt.Errorf("got %d bytes, expected %d",
			len(body), bodyLen)
	}

	return lz4.Decode(nil, body)
}

const address = ":5678"

var msg = []byte(`{"foo":"bar"}`)

func main() {
	//address := os.Args[1]
	//msg := []byte(os.Args[2])

	fmt.Printf("Sending %s to %s\n", msg, address)

	conn, err := net.Dial("tcp", address)
	if err != nil {
		fmt.Printf("%v\n", err)
		return
	}

	// Another library call
	_, err = SendMessage(conn, msg)
	if err != nil {
		fmt.Printf("%v\n", err)
		return
	}

	response, err := ReceiveMessage(conn)
	conn.Close()

	if err != nil {
		fmt.Printf("%v\n", err)
		return
	}

	fmt.Printf("Response: %s\n", response)
}

// a guess at what your `unpack` does
func unpack(b []byte) uint32 {
	return binary.LittleEndian.Uint32(b)
}

func SendMessage(net.Conn, []byte) (int, error) {
	// stub
	return 0, nil
}

func init() {
	// start a simple test server in the same process as a go-routine.
	ln, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		defer ln.Close()
		for {
			conn, err := ln.Accept()
			if err != nil {
				log.Fatalln("accept:", err)
			}
			go Serve(conn)
		}
	}()
}

func Serve(c net.Conn) {
	defer c.Close()
	// skip reading the initial request/message and just respond
	const response = `{"somefield": "someval"}`
	// normally (de)compression in Go is done streaming via
	// an io.Reader or io.Writer but we need the final length.
	data, err := lz4.Encode(nil, []byte(response))
	if err != nil {
		log.Println("lz4 encode:", err)
		return
	}
	log.Println("sending len:", len(data))
	if err = binary.Write(c, binary.LittleEndian, uint32(len(data))); err != nil {
		log.Println("writing len:", err)
		return
	}
	log.Println("sending data")
	if _, err = c.Write(data); err != nil {
		log.Println("writing compressed response:", err)
		return
	}
	log.Println("Serve done, closing connection")
}

Playground (but not runnable there).

Answer 2

Score: 0

You have a number of issues with the server code. Without a full reproducing case, it's hard to tell if these will fix everything.

    for bodyLen == 0 || len(body) < bodyLen {
        if len(body) > 4 {
            header := body[:4]
            body = body[:4]
            bodyLen = int(unpack(header))
        }

On every iteration, if len(body) > 4, you slice body back to its first 4 bytes, so body might never reach bodyLen.

        n, err := conn.Read(buf[:])

You don't need to re-slice buf here; use conn.Read(buf).

        if err != nil {
            if err != io.EOF {
                return body, err
            }
        }

io.EOF is the end of the stream, and you need to handle it. Note that n might still be > 0 when you get an EOF. Check after processing the body for io.EOF or you could loop indefinitely.

        body = append(body, buf[0:n]...)

        now := time.Now()
        if now.Sub(start) > time.Duration(readTimeout) * time.Millisecond {
            return body, fmt.Errorf("Timed-out while reading from socket.")

You would be better off calling conn.SetReadDeadline before each read, so that a stalled Read can be interrupted.

huangapple
  • Posted on April 29, 2015 at 03:17:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/29928033.html