GoLang Protobuf:如何在同一个 TCP 连接上发送多个消息?

huangapple go评论90阅读模式
英文:

GoLang Protobuf: How to send multiple messages using the same tcp connection?

问题

我正在使用GoLang的protobuf对通过单个TCP连接发送的消息进行编码(和解码)。

.proto结构如下:

message Prepare{
   int64 instance = 1;
   int64 round = 2;
   int64 nodeId = 3;
}

然后我使用protoc工具生成相应的存根。

这是我将内容写入网络的方式:

func (t *Prepare) Marshal(wire io.Writer) {
    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

这是我在接收方读取和解组的方式:

func (t *Prepare) Unmarshal(wire io.Reader) error {
    data := make([]byte, 8*1024*1024) 
    length, err := wire.Read(data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}

如果对于每个protobuf消息,生成一个新的TCP连接,上述方法可以正常工作。但是当使用单个TCP连接传输多个消息(持久连接)时,解组过程会出现错误,错误信息为proto: invalid field number

这个问题的原因是,当使用单个连接发送protobuf消息时,不会强制执行任何消息边界,因此在读取length, err := wire.Read(data)时,data缓冲区可能包含以下内容:1)多个protobuf消息的字节,2)部分protobuf消息的字节。

protobuf文档提到了以下解决方案:

如果您想将多个消息写入单个文件或流中,您需要自己跟踪一个消息的结束和下一个消息的开始。Protocol Buffer的二进制格式不是自解释的,因此协议缓冲区解析器无法自行确定消息的结束位置。解决此问题的最简单方法是在写入消息本身之前写入每个消息的大小。在读取消息时,先读取大小,然后将字节读入单独的缓冲区,然后从该缓冲区解析。(如果您想避免将字节复制到单独的缓冲区,请查看C++和Java中的CodedInputStream类,该类可以限制读取的字节数。)

虽然这是一种直观的方法,但归根结底是一个先有鸡还是先有蛋的问题。从写入到网络的字节数组的长度(通过data, err := proto.Marshal(t); len(data)获取)是不固定的,不知道需要多少字节来表示这个数字(len(data))。现在我们面临的问题与之前相同,即如何在接收方读取时发送字节数组的长度,而不知道实际上需要多少字节来表示该length(换句话说,接收方如何知道有多少字节对应于length字段)。

对此有什么建议吗?

谢谢!

英文:

I am using GoLang protobuf for encoding (and decoding) messages that are sent through a single tcp connection.

The .proto struct

message Prepare{
   int64 instance = 1;
   int64 round = 2;
   int64 nodeId = 3;
}

Then I use the protoc tool to generate the corresponding stubs.

This is how I write the contents to the wire.

func (t *Prepare) Marshal(wire io.Writer) {

	data, err := proto.Marshal(t)
	if err != nil {
		panic(err)
	}
	_, err = wire.Write(data)
	if err != nil {
		panic(err)
	}
}

And this is how I read and unmarshall in the receiver side.

func (t *Prepare) Unmarshal(wire io.Reader) error {
	data := make([]byte, 8*1024*1024) 
	length, err := wire.Read(data)
	if err != nil {
		panic(err)
	}
	err = proto.Unmarshal(data[:length], t)
	if err != nil {
		panic(err)
	}
	return nil
}

If for each protobuf message, a new tcp connection is spawn, the above approach works fine. But when a single tcp connection is used to transmit multiple messages (persistent connections), then the unmarshalling fails with the error proto: invalid field number

This problem occurs because, protobuf messages when sent using a single connection does not enforce any message boundaries, thus when reading length, err := wire.Read(data) the data buffer can contain bytes corresponding to 1) multiple protobuff messages, and 2) partial protobuff messages.

The protobuf documentation mentions the following as a solution.

> If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself. When you read the messages back in, you read the size, then read the bytes into a separate buffer, then parse from that buffer. (If you want to avoid copying bytes to a separate buffer, check out the CodedInputStream class (in both C++ and Java) which can be told to limit reads to a certain number of bytes.)

While this is an intuitive method, it boils down to a chicken-and-egg problem. The length of the byte array written to the wire (as taken from data, err := proto.Marshal(t); len(data) ) is not fixed, and its not known how many bytes will be required for representing this number (len(data)). Now we have the same problem as in, how to send the length of the byte array to read in the receiver side, without actually knowing how many bytes will be taken for that length (stated differently, how can the receiver know how many bytes are corresponding to the length field)

Any suggestions for this?

Thanks

答案1

得分: 2

我建议使用gRPC,但你已经说过你不想用那个。
我还可以建议发送简单的UTP数据包,因为UDP根本不需要连接。

