AsyncRead for UdpSocket

huangapple go评论49阅读模式
英文:

AsyncRead for UdpSocket

问题

I'm trying to implement AsyncRead for a UdpSocket that have an async recv function and I have some difficulties calling poll on my future:

我正在尝试为具有异步recv函数的UdpSocket实现AsyncRead,但在调用我的future上遇到了一些困难:

use async_std::{
io::Read as AsyncRead,
net::UdpSocket,
};

struct MyUdpWrapper {
socket: Arc,
fut: Option<Box<dyn Future<Output = Result>>,
}

impl AsyncRead for MyUdpWrapper {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result> {
let this = Pin::into_inner(self);
let fut = this.fut.unwrap_or(Box::new(this.socket.recv(buf)));
let fut = unsafe { Pin::new_unchecked(&mut *fut) };
fut.poll(cx)
}
}

I was thinking storing the future in an option so it continues to live after the 1st poll but I'm not clear about how to do that exactly.

我考虑将future存储在选项中,以便在第一次轮询后它继续存在,但我不太清楚如何确切地实现这一点。

The async recv() returns a Result so I guess I should store an Option<Box<dyn Future<Output=Result<size>>> but that seems a bit cumbersome?

异步的recv()返回一个Result,所以我猜我应该存储一个Option<Box<dyn Future<Output=Result<size>>>,但这似乎有点繁琐?

Is there a good way to call an async function in poll_read (or poll_* in general)?

有没有一种好的方法在poll_read(或通常的poll_*)中调用异步函数?

英文:

I'm trying to implement AsyncRead for a UdpSocket that have an async recv function and I have some difficulties calling poll on my future:

use async_std::{
    io::Read as AsyncRead,
    net::UdpSocket,
};

struct MyUdpWrapper {
    socket: Arc&lt;UdpSocket&gt;,
    fut: Option&lt;Box&lt;dyn Future&lt;Output = Result&lt;usize&gt;&gt;&gt;&gt;,
}

impl AsyncRead for MyUdpWrapper {                                                                                                                                                                                                         
    fn poll_read(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;&#39;_&gt;, buf: &amp;mut [u8]) -&gt; Poll&lt;Result&lt;usize&gt;&gt; {
        let this = Pin::into_inner(self);
        let fut = this.fut.unwrap_or(Box::new(this.socket.recv(buf)));
        let fut = unsafe { Pin::new_unchecked(&amp;mut *fut) };
        fut.poll(cx)
    }
}

I was thinking storing the future in an option so it continues to live after the 1st poll but I'm not clear about how to do that exactly.
The async recv() returns a Result<usize> so I guess I should store an Option&lt;Box&lt;dyn Future&lt;Output=Result&lt;size&gt;&gt;&gt;&gt; but that seems a bit cumbersome?

Is there a good way to call an async function in poll_read (or poll_* in general)?

答案1

得分: 2

你的代码实际上有问题,因为 UdpSocket::recv() 借用了提供的缓冲区,所以在将来被释放之前不能释放缓冲区,但是传递给 poll_read() 的缓冲区是临时的。我还认为你想要的是 this.fut.get_or_insert_with() 而不是 this.fut.unwrap_or()

在底层,async_std::net::UdpSocket::recv() 实际上使用 async_io::Async::<std::net::UdpSocket>::recv(),它调用了 read_with() 方法。

我们需要直接使用 async_io::Async<std::net::UdpSocket>Async<T>T: Read 时已经实现了 AsyncRead。这对于 UdpSocket 不适用,但我们可以编写类似的实现:

use std::{
    future::Future,
    net::UdpSocket,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use async_io::Async;
use async_std::{io::Read as AsyncRead, io::Result};

struct MyUdpWrapper {
    socket: Arc<Async<UdpSocket>>,
}

impl AsyncRead for MyUdpWrapper {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        loop {
            match self.socket.get_ref().recv(buf) {
                // 调用 std::net::UdpSocket::recv()
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                res => return Poll::Ready(res),
            }
            match Pin::new(&mut self.socket.readable()).poll(cx) {
                Poll::Ready(res) => res?,
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

(注意:我不太清楚为什么 async_io::Async::poll_read() 使用 poll_readable()async_std::net::UdpSocket::recv() 使用 readable();似乎只有后者为当前任务注册唤醒,所以我选择了这个方法)。

英文:

Your code is actually unsound since UdpSocket::recv() borrows the provided buffer, so you can't release the buffer until the future is dropped, but the buffer given to poll_read() is only temporary. I also think you want something more like this.fut.get_or_insert_with() instead of this.fut.unwrap_or().

Under the hood, async_std::net::UdpSocket::recv() actually uses async_io::Async::&lt;std::net::UdpSocket&gt;::recv(), which calls the read_with() method.

We need to use async_io::Async&lt;std::net::UdpSocket&gt; directly. Async&lt;T&gt; already implements AsyncRead when T: Read. This is not the case for UdpSocket, but we can write a similar implementation:

use std::{
    future::Future,
    net::UdpSocket,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use async_io::Async;
use async_std::{io::Read as AsyncRead, io::Result};

struct MyUdpWrapper {
    socket: Arc&lt;Async&lt;UdpSocket&gt;&gt;,
}

impl AsyncRead for MyUdpWrapper {
    fn poll_read(
        self: Pin&lt;&amp;mut Self&gt;,
        cx: &amp;mut Context&lt;&#39;_&gt;,
        buf: &amp;mut [u8],
    ) -&gt; Poll&lt;Result&lt;usize&gt;&gt; {
        loop {
            match self.socket.get_ref().recv(buf) {
                // Calls std::net::UdpSocket::recv()
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock =&gt; {}
                res =&gt; return Poll::Ready(res),
            }
            match Pin::new(&amp;mut self.socket.readable()).poll(cx) {
                Poll::Ready(res) =&gt; res?,
                Poll::Pending =&gt; return Poll::Pending,
            }
        }
    }
}

(Note: it's a bit unclear to me why async_io::Async::poll_read() uses poll_readable() but async_std::net::UdpSocket::recv() uses readable(); it seems that only the latter registers the current task for wakeup, so that's what I chose here).

huangapple
  • 本文由 发表于 2023年3月21日 03:40:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/75794604-2.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定