Rust warp服务器在传递具有泛型的处理程序时出现问题。

huangapple go评论48阅读模式
英文:

Rust warp server issue when passing a handler with a generic

问题

It appears that you're encountering an error related to trait bounds and generics in your Rust code. Specifically, the error message mentions that the and_then method cannot be called on a certain struct due to unsatisfied trait bounds.

This issue is likely related to how you're using the warp library with generics and traits. To resolve this, you should ensure that the types you use in your warp filters and closures satisfy the necessary trait bounds.

Unfortunately, diagnosing the exact issue without a complete context of your code and its dependencies can be challenging. To debug similar issues in the future, consider these steps:

  1. Carefully review the documentation and type requirements of the libraries you're using, especially warp. Ensure that you're using them correctly with generics and trait bounds.

  2. Use Rust's type inference system to your advantage. Rust often provides informative error messages that point to the exact issues with type constraints. Read these error messages carefully and try to understand what they are indicating.

  3. Experiment with simplified versions of your code to isolate the problem. Start with a minimal working example and incrementally add complexity until you encounter the error. This can help pinpoint the source of the issue.

  4. Consult the documentation and community resources for the libraries you're using. Other developers may have faced similar issues and provided solutions or workarounds.

Remember that debugging complex type-related errors in Rust can be challenging, but with patience and careful examination of your code and dependencies, you can usually identify and resolve the problem.

英文:

main.rs:

use async_trait::async_trait;
use tokio::runtime::Runtime;
use warp::Filter;

fn main() {
    //create http server

    let state = CState {
        inner: 2
    };

    Runtime::new().unwrap().block_on(async move {
        run_server(state.clone()).await;
    });
}

trait TaskType {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self;
}

#[async_trait]
trait State<T: TaskType>: Clone {
    async fn add_task(&self, task: T) -> u64;
}

#[derive(Clone)]
struct CState {
    inner: u64,
}

#[derive(Clone)]
struct CTask {
    inner: u64,
}

impl TaskType for CTask {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self {
        CTask{
            inner: 2
        }
    }
}

#[async_trait]
impl<T: TaskType> State<T> for CState {
    async fn add_task(&self, task: T) -> u64 {
        self.inner
    }
}


async fn run_server<T: TaskType, U: State<T>>(state: U) {
    let warp_state = warp::any().map(move || {
        state.clone()
    });
    async fn post_new_task_handler<T: TaskType, U: State<T>>(task_type_str: String, time: u64, state: U) -> Result<impl warp::Reply, warp::Rejection> {
        let task = T::create_task(None, task_type_str.as_str(), time);
        let id = state.add_task(task).await;
        Ok(warp::reply::json(&id))
    }
    let post_new_task_route = warp::post()
        .and(warp::path!(String / u64))
        .and(warp::path::end())
        .and(warp_state.clone())
        .and_then(post_new_task_handler::<T, U>);
    let router = post_new_task_route;
    warp::serve(router).run(([127, 0, 0, 1], 3030)).await;
}

Cargo.toml:

[package]
name = "minimal_warp"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3"
async-trait = "0.1.68"
serde = "*"
serde_json = "*"
serde_derive = "*"

For the above code I get an error from the rust compiler which seems to indicate that I'm missing some trait bounds for the generic. However which bound I need to implement is not clear. If I write a non-generic version of the code it works. What's going on with warp here? What's the proper way to fix it and how would I diagnose such issues in the future?

To be clear I think it's not the route handlers that have an issue but more the closure where I pass in the state.

Also note I have an exact version of this code working with a concrete type instead of traits, so I know it's a problem with the traits.

The exact error message is shown below:

error[E0599]: the method `and_then` exists for struct `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>`, but its trait bounds were not satisfied
--> src/main.rs:65:10
|
65 |         .and_then(post_new_task_handler::<T, U>);
|          ^^^^^^^^ method cannot be called on `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>` due to unsatisfied trait bounds
|
::: /home/brian/.cargo/registry/src/github.com-1ecc6299db9ec823/warp-0.3.5/src/filter/and.rs:13:1
|
13 | pub struct And<T, U> {
| --------------------
| |
| doesn't satisfy `_: warp::Filter`
| doesn't satisfy `_: warp::filter::FilterBase`
|
= note: the following trait bounds were not satisfied:
`warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
which is required by `warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`
`&warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
which is required by `&warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`
`&mut warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::filter::FilterBase`
which is required by `&mut warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>, warp::filter::and::And<warp::filter::and::And<warp::filter::and::And<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (String,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (u64,), Error = Rejection>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>>, impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Rejection>>, warp::filter::map::Map<impl warp::Filter + std::marker::Copy + warp::filter::FilterBase<Extract = (), Error = Infallible>, [closure@src/main.rs:53:38: 53:45]>>: warp::Filter`

Edit: This was changed to make the code a minimal running example.

答案1

得分: 1

添加一些 + Send + Sync + 'static 似乎解决了问题...

use async_trait::async_trait;
use tokio::runtime::Runtime;
use warp::Filter;

fn main() {
    // 创建 HTTP 服务器

    let state = CState { inner: 2 };

    Runtime::new().unwrap().block_on(async move {
        run_server::<CTask, CState>(state.clone()).await;
    });
}

trait TaskType {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self;
}

#[async_trait]
trait State<T: TaskType>: Clone {
    async fn add_task(&self, task: T) -> u64;
}

#[derive(Clone)]
struct CState {
    inner: u64,
}

#[derive(Clone)]
struct CTask {
    inner: u64,
}

impl TaskType for CTask {
    fn create_task(id: Option<u64>, name: &str, time: u64) -> Self {
        CTask { inner: 2 }
    }
}

#[async_trait]
impl<T: TaskType + Send + 'static> State<T> for CState {
    async fn add_task(&self, task: T) -> u64 {
        self.inner
    }
}

async fn run_server<T: TaskType + Send + 'static, U: State<T> + Send + Sync + 'static>(state: U) {
    let warp_state = warp::any().map(move || state.clone());
    async fn post_new_task_handler<T: TaskType, U: State<T>>(
        task_type_str: String,
        time: u64,
        state: U,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        let task = T::create_task(None, task_type_str.as_str(), time);
        let id = state.add_task(task).await;
        Ok(warp::reply::json(&id))
    }
    let post_new_task_route = warp::post()
        .and(warp::path!(String / u64))
        .and(warp::path::end())
        .and(warp_state.clone())
        .and_then(post_new_task_handler::<T, U>);
    let router = post_new_task_route;
    warp::serve(router).run(([127, 0, 0, 1], 3030)).await;
}

我无法解释为什么这样可以工作,解决方案纯粹是通过反复尝试找到的:

  • 我确实收到了与 #[async_trait] impl State for CState 相关的一些错误消息,涉及 Send 和一些生命周期 'async_trait(与 warp 完全无关)。遵循编译器错误消息的建议只会导致更多的错误消息。所以第一个直觉是添加 T: Send + 'staticSend 是由 rustc 建议的,但 'static 只是基于“如果在生命周期方面有问题,首先尝试使用 '_',然后尝试添加一个参数,然后尝试使用 'static'。”如果这些都不起作用,就需要开始思考。

  • 从那里开始,U: Send + Sync + 'static 只是完全凭直觉的猜测,与 warp 相关的错误消息对人类来说是不可能解析的。我确实曾将 post_new_task_handler 中的 U 参数删除,然后将其更改为某个无害的类型(i32),以确保 U 确实是问题所在。但后来它成为了“嗯,我们刚刚遇到了有关特性、未来和 Send 的问题。让我试试...”。

背景知识在于,async fn 生成的未来是状态机,在每个 .await(以及函数开头)保存所有参数和变量。过于简化的说,你可以将 fn post_new_task_handler 视为:

enum PostTaskHandlerFuture<U, T> {
    Start { task_type_str: String, time: u64, state: U },
    AwaitAtLine2 { poll: /* type for U::add_task's future */ },
    Return { /* warp::reply::… */ },
    Finished
}

此状态机在 tokio 上执行(通过 warp 的魔力中介 - 漂亮的错误消息在那里丢失了),为了让 tokio 的工作窃取调度器能够在线程之间分发工作,PostTaskHandlerFuture 需要是 Send。但为了使 PostTaskHandlerFuture 成为 SendU 需要是 Send。(我不知道为什么需要 Sync。)