如果你想坚持你目前的方法,解决方案很简单:
在将protobuf编组为字节数组之后,你知道它的长度。它是len(data),这就是你想要首先写入的值。wire.Write()实际写入的字节数将是相同的。如果不是,那么连接出现了问题,数据包只被部分写入。因此,接收方无法解组它。

在接收时,首先读取长度,准备一个具有正确大小的缓冲区,或者更好的办法是创建一个LimitedReader,然后从中解组。

字节数应该编码为整数。你可以使用32位或64位的值,还需要在小端和大端之间做出决定-你使用的是什么对于发送方和接收方来说都是无关紧要的,只要大小和字节顺序在两端相同即可。

请参考https://pkg.go.dev/encoding/binary以及在ByteOrder上定义的函数:

binary.LittleEndian.PutUint64(w, uint64(len(data)))
length := int64(binary.LittleEndian.Uint64(r))

当然,如果有一个简单的错误,或者你只错了一个字节,剩下的所有数据实际上都是无用的。通过将消息作为专用的UDP数据包发送,你可以避免这个问题。

英文:

I would recommend using gRPC, but you already stated you don't want that.
I can also recommend sending simple UTP packages, since UDP doesn't need a connection at all.

If you want to stick to your current approach, the solution is simple though:
After marshalling protobuf to a byte array, you know it's length. It's len(data) and that's the value you want to write first. The actual number of bytes written by wire.Write() will be the same. If not, there was a problem with the connection, and the package was only written partialy. So the receiver can't unmarshal it anways.

When receiving, first read the length, prepare a buffer with the correct size or, even better, make a LimitedReader and unmarshal from there.

The number-of-bytes should be encoded as an integer. You can either use a 32bit or 64bit value, and you also need to decide between little and big endian - what you use is irrelevant, as long as the size and endianess is the same on the sender and receiver side.

Take a look at https://pkg.go.dev/encoding/binary and the functions defined on ByteOrder:

binary.LittleEndian.PutUint64(w, uint64(len(data)))
length := int64(binary.LittleEndian.Uint64(r))

Of course, if there is even a simple bug or you are wrong by only one byte, all the remaining data is effectively useless. By sending messages as dedicated UDP packages, you can avoid this issue.

答案2

得分: 0

对于问题中提到的确切场景,以下是上述答案的详细说明:

// 将 Prepare 结构体编组为字节流并写入到 io.Writer 中
func (t *Prepare) Marshal(wire io.Writer) {
    // 使用 proto.Marshal 将结构体 t 编码为字节流
    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    lengthWritten := len(data)
    var b [16]byte
    bs := b[:16]
    // 使用 binary.LittleEndian.PutUint64 将 lengthWritten 转换为字节流
    binary.LittleEndian.PutUint64(bs, uint64(lengthWritten))
    // 将字节流 bs 写入到 io.Writer 中
    _, err = wire.Write(bs)
    if err != nil {
        panic(err)
    }
    // 将字节流 data 写入到 io.Writer 中
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

// 从 io.Reader 中读取字节流并解组为 Prepare 结构体
func (t *Prepare) Unmarshal(wire io.Reader) error {
    var b [16]byte
    bs := b[:16]
    // 从 io.Reader 中读取字节流 bs
    _, err := io.ReadFull(wire, bs)
    // 使用 binary.LittleEndian.Uint64 将字节流 bs 转换为 uint64 类型的 numBytes
    numBytes := uint64(binary.LittleEndian.Uint64(bs))
    // 创建长度为 numBytes 的字节切片 data
    data := make([]byte, numBytes)
    // 从 io.Reader 中读取字节流 data
    length, err := io.ReadFull(wire, data)
    if err != nil {
        panic(err)
    }
    // 使用 proto.Unmarshal 将字节流 data[:length] 解码为结构体 t
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}

以上是给定问题中所提到的情景下的详细解答。

英文:

Elaborating the above answer for the exact scenario mentioned in the question

func (t *Prepare) Marshal(wire io.Writer) {
data, err := proto.Marshal(t)
if err != nil {
panic(err)
}
lengthWritten := len(data)
var b [16]byte
bs := b[:16]
binary.LittleEndian.PutUint64(bs, uint64(lengthWritten))
_, err = wire.Write(bs)
if err != nil {
panic(err)
}
_, err = wire.Write(data)
if err != nil {
panic(err)
}
}
func (t *Prepare) Unmarshal(wire io.Reader) error {
var b [16]byte
bs := b[:16]
_, err := io.ReadFull(wire, bs)
numBytes := uint64(binary.LittleEndian.Uint64(bs))
data := make([]byte, numBytes)
length, err := io.ReadFull(wire, data)
if err != nil {
panic(err)
}
err = proto.Unmarshal(data[:length], t)
if err != nil {
panic(err)
}
return nil
}

huangapple
  • 本文由 发表于 2021年8月3日 19:58:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/68635618.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定