go-libp2p – 从流中接收字节

huangapple go评论90阅读模式
英文:

go-libp2p - receiving bytes from stream

问题

我正在构建我的第一个go-libp2p应用程序,并尝试修改echo示例,以便读取一个[]byte而不是示例中的string

在我的代码中,我将doEcho函数更改为运行io.ReadAll(s),而不是示例中的bufio.NewReader(s),然后跟随ReadString('\n')

// doEcho 从流中读取一行数据并将其写回
func doEcho(s network.Stream) error {
    b, err := io.ReadAll(s)
    if err != nil {
        return err
    }
    log.Printf("接收到的字节数:%d", len(b))

    _, err = s.Write([]byte("感谢提供的字节"))
    return err
}

当我运行这段代码并发送一条消息时,我确实看到了listener received new stream的日志,但doEcho函数在io.ReadAll(s)调用之后被阻塞住,从未执行回复。

所以我的问题是:

  1. 为什么我的代码不起作用,我该如何使其起作用?
  2. io.ReadAll(s)bufioReadString('\n')在底层是如何工作的,以致它们导致这种行为上的差异?

编辑:
根据@Stephan Schlecht的建议,我将代码更改为以下形式,但它仍然像之前一样被阻塞住:

func doEcho(s network.Stream) error {
    buf := bufio.NewReader(s)
    var data []byte

    for {
        b, err := buf.ReadByte()
        if err != nil {
            break
        }
        data = append(data, b)
    }

    log.Printf("接收到的字节数:%d", len(data))

    _, err := s.Write([]byte("感谢提供的字节"))
    return err
}

编辑2:我忘了澄清一下,但我不想使用ReadString('\n')ReadBytes('\n'),因为我对接收到的[]byte一无所知,所以它可能不以\n结尾。我想从流中读取任何[]byte,然后写回流中。

英文:

I'm building my first go-libp2p application and trying to modify the echo example to read a []byte instead of a string as in the example.

In my code, I changed the doEcho function to run io.ReadAll(s) instead of bufio.NewReader(s) followed by ReadString('\n'):

// doEcho reads a line of data a stream and writes it back
func doEcho(s network.Stream) error {
     b, err := io.ReadAll(s)
     if err != nil {
	     return err
     }
     log.Printf("Number of bytes received: %d", len(b))

     _, err = s.Write([]byte("thanks for the bytes"))
     return err
}

When I run this and send a message, I do see the listener received new stream log but the doEcho function gets stuck after the io.ReadAll(s) call and never executes the reply.

So my questions are:

  1. Why does my code not work and how can I make it work?
  2. How does io.ReadAll(s) and bufio's ReadString('\n') work under the hood so that they cause this difference in behavior?

Edit:
As per @Stephan Schlecht suggestion I changed my code to this, but it still remains blocked as before:

func doEcho(s network.Stream) error {
    buf := bufio.NewReader(s)
    var data []byte

    for {
	    b, err := buf.ReadByte()
	    if err != nil {
		    break
	    }
	    data = append(data, b)
    }

    log.Printf("Number of bytes received: %d", len(data))

    _, err := s.Write([]byte("thanks for the bytes"))
    return err
}

Edit 2: I forgot to clarify this, but I don't want to use ReadString('\n') or ReadBytes('\n') because I don't know anything about the []byte I'm receiving, so it might not end with \n. I want to read any []byte from the stream and then write back to the stream.

答案1

得分: 1

ReadString('\n') 会读取输入中第一个 \n 出现之前的内容,并返回字符串。

io.ReadAll(s) 会一直读取直到出现错误或者 EOF,并返回读取到的数据。所以除非出现错误或者 EOF,否则它不会返回。

原则上,在流式连接中,接收到的数据结构没有固定的大小。

这取决于远程发送方。

如果远程发送方发送的是二进制数据,并在发送最后一个字节后关闭流,那么你可以简单地在接收方读取所有数据直到 EOF。

如果流不立即关闭且数据大小可变,还有其他可能性:首先发送一个具有定义大小的头部,最简单的情况下只需传输数据的长度。一旦接收到指定数量的数据,就知道这一轮接收完成,可以继续下一轮。

另外,你可以定义一个特殊字符来标记要传输的数据结构的结束。如果要传输未经编码的任意二进制数据,这种方法将不起作用。

还有其他一些稍微复杂的选项,比如将数据分成块。

在问题中链接的示例中,刚刚发送的数据末尾发送了一个 \n,但如果要发送任意二进制数据,这种方法将不起作用。

修改后的回显示例

为了最小限度地修改问题中链接的回显示例,以首先发送一个带有有效负载长度的 1 字节头部,然后再发送实际的有效负载,代码可能如下所示:

发送

runSender 函数中,可以将当前发送有效负载的代码:

	log.Println("sender saying hello")
	_, err = s.Write([]byte("Hello, world!\n"))
	if err != nil {
		log.Println(err)
		return
	}

