Why does a reqwest Response hang when switching WiFi networks on macOS / iOS?

Question

I have constructed a proof-of-concept in which tokio::io::copy hangs forever when switching between cellular / WiFi / wired networks, if the reader is a reqwest::async_impl::Response wrapped in a tokio::io::AsyncRead using FuturesAsyncReadCompatExt.

This happens on macOS and iOS, which are the platforms I have access to.

use std::{fs::File, path::PathBuf, time::Duration};

use futures::TryStreamExt; // map_err / into_async_read on the byte stream
use reqwest::{ClientBuilder, Response};
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt; // flush
use tokio_util::compat::FuturesAsyncReadCompatExt; // compat
// ApiError, wrap_api_err and ProgressReadAdapter are project helpers (see the
// GitHub repo); their imports are omitted here.

#[tokio::main()]
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        // Doesn't seem to help
        .tcp_keepalive(Some(Duration::from_secs(1)))
        // Doesn't seem to help
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}

async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();

    // Turn the stream of byte chunks into a futures::io::AsyncRead
    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();

    // Adapt the futures AsyncRead into a tokio AsyncRead
    let download = download.compat();

    // Wrap download to be able to get progress in terminal
    let mut download = ProgressReadAdapter::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code hangs here forever after a network switch
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;

    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}

The code above uses some helpers, such as wrap_api_err, that can be found on GitHub, but I don't think they're important for analyzing the problem.

The main question is - How can I make response_to_file exit with an Err after switching networks?

The second question might be - If there is no easy way to fix this code, how do I make a streaming copy of a network resource to a temp file that actually exits cleanly when there is an error?

Answer 1

Score: 1

I was able to finally reach some conclusions.

This issue on curl's GitHub page led me to believe that the reason was not reqwest but rather the underlying network stack on macOS / iOS.

I asked this question on seanmonstar/reqwest, which was answered by Sean stating that there was already an issue for a similar idea (low/no-activity timeouts in reqwest).

Basically, what I believe is happening is that the network stack still holds my outstanding response and keeps trying to read more data from the underlying TCP connection, even though the original WiFi connection has been "disconnected". As per the curl discussion, this is a thing that just happens: nothing is wrong at the TCP / HTTP level, the connection simply goes quiet without reporting an error, so client libraries can't really detect the switch itself.

What client libraries can do is to detect that there is no data coming into the Response (in the case of reqwest, at least). Currently, reqwest doesn't have this functionality built in, but it can be emulated with a little bit of work.
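
To make the "no data coming in" idea concrete, here is a minimal std-only sketch. It is not reqwest or tokio code: a channel of byte chunks stands in for the response body, and `collect_with_idle_timeout` and `simulate_download` are hypothetical names made up for this illustration. The point is only that the timeout applies to the gap between chunks rather than to the whole download.

```rust
use std::io;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Drains chunks until the sender hangs up. Fails with `TimedOut` if no
/// chunk arrives within `idle`, however long the download as a whole takes.
fn collect_with_idle_timeout(chunks: &Receiver<Vec<u8>>, idle: Duration) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    loop {
        match chunks.recv_timeout(idle) {
            Ok(chunk) => out.extend_from_slice(&chunk),
            // Sender dropped: this models a clean end of the body.
            Err(RecvTimeoutError::Disconnected) => return Ok(out),
            // Channel still open but silent: this models the stalled connection.
            Err(RecvTimeoutError::Timeout) => {
                return Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    "stalled: no data within the idle window",
                ));
            }
        }
    }
}

/// Hypothetical producer standing in for the network. With `stall == true`
/// it sends one chunk and then goes quiet while keeping the channel open.
fn simulate_download(stall: bool) -> io::Result<Vec<u8>> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        tx.send(b"hello ".to_vec()).unwrap();
        if stall {
            thread::sleep(Duration::from_millis(400));
        } else {
            tx.send(b"world".to_vec()).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });
    let result = collect_with_idle_timeout(&rx, Duration::from_millis(100));
    producer.join().unwrap();
    result
}

fn main() {
    println!("healthy: {:?}", simulate_download(false).map(|b| b.len()));
    println!("stalled: {:?}", simulate_download(true).map(|b| b.len()));
}
```

The wrapper described in this answer applies the same idea to an async reader, with a timer in place of the blocking `recv_timeout`.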

Using this StackOverflow answer as a starting point I built an AsyncRead wrapper that detects a stalled Response and exits cleanly with an error after a given time has elapsed.

The full code can be found on my GitHub repo bes/network-switch-hang, which was originally the repo for the bug proof-of-concept, but is now also an answer.

For completeness, here are the most important parts of the code, at least until reqwest grows a native way of detecting stalled Responses.

main.rs

use std::{fs::File, path::PathBuf, time::Duration};

use futures::TryStreamExt; // map_err / into_async_read on the byte stream
use reqwest::{ClientBuilder, Response};
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt; // flush
use tokio_util::compat::FuturesAsyncReadCompatExt; // compat
// StalledReadMonitor is defined in stalled_monitor.rs; ApiError and
// wrap_api_err are project helpers from the repo. Their imports are omitted here.

#[tokio::main()]
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        .connect_timeout(Duration::from_secs(5))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}

async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();

    // Turn the stream of byte chunks into a futures::io::AsyncRead
    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();

    // Adapt the futures AsyncRead into a tokio AsyncRead
    let download = download.compat();

    // Wrap download to be able to detect stalled downloads
    let mut download = StalledReadMonitor::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code used to hang here, but will now exit with an error after being stalled for
    // more than 5 seconds. See StalledReadMonitor for details.
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;

    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}
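
With the monitor in place, the copy fails with an I/O error when the download stalls. A caller that wants to tell a stall apart from other failures (for example, to decide whether a retry makes sense) could inspect that error. The following is a sketch under assumptions: it relies on the stall being reported as a `std::io::Error` of kind `TimedOut` carrying a `StalledError` payload, as stalled_monitor.rs does, and the `StalledError` definition here is a hypothetical stand-in for the one in the repo. `stalled_io_error` and `is_stall` are names invented for this example.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical marker error; the real definition lives in the linked repo.
#[derive(Debug)]
struct StalledError;

impl fmt::Display for StalledError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "download stalled")
    }
}

// Implementing `Error` is what allows the type to be passed to `io::Error::new`.
impl Error for StalledError {}

/// Builds the same kind of error the monitor returns on a stall.
fn stalled_io_error() -> io::Error {
    io::Error::new(io::ErrorKind::TimedOut, StalledError)
}

/// True only for I/O errors whose payload is a `StalledError`.
/// Other timeouts (same kind, different payload) are not treated as stalls.
fn is_stall(err: &io::Error) -> bool {
    err.get_ref().map_or(false, |inner| inner.is::<StalledError>())
}

fn main() {
    let stall = stalled_io_error();
    let other = io::Error::new(io::ErrorKind::TimedOut, "some other timeout");
    println!("stall recognised: {}", is_stall(&stall));
    println!("other recognised: {}", is_stall(&other));
}
```

Whether this check survives in practice depends on how the surrounding error type (here, ApiError) wraps the `io::Error`; if the wrapper discards the source, the payload is no longer reachable.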

stalled_monitor.rs

use std::io::{ErrorKind, Result};
use std::ops::Add;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::{interval_at, Instant, Interval};
// StalledError is a small error type defined elsewhere in the repo.

/// This monitor can wrap an [AsyncRead] and make sure that it is making progress.
/// If the inner reader isn't making progress, we can stop the download.
/// The monitoring is done by keeping an [Interval] and measuring progress
/// by counting the number of bytes during each interval.
///
/// Please note that this monitor won't stop the download after _exactly_
/// five seconds of inactivity, but rather five seconds after the last interval
/// that had data. So the worst case is 10 seconds, and the average will be 7.5 seconds.
#[pin_project]
pub struct StalledReadMonitor<R: AsyncRead> {
    #[pin]
    inner: R,
    interval: Interval,
    interval_bytes: usize,
}

impl<R: AsyncRead> StalledReadMonitor<R> {
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            // First tick fires 5 seconds from now, then every 5 seconds
            interval: interval_at(
                Instant::now().add(Duration::from_millis(5_000)),
                Duration::from_millis(5_000),
            ),
            interval_bytes: 0,
        }
    }
}

impl<R: AsyncRead> AsyncRead for StalledReadMonitor<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.project();

        // Count how many bytes the inner reader produced during this poll
        let before = buf.filled().len();
        let mut result = this.inner.poll_read(cx, buf);
        let after = buf.filled().len();

        *this.interval_bytes += after - before;
        match this.interval.poll_tick(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
                // Only report a stall while the inner reader is still pending, so that
                // an EOF or a real error arriving on the same tick is not replaced.
                if *this.interval_bytes == 0 && result.is_pending() {
                    println!("Rate is too low, aborting fetch");
                    result = Poll::Ready(Err(std::io::Error::new(
                        ErrorKind::TimedOut,
                        StalledError {},
                    )))
                }
                *this.interval_bytes = 0;
            }
        };
        result
    }
}
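
The timing claim in the doc comment (worst case 10 seconds, average 7.5 seconds) can be checked with a little arithmetic. The sketch below is a simplified model, not part of the monitor: it assumes ticks are observed exactly on schedule, that the last byte arrives at a uniformly random point within an interval, and that a byte arriving exactly on a tick counts toward the interval that tick closes. `detection_delay_ms` and `average_delay_ms` are hypothetical helper names.

```rust
/// Models the monitor's schedule: ticks at `interval_ms`, `2 * interval_ms`, ...
/// `last_data_ms` is when the final byte arrived, in milliseconds since the
/// monitor was created (0 means no data ever arrived).
/// Returns how long after that byte the stall is reported.
fn detection_delay_ms(last_data_ms: u64, interval_ms: u64) -> u64 {
    // The tick that closes the interval containing the last byte still sees
    // a non-zero byte count, so it only resets the counter (ceiling division).
    let closing_tick = (last_data_ms + interval_ms - 1) / interval_ms * interval_ms;
    // The tick after that sees zero bytes and aborts the read.
    let abort_tick = closing_tick + interval_ms;
    abort_tick - last_data_ms
}

/// Average delay when the last byte lands on any millisecond of one interval.
fn average_delay_ms(interval_ms: u64) -> f64 {
    let total: u64 = (1..=interval_ms)
        .map(|t| detection_delay_ms(t, interval_ms))
        .sum();
    total as f64 / interval_ms as f64
}

fn main() {
    let interval = 5_000;
    println!("last byte just after a tick:  {} ms", detection_delay_ms(1, interval));
    println!("last byte just before a tick: {} ms", detection_delay_ms(4_999, interval));
    println!("average over one interval:    {} ms", average_delay_ms(interval));
}
```

In this model the delay ranges from 5 seconds up to just under 10 seconds, with the midpoint at 7.5 seconds, which matches the doc comment. Shortening the interval tightens the bound at the cost of more false positives on slow but healthy connections.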

huangapple
  • Posted on March 12, 2023 at 16:38:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75711940.html