Getting EOF from server as client in Go
Question
I have a Go client for a custom protocol. The protocol is lz4-compressed JSON-RPC with a four-byte header giving the length of the compressed JSON.
    func ReceiveMessage(conn net.Conn) ([]byte, error) {
        start := time.Now()
        bodyLen := 0
        body := make([]byte, 0, 4096)
        buf := make([]byte, 0, 256)
        for bodyLen == 0 || len(body) < bodyLen {
            if len(body) > 4 {
                header := body[:4]
                body = body[:4]
                bodyLen = int(unpack(header))
            }
            n, err := conn.Read(buf[:])
            if err != nil {
                if err != io.EOF {
                    return body, err
                }
            }
            body = append(body, buf[0:n]...)
            now := time.Now()
            if now.Sub(start) > time.Duration(readTimeout) * time.Millisecond {
                return body, fmt.Errorf("Timed-out while reading from socket.")
            }
            time.Sleep(time.Duration(1) * time.Millisecond)
        }
        return lz4.Decode(nil, body)
    }
The client:
    func main() {
        address := os.Args[1]
        msg := []byte(os.Args[2])
        fmt.Printf("Sending %s to %s\n", msg, address)
        conn, err := net.Dial(address)
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        // Another library call
        _, err = SendMessage(conn, []byte(msg))
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        response, err := ReceiveMessage(conn)
        conn.Close()
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        fmt.Printf("Response: %s\n", response)
    }
When I call it, I get no response and it just times out. (If I do not explicitly ignore the EOF, it returns there with io.EOF error.) I have another library for this written in Python that also works against the same endpoint with the same payload. Do you see anything immediately?
Answer 1
Score: 1
[JimB just beat me to an answer but here goes anyway.]
The root issue is that you did body = body[:4] when you wanted body = body[4:]. The former keeps only the first four header bytes, while the latter tosses the four header bytes just decoded.
Here is a self-contained version with some debug logs that works. It has some of the other changes I mentioned. (I guessed at various things that you didn't include, like the lz4 package used, the timeout, unpack, etc.)
    package main

    import (
        "encoding/binary"
        "errors"
        "fmt"
        "io"
        "log"
        "net"
        "time"

        "github.com/bkaradzic/go-lz4"
    )

    const readTimeout = 30 * time.Second // XXX guess

    func ReceiveMessage(conn net.Conn) ([]byte, error) {
        bodyLen := 0
        body := make([]byte, 0, 4096)
        var buf [256]byte
        conn.SetDeadline(time.Now().Add(readTimeout))
        defer conn.SetDeadline(time.Time{}) // disable deadline
        for bodyLen == 0 || len(body) < bodyLen {
            if bodyLen == 0 && len(body) >= 4 {
                bodyLen = int(unpack(body[:4]))
                body = body[4:]
                if bodyLen <= 0 {
                    return nil, errors.New("invalid body length")
                }
                log.Println("read bodyLen:", bodyLen)
                continue
            }
            n, err := conn.Read(buf[:])
            body = append(body, buf[:n]...)
            log.Printf("appended %d bytes, len(body) now %d", n, len(body))
            // Note, this is checked *after* handling any n bytes.
            // An io.Reader is allowed to return data with an error.
            if err != nil {
                if err != io.EOF {
                    return nil, err
                }
                break
            }
        }
        if len(body) != bodyLen {
            return nil, fmt.Errorf("got %d bytes, expected %d",
                len(body), bodyLen)
        }
        return lz4.Decode(nil, body)
    }

    const address = ":5678"

    var msg = []byte(`{"foo":"bar"}`)

    func main() {
        //address := os.Args[1]
        //msg := []byte(os.Args[2])
        fmt.Printf("Sending %s to %s\n", msg, address)
        conn, err := net.Dial("tcp", address)
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        // Another library call
        _, err = SendMessage(conn, msg)
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        response, err := ReceiveMessage(conn)
        conn.Close()
        if err != nil {
            fmt.Printf("%v\n", err)
            return
        }
        fmt.Printf("Response: %s\n", response)
    }

    // a guess at what your `unpack` does
    func unpack(b []byte) uint32 {
        return binary.LittleEndian.Uint32(b)
    }

    func SendMessage(net.Conn, []byte) (int, error) {
        // stub
        return 0, nil
    }

    func init() {
        // start a simple test server in the same process as a goroutine.
        ln, err := net.Listen("tcp", address)
        if err != nil {
            log.Fatal(err)
        }
        go func() {
            defer ln.Close()
            for {
                conn, err := ln.Accept()
                if err != nil {
                    log.Fatalln("accept:", err)
                }
                go Serve(conn)
            }
        }()
    }

    func Serve(c net.Conn) {
        defer c.Close()
        // skip reading the initial request/message and just respond
        const response = `{"somefield": "someval"}`
        // normally (de)compression in Go is done streaming via
        // an io.Reader or io.Writer but we need the final length.
        data, err := lz4.Encode(nil, []byte(response))
        if err != nil {
            log.Println("lz4 encode:", err)
            return
        }
        log.Println("sending len:", len(data))
        if err = binary.Write(c, binary.LittleEndian, uint32(len(data))); err != nil {
            log.Println("writing len:", err)
            return
        }
        log.Println("sending data")
        if _, err = c.Write(data); err != nil {
            log.Println("writing compressed response:", err)
            return
        }
        log.Println("Serve done, closing connection")
    }
Playground (but not runnable there).
Answer 2
Score: 0
You have a number of issues with the receive code. Without a full reproducing case, it's hard to tell if these will fix everything.
    for bodyLen == 0 || len(body) < bodyLen {
        if len(body) > 4 {
            header := body[:4]
            body = body[:4]
            bodyLen = int(unpack(header))
        }
On every iteration, if len(body) > 4, you slice body back to the first 4 bytes, so body might never grow to be >= bodyLen.
        n, err := conn.Read(buf[:])
You don't need to re-slice buf here; use conn.Read(buf).
        if err != nil {
            if err != io.EOF {
                return body, err
            }
        }
io.EOF is the end of the stream, and you need to handle it. Note that n might still be > 0 when you get an EOF. Check for io.EOF after processing the body, or you could loop indefinitely.
        body = append(body, buf[0:n]...)
        now := time.Now()
        if now.Sub(start) > time.Duration(readTimeout) * time.Millisecond {
            return body, fmt.Errorf("Timed-out while reading from socket.")
You would be better off using conn.SetReadDeadline before each read, so that a stalled Read can be interrupted.