最佳方法来跨多个异步调用设置超时?

huangapple go评论50阅读模式
英文:

Best way to timeout across multiple async calls?

问题

这不是在询问如何将同时进行的多个异步调用在单个超时内封装起来,就像这样:

tokio::time::timeout(
    Duration::from_millis(10), 
    tokio::join!(call1, call2, ...)
).await

上下文是我有一个包含一系列异步调用的异步函数,它可以处于两种状态之一,根据这些调用的返回值:A 和非-A。

async fn foo() {
    // 布尔值(在状态 A 时为 true,在其他情况下为 false)
    let mut state_a = true;
    loop {
        state_a = call_1().await;  // 为了简单起见,结果都是布尔值
        state_a = call_2().await;
        ...
        state_a = call_n().await;
    }
}

在状态 A 时,一切都是稳定的。但当处于非-A 状态时,必须在截止时间内返回到状态 A(例如 5 秒),如果时间耗尽,则必须继续(返回到循环的开头)。

要在代码中添加计时器逻辑以实现上述目标的最干净方式是什么?我能想到的唯一方法是在将每个调用封装在其中的 5 秒后设置一个 tokio::time::Timeout。但是,我必须在进入非-A 状态时每次启动它,还必须检查我们是否处于状态 A,以决定是否进行封装。代码很快变得令人惊讶地凌乱。有更简洁的方法吗?

英文:

This is not asking how to wrap multiple async calls made at the same time in a single timeout like this:

tokio::time::timeout(
    Duration::from_millis(10), 
    tokio::join!(call1, call2, ...)
).await

The context is I have an async fn consisting of a loop which makes a series of async calls and it can be in one of two states based on the return values of those calls: A and not-A.

async fn foo() {
    // boolean (true when in state A, false otherwise)
    let mut state_a = true;
    loop {
        state_a = call_1().await;  // for simplicity the results are all bools
        state_a = call_2().await;
        ...
        state_a = call_n().await;
    }
}

When in state A, everything is stable. But when in state not-A, there is a deadline (e.g. 5 seconds) to get back into state A, and if the time elapses then we must continue (return to start of loop).

What's the cleanest way to add timer logic to this code to achieve the above? All I can think of is setting a tokio::time::Timeout 5 seconds in the future which every call is wrapped in. But then I have to initiate it every time state not-A is entered and also check whether we're in state A or not at every call to decide whether to wrap or not, the code quickly gets surprisingly messy. Is there a cleaner way?

答案1

得分: 2

这是一个使用常规控制流的示例。状态存储在timeout中,其中None表示A,Some表示非A。 (playground)

use core::future::Future;
use core::time::Duration;
use tokio::time::timeout_at;
use tokio::time::Instant;

pub async fn foo() {
    let mut timeout: Option<Instant> = None;
    loop {
        if !run_and_update_timeout(&mut timeout, call_1()).await {
            continue;
        }
        if !run_and_update_timeout(&mut timeout, call_2()).await {
            continue;
        }
        if !run_and_update_timeout(&mut timeout, call_n()).await {
            continue;
        }
    }
}

/// Returns true when the next future should run, returns false when the loop should be continued.
async fn run_and_update_timeout<F>(timeout: &mut Option<Instant>, future: F) -> bool
where
    F: Future<Output = bool>,
{
    if let &mut Some(t) = timeout {
        // 在状态 !A 下,超时激活
        let call_or_timeout = timeout_at(t, future).await;
        match call_or_timeout {
            // 未来首先完成并成功,清除超时
            Ok(true) => *timeout = None,
            // 未来完成并失败,继续下一个未来
            Ok(false) => (),
            // 超时首先完成,继续循环
            Err(_) => {
                *timeout = None;
                return false;
            }
        }
    } else {
        // 在状态 A 下,没有超时激活
        if !future.await {
            // 如果这个未来失败,激活超时
            *timeout = Some(Instant::now() + Duration::from_secs(5));
        }
    }
    true
}

与Chayim的方法相比,这个方法的优点是每个未来只检查一次状态,而不是在每个Pending上进行检查,但作为交换条件,状态在其中一个未来内部无法更改。

在你的问题中没有指定继续时状态应该发生什么变化。我编写了这个代码以将状态重置为A,但你可能想保留非A并只重置计时器。

英文:

Here's one that uses regular control flow. The state is stored in timeout, with None representing A and Some representing not-A. (playground)

use core::future::Future;
use core::time::Duration;
use tokio::time::timeout_at;
use tokio::time::Instant;

pub async fn foo() {
    let mut timeout: Option&lt;Instant&gt; = None;
    loop {
        if !run_and_update_timeout(&amp;mut timeout, call_1()).await {
            continue;
        }
        if !run_and_update_timeout(&amp;mut timeout, call_2()).await {
            continue;
        }
        if !run_and_update_timeout(&amp;mut timeout, call_n()).await {
            continue;
        }
    }
}