替换为:

	log.Println("sender saying hello")
	payload := []byte("Hello, world!")
	header := []byte{byte(len(payload))}
	_, err = s.Write(header)
	if err != nil {
		log.Println(err)
		return
	}
	_, err = s.Write(payload)
	if err != nil {
		log.Println(err)
		return
	}

因此,在实际有效负载之前,我们发送了一个字节的有效负载长度。

回显

doEcho 函数首先读取头部,然后再读取有效负载。它使用 ReadFull 函数,该函数会精确地读取 len(payload) 字节。

func doEcho(s network.Stream) error {
	buf := bufio.NewReader(s)
	header, err := buf.ReadByte()
	if err != nil {
		return err
	}

	payload := make([]byte, header)
	n, err := io.ReadFull(buf, payload)
	log.Printf("payload has %d bytes", n)
	if err != nil {
		return err
	}

	log.Printf("read: %s", payload)
	_, err = s.Write(payload)
	return err
}

测试

终端 1

2022/11/06 09:59:38 I am /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:38 listening for connections
2022/11/06 09:59:38 Now run "./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e" on a different terminal
2022/11/06 09:59:55 listener received new stream
2022/11/06 09:59:55 payload has 13 bytes
2022/11/06 09:59:55 read: Hello, world!

终端 2

stephan@mac echo % ./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:55 I am /ip4/127.0.0.1/tcp/8089/p2p/QmW6iSWiFBG5ugUUwBND14pDZzLDaqSNfxBG6yb8cmL3Di
2022/11/06 09:59:55 sender opening stream
2022/11/06 09:59:55 sender saying hello
2022/11/06 09:59:55 read reply: "Hello, world!"
s

这只是一个相当简单的示例,肯定需要根据实际需求进行定制,但可能是朝着正确方向迈出的第一步。

英文:

ReadString('\n') reads until the first occurrence of \n in the input and returns the string.

io.ReadAll(s) reads until an error or EOF and returns the data it read. So unless an error or EOF occurs it does not return.

In principle, there is no natural size for a data structure to be received on stream-oriented connections.

It depends on the remote sender.

If the remote sender sends binary data and closes the stream after sending the last byte, then you can simply read all data up to the EOF on the receiver side.

If the stream is not to be closed immediately and the data size is variable, there are further possibilities: One first sends a header that has a defined size and in the simplest case simply transmits the length of the data. Once you have received the specified amount of data, you know that this round of reception is complete and you can continue.

Alternatively, you can define a special character that marks the end of the data structure to be transmitted. This will not work if you want to transmit arbitrary binary data without encoding.

There are other options that are a little more complicated, such as splitting the data into blocks.

In the example linked in the question, a \n is sent at the end of the data just sent, but this would not work if you want to send arbitrary binary data.

Adapted Echo Example

In order to minimally modify the echo example linked in the question to first send a 1-byte header with the length of the payload and only then the actual payload, it could look something like the following:

Sending

In the function runSender line one could replace the current sending of the payload from:

	log.Println("sender saying hello")
	_, err = s.Write([]byte("Hello, world!\n"))
	if err != nil {
		log.Println(err)
		return
	}

to

	log.Println("sender saying hello")
	payload := []byte("Hello, world!")
	header := []byte{byte(len(payload))}
	_, err = s.Write(header)
	if err != nil {
		log.Println(err)
		return
	}
	_, err = s.Write(payload)
	if err != nil {
		log.Println(err)
		return
	}

So we send one byte with the length of the payload before the actual payload.

Echo

The doEcho would then read the header first and afterwards the payload. It uses ReadFull, which reads exactly len(payload) bytes.

func doEcho(s network.Stream) error {
	buf := bufio.NewReader(s)
	header, err := buf.ReadByte()
	if err != nil {
		return err
	}

	payload := make([]byte, header)
	n, err := io.ReadFull(buf, payload)
	log.Printf("payload has %d bytes", n)
	if err != nil {
		return err
	}

	log.Printf("read: %s", payload)
	_, err = s.Write(payload)
	return err
}

Test

Terminal 1

2022/11/06 09:59:38 I am /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:38 listening for connections
2022/11/06 09:59:38 Now run "./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e" on a different terminal
2022/11/06 09:59:55 listener received new stream
2022/11/06 09:59:55 payload has 13 bytes
2022/11/06 09:59:55 read: Hello, world!

Terminal 2

stephan@mac echo % ./echo -l 8089 -d /ip4/127.0.0.1/tcp/8088/p2p/QmVrjAX9QPqihfVFEPJ2apRSUxVCE9wnvqaWanBz2FLY1e
2022/11/06 09:59:55 I am /ip4/127.0.0.1/tcp/8089/p2p/QmW6iSWiFBG5ugUUwBND14pDZzLDaqSNfxBG6yb8cmL3Di
2022/11/06 09:59:55 sender opening stream
2022/11/06 09:59:55 sender saying hello
2022/11/06 09:59:55 read reply: "Hello, world!"
s

This is certainly a fairly simple example and will certainly need to be customized to your actual requirements, but could perhaps be a first step in the right direction.

huangapple
  • 本文由 发表于 2022年11月5日 15:50:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/74326022.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定