因此,这种“使一些通用参数 Send/Sync”的方法在编写通用异步代码时是常见的,因此我想尝试一下。

英文:

Sprinkling some + Send + Sync + &#39;static seems to do the trick…

use async_trait::async_trait;
use tokio::runtime::Runtime;
use warp::Filter;
fn main() {
//create http server
let state = CState { inner: 2 };
Runtime::new().unwrap().block_on(async move {
run_server::&lt;CTask, CState&gt;(state.clone()).await;
});
}
trait TaskType {
fn create_task(id: Option&lt;u64&gt;, name: &amp;str, time: u64) -&gt; Self;
}
#[async_trait]
trait State&lt;T: TaskType&gt;: Clone {
async fn add_task(&amp;self, task: T) -&gt; u64;
}
#[derive(Clone)]
struct CState {
inner: u64,
}
#[derive(Clone)]
struct CTask {
inner: u64,
}
impl TaskType for CTask {
fn create_task(id: Option&lt;u64&gt;, name: &amp;str, time: u64) -&gt; Self {
CTask { inner: 2 }
}
}
#[async_trait]
impl&lt;T: TaskType + Send + &#39;static&gt; State&lt;T&gt; for CState {
async fn add_task(&amp;self, task: T) -&gt; u64 {
self.inner
}
}
async fn run_server&lt;T: TaskType + Send + &#39;static, U: State&lt;T&gt; + Send + Sync + &#39;static&gt;(state: U) {
let warp_state = warp::any().map(move || state.clone());
async fn post_new_task_handler&lt;T: TaskType, U: State&lt;T&gt;&gt;(
task_type_str: String,
time: u64,
state: U,
) -&gt; Result&lt;impl warp::Reply, warp::Rejection&gt; {
let task = T::create_task(None, task_type_str.as_str(), time);
let id = state.add_task(task).await;
Ok(warp::reply::json(&amp;id))
}
let post_new_task_route = warp::post()
.and(warp::path!(String / u64))
.and(warp::path::end())
.and(warp_state.clone())
.and_then(post_new_task_handler::&lt;T, U&gt;);
let router = post_new_task_route;
warp::serve(router).run(([127, 0, 0, 1], 3030)).await;
}

I can't explain why this works, the solution was found purely by trial and error:

  • I did get a few error messages from your #[async_trait] impl State for CState related to Send and some lifetime &#39;async_trait (so nothing to do with warp at all). Following the compiler error messages' suggestions there just led to more error messages. So the first "leap" of intuition was to add T: Send + &#39;static. The Send was suggested by rustc, but the &#39;static is just based on "If you have trouble with lifetimes, first try &#39;_, then try adding a parameter, then try &#39;static." If that doesn't work, start thinking.
  • From there on, U: Send + Sync + &#39;static was just complete guessing out of the blue, the warp-related error message is impossible to parse for humans. I did once remove the U parameter from post_new_task_handler, and then changed it to some benign type (i32) to make sure that U is really the problem. But then it was of the "Hmm. We just had problems with traits and futures and Send. Lemme just try…"
    The background knowledge here is that async fn-generated futures are state machines that save all parameters and variables at each .await (and at the start of the function). Oversimplifying, you can just think of fn post_new_task_handler as an

    enum PostTaskHandlerFuture&lt;U, T&gt; {
        Start { task_type_str: String, time: u64, state: U },
        AwaitAtLine2 { poll: /* type for U::add_task&#39;s future */ },
        Return { /* warp::reply::… */ },
        Finished
    }
    

    This state machine gets executed on tokio (mediated through warp's magic - the nice error messages are lost there), and for tokio's work stealing scheduler to be able to distribute work like executing PostTaskHandlerFuture over threads, PostTaskHandlerFuture needs to be Send. But for PostTaskHandlerFuture to be Send, U needs to be Send. (I have no idea why Sync is necessary.)
    So this kind of "make some generic parameter Send/Sync" is a common thing when writing generic async code, hence I thought of trying it.

huangapple
  • 本文由 发表于 2023年5月22日 03:12:16
  • 转载请务必保留本文链接:https://go.coder-hub.com/76301520.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定