/// Returns true when the next future should run, returns false when the loop should be continued.
async fn run_and_update_timeout&lt;F&gt;(timeout: &amp;mut Option&lt;Instant&gt;, future: F) -&gt; bool
where
    F: Future&lt;Output = bool&gt;,
{
    if let &amp;mut Some(t) = timeout {
        // In state !A, timeout active
        let call_or_timeout = timeout_at(t, future).await;
        match call_or_timeout {
            // Future completed first and succeeded, clear timeout
            Ok(true) =&gt; *timeout = None,
            // Future completed and failed, move on to next future
            Ok(false) =&gt; (),
            // Timeout finished first, continue the loop
            Err(_) =&gt; {
                *timeout = None;
                return false;
            }
        }
    } else {
        // In state A, no timeout active
        if !future.await {
            // If this future fails, activate timeout
            *timeout = Some(Instant::now() + Duration::from_secs(5));
        }
    }
    true
}

Compared to Chayim's, this one has the advantage of only checking the state once per future instead of on every Pending, but in exchange, the state can't be changed while inside one of the futures.

One thing that wasn't specified in your question is what should happen to the state when continuing. I wrote this to reset the state to A, but you might've wanted to keep not-A and just reset the timer.

答案2

得分: 1

通过编写一个未来包装器,你几乎可以完成任何任务的魔法:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::time::{Instant, Sleep};

pin_project_lite::pin_project! {
    pub struct NonATimeout<'a, Fut> {
        #[pin]
        fut: Fut,
        #[pin]
        sleep: Sleep,
        state: &'a AtomicBool,
        was_in_a: bool,
    }
}

impl<'a, Fut> NonATimeout<'a, Fut> {
    pub fn new(fut: Fut, state: &'a AtomicBool) -> Self {
        Self {
            fut,
            sleep: tokio::time::sleep(Duration::ZERO),
            state,
            was_in_a: true,
        }
    }
}

impl<Fut: Future<Output = ()>> Future for NonATimeout<'_, Fut> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        if this.fut.poll(cx).is_ready() {
            return Poll::Ready(());
        }

        if !this.state.load(Ordering::Relaxed) {
            // Not in A.
            if *this.was_in_a {
                this.sleep.as_mut().reset(Instant::now() + Duration::from_secs(5));
                *this.was_in_a = false;
            }
            if this.sleep.poll(cx).is_ready() {
                // Cancel the work.
                return Poll::Ready(());
            }
        }

        Poll::Pending
    }
}

然后像这样使用它:

async fn foo() {
    let state_a = AtomicBool::new(true);
    loop {
        let do_work = async {
            state_a.store(call_1().await, Ordering::Relaxed);
            state_a.store(call_2().await, Ordering::Relaxed);
            // ...
            state_a.store(call_n().await, Ordering::Relaxed);
        };
        let timeout = NonATimeout::new(do_work, &state_a);
        timeout.await;
    }
}

如果状态类型更复杂,你可以使用 Mutex。它永远不会争用,因为我们从不并行地获取它。从技术上讲,你可以使用自己的非原子 Cell 包装器,但这需要使用不安全的代码。

英文:

You can do almost anything by the magic of writing a future wrapper:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::time::{Instant, Sleep};

pin_project_lite::pin_project! {
    pub struct NonATimeout&lt;&#39;a, Fut&gt; {
        #[pin]
        fut: Fut,
        #[pin]
        sleep: Sleep,
        state: &amp;&#39;a AtomicBool,
        was_in_a: bool,
    }
}

impl&lt;&#39;a, Fut&gt; NonATimeout&lt;&#39;a, Fut&gt; {
    pub fn new(fut: Fut, state: &amp;&#39;a AtomicBool) -&gt; Self {
        Self {
            fut,
            sleep: tokio::time::sleep(Duration::ZERO),
            state,
            was_in_a: true,
        }
    }
}

impl&lt;Fut: Future&lt;Output = ()&gt;&gt; Future for NonATimeout&lt;&#39;_, Fut&gt; {
    type Output = ();

    fn poll(self: Pin&lt;&amp;mut Self&gt;, cx: &amp;mut Context&lt;&#39;_&gt;) -&gt; Poll&lt;Self::Output&gt; {
        let mut this = self.project();

        if this.fut.poll(cx).is_ready() {
            return Poll::Ready(());
        }

        if !this.state.load(Ordering::Relaxed) {
            // Not in A.
            if *this.was_in_a {
                this.sleep.as_mut().reset(Instant::now() + Duration::from_secs(5));
                *this.was_in_a = false;
            }
            if this.sleep.poll(cx).is_ready() {
                // Cancel the work.
                return Poll::Ready(());
            }
        }

        Poll::Pending
    }
}

Then using it like:

async fn foo() {
    let state_a = AtomicBool::new(true);
    loop {
        let do_work = async {
            state_a.store(call_1().await, Ordering::Relaxed);
            state_a.store(call_2().await, Ordering::Relaxed);
            // ...
            state_a.store(call_n().await, Ordering::Relaxed);
        };
        let timeout = NonATimeout::new(do_work, &amp;state_a);
        timeout.await;
    }
}

If the state type is more complicated, you can use a Mutex. It will never be contended because we never acquire it in parallel. Technically, you can use your own non-atomic Cell wrapper for that, but that requires unsafe code.

huangapple
  • 本文由 发表于 2023年7月7日 00:52:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76631005.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定