简单的通用流程演员,使用tokio。

huangapple go评论62阅读模式
英文:

Simple generic stream actor with tokio

问题

I understand that you're facing an issue with the Rust code you provided, particularly related to the Send trait and generic types. Here's the translated portion of your text:

你好,我正在尝试为使用 Tokio 的流编写一个简单且通用的 actor trait。
我无法同时在一个任务中侦听流和 mpsc 接收器(StreamActor::run)。

我尝试使用宏 tokio::select。以下是我的尝试:

```rust
#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
{
    type Message: Send + Debug;

    async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
        info!("started");
        self.initialize(&mut ctx).await?;

        loop {
            tokio::select! {
                Some(msg) = ctx.receiver.recv() => {
                    self.handle_actor_message(msg, &mut ctx).await?
                },
                Some(msg) = ctx.stream.next() => {
                    self.handle_stream_message(msg, &mut ctx).await?
                },
                else => {
                    ctx.receiver.close();
                    break;
                }
            }
        }

        self.finalize(&mut ctx).await?;
        info!("ended");

        Ok(())
    }

    async fn handle_actor_message(
        &mut self,
        msg: Self::Message,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn handle_stream_message(
        &mut self,
        msg: S::Item,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }

    async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }
}

完整代码:Rust Playground(由于依赖关系而无法执行)

编译器错误:

错误:将未来不能在线程之间安全发送
--&gt; src\main.rs:73:70
   |
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
   | ______________________________________________________________________^
74 |         info!("started");
75 |         self.initialize(&mut ctx).await?;
76 |
...  |
95 |         Ok(())
96 |     }
   |_____^ 由异步块创建的未来不是 `Send`
   |
   = 帮助:在 `impl futures::Future<Output = Result<(), anyhow::Error>>` 中,对于 `&lt;S as Stream&gt;::Item`,没有实现 trait `std::marker::Send`
注意:由于跨 await 使用此值,未来不是 `Send`
--&gt; src\main.rs:83:62
   |
78 |             tokio::select! {
79 |                 Some(msg) = ctx.receiver.recv() => {
80 |                     self.handle_actor_message(msg, &mut ctx).await?
81 |                 },
82 |                 Some(msg) = ctx.stream.next() => {
   |                  --- 具有类型 `&lt;S as Stream&gt;::Item`,不是 `Send`
83 |                     self.handle_stream_message(msg, &mut ctx).await?
   |                                                              ^^^^^^ 在这里发生 await,可能稍后使用 `msg`
...  |
88 |                 }
89 |             }
   |_____________ `msg`稍后在此处被丢弃
   = 注意:从 `impl futures::Future<Output = Result<(), anyhow::Error>>` 到对象类型 `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send` 的转换需要

至我理解的问题是,流项目(S::Item)没有 Send trait,但需要它,因为需要等待异步消息处理程序,因此可以在线程之间发送。我不知道如何以及是否可以将 trait 的关联类型限制为 Send。如果我用具体类型替换泛型类型 S,例如在我的情况下为 SplitStream<Websocket>,则一切正常。

但我还需要该 actor trait 用于其他流,所以我的问题是,是否有办法使泛型方法起作用,如果有,应该如何实现?


If you have any specific questions or need further assistance with this code, please feel free to ask.

<details>
<summary>英文:</summary>

Good day, I am trying to write a simple and generic actor trait for streams with tokio.
I fail to listen to the stream and the mpsc reciever at the same time in one task (StreamActor::run).

I tried it with the macro tokio::select. Here is my attempt:

#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
{
type Message: Send + Debug;

async fn run(&amp;mut self, mut ctx: Context&lt;Self, S&gt;) -&gt; Result&lt;()&gt; {
    info!(&quot;started&quot;);
    self.initialize(&amp;mut ctx).await?;

    loop {
        tokio::select! {
            Some(msg) = ctx.receiver.recv() =&gt; {
                self.handle_actor_message(msg, &amp;mut ctx).await?
            },
            Some(msg) = ctx.stream.next() =&gt; {
                self.handle_stream_message(msg, &amp;mut ctx).await?
            },
            else =&gt; {
                ctx.receiver.close();
                break;
            }
        }
    }

    self.finalize(&amp;mut ctx).await?;
    info!(&quot;ended&quot;);

    Ok(())
}

async fn handle_actor_message(
    &amp;mut self,
    msg: Self::Message,
    ctx: &amp;mut Context&lt;Self, S&gt;,
) -&gt; Result&lt;()&gt;;

async fn handle_stream_message(
    &amp;mut self,
    msg: S::Item,
    ctx: &amp;mut Context&lt;Self, S&gt;,
) -&gt; Result&lt;()&gt;;

async fn initialize(&amp;mut self, _: &amp;mut Context&lt;Self, S&gt;) -&gt; Result&lt;()&gt; {
    Ok(())
}

async fn finalize(&amp;mut self, _: &amp;mut Context&lt;Self, S&gt;) -&gt; Result&lt;()&gt; {
    Ok(())
}

}


Full code: [Rust Playground](https://play.rust-lang.org/?version=stable&amp;mode=debug&amp;edition=2021&amp;gist=305acce966499b9574ca97ff1735aa2b) (unfortunately not executable due to dependencies)

Compiler error:


error: future cannot be sent between threads safely
--> src\main.rs:73:70
|
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
| _________________________________________________________^
74 | | info!("started");
75 | | self.initialize(&mut ctx).await?;
76 | |
... |
95 | | Ok(())
96 | | }
| |_____^ future created by async block is not Send
|
= help: within impl futures::Future&lt;Output = Result&lt;(), anyhow::Error&gt;&gt;, the trait std::marker::Send is not implemented for &lt;S as Stream&gt;::Item
note: future is not Send as this value is used across an await
--> src\main.rs:83:62
|
78 | / tokio::select! {
79 | | Some(msg) = ctx.receiver.recv() => {
80 | | self.handle_actor_message(msg, &mut ctx).await?
81 | | },
82 | | Some(msg) = ctx.stream.next() => {
| | --- has type &lt;S as Stream&gt;::Item which is not Send
83 | | self.handle_stream_message(msg, &mut ctx).await?
| | ^^^^^^ await occurs here, with msg maybe used later
... |
88 | | }
89 | | }
| |
- msg is later dropped here
= note: required for the cast from impl futures::Future&lt;Output = Result&lt;(), anyhow::Error&gt;&gt; to the object type dyn futures::Future&lt;Output = Result&lt;(), anyhow::Error&gt;&gt; + std::marker::Send



Here the problem, as far as I understand it, is that the stream item (S::Item) doesn&#39;t have the &#39;Send&#39; trait, but needs it because I need to await the asyncronous message handlers and so it can be sent across threads. I don&#39;t know how and if it is possible to restrict an associated type of a trait to &#39;Send&#39;. If I replace the Generic type S with a concrete type, like in my case for example &#39;SplitStream\&lt;Websocket\&gt;&#39; everything works.

But I need the actor trait also for other streams, so my question is, is there a way to make the generic approach work and if so how?




</details>


# 答案1
**得分**: 1

只需在正确的位置添加 `S::Item: Send` 限制:

```rust
#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
    S::Item: Send,
{ ... }

这会引发一些级联错误,您需要在其他几个地方添加 S::Item: Send 以修复它们。

英文:

Just add a bound S::Item: Send in the correct place:

#[async_trait]
trait StreamActor&lt;S&gt;
where
    Self: Sized + Sync + Send + &#39;static,
    S: Stream + Unpin + Send + &#39;static,
    S::Item: Send,
{ ... }

This causes a few cascaded errors and you need to add S::Item: Send in few other places to fix them.

huangapple
  • 本文由 发表于 2023年3月12日 08:57:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/75710492.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定