英文:
Simple generic stream actor with tokio
问题
I understand that you're facing an issue with the Rust code you provided, particularly related to the Send
trait and generic types. Here's the translated portion of your text:
你好,我正在尝试为使用 Tokio 的流编写一个简单且通用的 actor trait。
我无法同时在一个任务中侦听流和 mpsc 接收器(StreamActor::run)。
我尝试使用宏 tokio::select。以下是我的尝试:
```rust
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
{
type Message: Send + Debug;
async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
info!("started");
self.initialize(&mut ctx).await?;
loop {
tokio::select! {
Some(msg) = ctx.receiver.recv() => {
self.handle_actor_message(msg, &mut ctx).await?
},
Some(msg) = ctx.stream.next() => {
self.handle_stream_message(msg, &mut ctx).await?
},
else => {
ctx.receiver.close();
break;
}
}
}
self.finalize(&mut ctx).await?;
info!("ended");
Ok(())
}
async fn handle_actor_message(
&mut self,
msg: Self::Message,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn handle_stream_message(
&mut self,
msg: S::Item,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
}
完整代码:Rust Playground(由于依赖关系而无法执行)
编译器错误:
错误:将未来不能在线程之间安全发送
--> src\main.rs:73:70
|
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
| ______________________________________________________________________^
74 | info!("started");
75 | self.initialize(&mut ctx).await?;
76 |
... |
95 | Ok(())
96 | }
|_____^ 由异步块创建的未来不是 `Send`
|
= 帮助:在 `impl futures::Future<Output = Result<(), anyhow::Error>>` 中,对于 `<S as Stream>::Item`,没有实现 trait `std::marker::Send`
注意:由于跨 await 使用此值,未来不是 `Send`
--> src\main.rs:83:62
|
78 | tokio::select! {
79 | Some(msg) = ctx.receiver.recv() => {
80 | self.handle_actor_message(msg, &mut ctx).await?
81 | },
82 | Some(msg) = ctx.stream.next() => {
| --- 具有类型 `<S as Stream>::Item`,不是 `Send`
83 | self.handle_stream_message(msg, &mut ctx).await?
| ^^^^^^ 在这里发生 await,可能稍后使用 `msg`
... |
88 | }
89 | }
|_____________ `msg`稍后在此处被丢弃
= 注意:从 `impl futures::Future<Output = Result<(), anyhow::Error>>` 到对象类型 `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send` 的转换需要
至我理解的问题是,流项目(S::Item)没有 Send
trait,但需要它,因为需要等待异步消息处理程序,因此可以在线程之间发送。我不知道如何以及是否可以将 trait 的关联类型限制为 Send
。如果我用具体类型替换泛型类型 S,例如在我的情况下为 SplitStream<Websocket>
,则一切正常。
但我还需要该 actor trait 用于其他流,所以我的问题是,是否有办法使泛型方法起作用,如果有,应该如何实现?
If you have any specific questions or need further assistance with this code, please feel free to ask.
<details>
<summary>英文:</summary>
Good day, I am trying to write a simple and generic actor trait for streams with tokio.
I fail to listen to the stream and the mpsc reciever at the same time in one task (StreamActor::run).
I tried it with the macro tokio::select. Here is my attempt:
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
{
type Message: Send + Debug;
async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
info!("started");
self.initialize(&mut ctx).await?;
loop {
tokio::select! {
Some(msg) = ctx.receiver.recv() => {
self.handle_actor_message(msg, &mut ctx).await?
},
Some(msg) = ctx.stream.next() => {
self.handle_stream_message(msg, &mut ctx).await?
},
else => {
ctx.receiver.close();
break;
}
}
}
self.finalize(&mut ctx).await?;
info!("ended");
Ok(())
}
async fn handle_actor_message(
&mut self,
msg: Self::Message,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn handle_stream_message(
&mut self,
msg: S::Item,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
}
Full code: [Rust Playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=305acce966499b9574ca97ff1735aa2b) (unfortunately not executable due to dependencies)
Compiler error:
error: future cannot be sent between threads safely
--> src\main.rs:73:70
|
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
| _________________________________________________________^
74 | | info!("started");
75 | | self.initialize(&mut ctx).await?;
76 | |
... |
95 | | Ok(())
96 | | }
| |_____^ future created by async block is not Send
|
= help: within impl futures::Future<Output = Result<(), anyhow::Error>>
, the trait std::marker::Send
is not implemented for <S as Stream>::Item
note: future is not Send
as this value is used across an await
--> src\main.rs:83:62
|
78 | / tokio::select! {
79 | | Some(msg) = ctx.receiver.recv() => {
80 | | self.handle_actor_message(msg, &mut ctx).await?
81 | | },
82 | | Some(msg) = ctx.stream.next() => {
| | --- has type <S as Stream>::Item
which is not Send
83 | | self.handle_stream_message(msg, &mut ctx).await?
| | ^^^^^^ await occurs here, with msg
maybe used later
... |
88 | | }
89 | | }
| |- msg
is later dropped here
= note: required for the cast from impl futures::Future<Output = Result<(), anyhow::Error>>
to the object type dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send
Here the problem, as far as I understand it, is that the stream item (S::Item) doesn't have the 'Send' trait, but needs it because I need to await the asyncronous message handlers and so it can be sent across threads. I don't know how and if it is possible to restrict an associated type of a trait to 'Send'. If I replace the Generic type S with a concrete type, like in my case for example 'SplitStream\<Websocket\>' everything works.
But I need the actor trait also for other streams, so my question is, is there a way to make the generic approach work and if so how?
</details>
# 答案1
**得分**: 1
只需在正确的位置添加 `S::Item: Send` 限制:
```rust
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
S::Item: Send,
{ ... }
这会引发一些级联错误,您需要在其他几个地方添加 S::Item: Send
以修复它们。
英文:
Just add a bound S::Item: Send
in the correct place:
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
S::Item: Send,
{ ... }
This causes a few cascaded errors and you need to add S::Item: Send
in few other places to fix them.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论