处理在.NET中使用Websockets时接收多个消息/多个块的情况

huangapple go评论85阅读模式
英文:

Handling receiving multiple messages / multiple chunks when using websockets in .NET

问题

我正在实现一个WebSocket服务器,并使用二进制数据(而不是文本)在服务器和客户端之间进行通信。当等待接收消息然后处理时,我是否需要关心消息以多个分块接收以及在使用以下代码时同时接收两条消息?

var buffer = new byte[256];
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

如果是的话,我将感激能够看到一个最佳实践代码,该代码在一次只处理一条消息的情况下正确处理接收消息。

我使用以下代码来处理多个分块的消息,但不确定是否过于复杂。

while (socket.State == WebSocketState.Open)
{
    var buffer = new byte[256];
    await using var dataStream = new MemoryStream();
    WebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }
        await dataStream.WriteAsync(buffer);
    } while (!result.EndOfMessage);
    await Process(dataStream.ToArray()); // Process函数无法检测多个消息,只能一次处理一条消息
    dataStream.SetLength(0); // 清除内存流数据
}

注意: 以上为您提供的代码的翻译,没有其他内容。

英文:

I'm implementing a websocket server and using binary data (not text) to communicate between server and clients. When waiting for a message to receive and then process should I care about a message received in multiple chunks and also two messages receives at the same time when I use following code?

var buffer = new byte[256];
var result = await webSocket.ReceiveAsync(new ArraySegment&lt;byte&gt;(buffer), CancellationToken.None);

*I'm sure all messages sent by (valid) clients/server are fitted within 256 bytes long buffer and EndOfMessage flag is set to true when using webSocket.SendAsync();

If yes, I would be grateful to see a best practice code that handles receiving messages properly when there is a "await Process(byte[] data)" method that handles only one message at a time.

I used following code to handle messages with multiple chunks but not sure whether it is overengineering or not.

while (socket.State == WebSocketState.Open)
{
    var buffer = new byte[256];
    await using var dataStream = new MemoryStream();
    WebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(new ArraySegment&lt;byte&gt;(buffer), CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, &quot;&quot;, CancellationToken.None);
            return;
        }
        await dataStream.WriteAsync(buffer);
    } while (!result.EndOfMessage);
    await Process(dataStream.ToArray()); //Process functions do not detect multiple messages, and just can process one message at a time
    dataStream.SetLength(0); //Clear memory stream data
}

答案1

得分: 1

对于最佳实践的代码,我会感到非常感激,它处理了正确接收消息的方式。

不确定最佳实践,但关于上面的评论,我会进行一些重构:

var buffer = new Memory<byte>(new byte[256]);

while (socket.State == WebSocketState.Open)
{
    ValueWebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }            
    } 
    while (!result.EndOfMessage);        
    // 处理函数无法检测多个消息,只能逐一处理一条消息
    await Process(buffer[..result.Count]); // 对于 Memory<byte> 来说,范围运算符相当便宜         
}

所做的更改:

  1. byte[] 替换为 Memory<byte>(),原因是 Range 运算符的使用(buffer[..result.Count])不会在该类型上引发内存分配。

  2. 使用了 socket.ReceiveAsync() 的另一个重载,返回 ValueTask<WebSocketReceiveResult>(而不是 Task<>),因此每次调用可以节省 8 字节的内存分配,据我所知。

  3. 如果消息不恰好为 256 大小,但在该范围内,我们必须考虑 result.Count

  4. 有趣的部分:由于数据是顺序处理的,我们可以继续使用 Memory<byte>,根本不需要使用 MemoryStream

    4.2. 如果我们想要为这段逻辑获得更高的带宽,我们不能摆脱 MemoryStream 并且必须使用额外的分配来复制数据以供 Process(..) 方法调用。
    代码会有一些不同:

    var buffer = new Memory<byte>(new byte[256]);
    await using var dataStream = new MemoryStream();
    
    while (socket.State == WebSocketState.Open)
    {
        ValueWebSocketReceiveResult result;
        do
        {
            result = await socket.ReceiveAsync(buffer, CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
                return;
            }
            await dataStream.WriteAsync(buffer[..result.Count]);
        } while (!result.EndOfMessage);
        // 异步处理,因此我们不等待,但必须在其中处理所有分支:正常/错误/取消
        // 必须为数据复制进行额外的分配
        _ = Process(dataStream.ToArray());
        dataStream.SetLength(0); // 清除内存流数据
    }
    

PS:如果您知道消息大小(在这种简单情况下 - 您的协议),可以摆脱 while (!result.EndOfMessage); 指令。这将相当不可靠,但只是为了指出最佳带宽情况。

英文:

> I would be grateful to see a best practice code that handles receiving messages properly

Not sure about best practice, but regarding comments above i would make some refactoring:

var buffer = new Memory&lt;byte&gt;(new byte[256]);    

while (socket.State == WebSocketState.Open)
{
    ValueWebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, &quot;&quot;, CancellationToken.None);
            return;
        }            
    } 
    while (!result.EndOfMessage);        
    //Process functions do not detect multiple messages, and just can process one message at a time
    await Process(buffer[..result.Count]); // range op for Memory&lt;byte&gt; is quite cheap         
}

So, changes made:

  1. byte[] replaced with Memory&lt;byte&gt;() for reason Range operator usages (buffer[..result.Count]) will not cause memory allocations on that type.

  2. Other overload of socket.ReceiveAsync() is used returning ValueTask&lt;WebSocketReceiveResult&gt; (not Task&lt;&gt;), so, 8 bytes economy per call for memory allocations, afaik.

  3. If messages are not exactly of 256 size, but falls within that range, we must consider result.Count

  4. Interesting part: as data is handled sequentially we can use Memory<byte> further, and no need in MemoryStream at all.

    4.2. If we wanna get more bandwidth for this piece of logic, we can't get rid of MemoryStream and have to use extra allocation to copy data for Process(..) method call.
    Code would be a bit different:

    var buffer = new Memory&lt;byte&gt;(new byte[256]);
    await using var dataStream = new MemoryStream();
    
    while (socket.State == WebSocketState.Open)
    {
        ValueWebSocketReceiveResult result;
        do
        {
            result = await socket.ReceiveAsync(buffer, CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, &quot;&quot;, CancellationToken.None);
                return;
            }
            await dataStream.WriteAsync(buffer[..result.Count]);
        } while (!result.EndOfMessage);
        //Process asynchronously, so we don&#39;t wait 
        //but have to handle all of the branches there: normal/error/cancelled
        // have to make extra allocation for data copy
        _ = Process(dataStream.ToArray());
        dataStream.SetLength(0); //Clear memory stream data
    }
    

PS: If you know your message sizes (in this simple case - your protocol), it is possible to get rid of while (!result.EndOfMessage); instruction. It would be quite unreliable, but just to point out for the best bandwidth case.

huangapple
  • 本文由 发表于 2023年7月27日 20:26:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/76779732.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定