分享 Tokio 通道在 Rust 中的函数之间

huangapple go评论70阅读模式
英文:

Share tokio channel between functions in rust

问题

以下是您要翻译的代码部分:

use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use std::error::Error;

async fn handle_connection(mut stream: TcpStream, sender: &Sender<[u8; 128]>, receiver: &Receiver<[u8; 128]>) -> Result<(), Box<dyn Error>> {
    
    // 进行一些操作...
        
}

pub async fn create_listener(addr: String) -> Result<(), Box<dyn Error>> {

    let listener = TcpListener::bind(&addr).await?;

    let (sender, receiver) = mpsc::channel(10);

    loop {
        let (mut stream, _) = listener.accept().await?;

        tokio::spawn(async move {
            handle_connection(stream, &sender, &receiver);
        });
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

    create_listener(String::from("127.0.0.1:8000")).await?;

    Ok(())
}

这是代码的翻译,不包括代码注释和问题部分。如果您有任何其他问题或需要进一步的帮助,请随时提出。

您遇到的错误是因为在 Rust 中,对不可复制(non-copy)类型的所有权传递(ownership transfer)问题。在这里,Sender<[u8; 128]>Receiver<[u8; 128]> 是不可复制的类型,因此无法简单地传递它们的引用。

解决这个问题的一种方法是使用 Arc(原子引用计数)来包装 SenderReceiver,以便多个线程可以共享所有权。这是一个修改的示例:

use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use std::error::Error;
use std::sync::Arc;

async fn handle_connection(mut stream: TcpStream, sender: Arc<Sender<[u8; 128]>>, receiver: Arc<Receiver<[u8; 128]>>) -> Result<(), Box<dyn Error>> {
    
    // 进行一些操作...
        
}

pub async fn create_listener(addr: String) -> Result<(), Box<dyn Error>> {

    let listener = TcpListener::bind(&addr).await?;

    let (sender, receiver) = mpsc::channel(10);
    let sender = Arc::new(sender);
    let receiver = Arc::new(receiver);

    loop {
        let (mut stream, _) = listener.accept().await?;

        let sender_clone = sender.clone();
        let receiver_clone = receiver.clone();

        tokio::spawn(async move {
            handle_connection(stream, sender_clone, receiver_clone).await.unwrap_or_else(|e| {
                eprintln!("Error in handle_connection: {:?}", e);
            });
        });
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

    create_listener(String::from("127.0.0.1:8000")).await?;

    Ok(())
}

这里,我们使用了 Arc 来包装 SenderReceiver,并且在每次迭代中都克隆了它们,以便在 tokio::spawn 内使用。这样可以确保每个线程都有自己的引用,而不会移动它们的所有权,从而避免了编译器错误。

英文:

I'm new to rust and I'm implementing a simple client server application. I managed to get the application working from the std::sync::TcpStream library. Now I want the server to be able to accept requests asynchronously using the tokio library. The operation of the application is based on two types of client, the writer and the reader. The writer will be writing to the server through the tcp stream, and the reader will be reading what is on the server. To establish an internal connection on the server I use the tokio channels.

The problem that I have is related when I try to pass by parameter the sender and the receiver of the channel that I have created to the function that processes the stream.

Here is the code. I think that I don't really understand how variable references work in rust.

use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use std::error::Error;

async fn handle_connection(mut stream: TcpStream, sender: &amp;Sender&lt;[u8; 128]&gt;, receiver: &amp;Receiver&lt;[u8; 128]&gt;) -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {
    
    //do something...
        
}

pub async fn create_listener(addr: String) -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {

    let listener = TcpListener::bind(&amp;addr).await?;

    let (sender, receiver) = mpsc::channel(10);

    loop {
        let (mut stream, _) = listener.accept().await?;

        tokio::spawn(async move {
            handle_connection(stream, &amp;sender, &amp;receiver);
        });
    }

    Ok(())
}

#[tokio::main]
async fn main() -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {

    create_listener(String::from(&quot;127.0.0.1:8000&quot;)).await?;

    Ok(())

}

The error returned is the following:

error[E0382]: use of moved value: `sender`
  --&gt; src/server-t.rs:49:33
   |
44 |       let (sender, receiver) = mpsc::channel(10);
   |            ------ move occurs because `sender` has type `tokio::sync::mpsc::Sender&lt;[u8; 128]&gt;`, which does not implement the `Copy` trait
...
49 |           tokio::spawn(async move {
   |  _________________________________^
50 | |             handle_connection(stream, &amp;sender, &amp;receiver);
   | |                                        ------ use occurs due to use in generator
51 | |         });
   | |_________^ value moved here, in previous iteration of loop

error[E0382]: use of moved value: `receiver`
  --&gt; src/server-t.rs:49:33
   |
44 |       let (sender, receiver) = mpsc::channel(10);
   |                    -------- move occurs because `receiver` has type `tokio::sync::mpsc::Receiver&lt;[u8; 128]&gt;`, which does not implement the `Copy` trait
...
49 |           tokio::spawn(async move {
   |  _________________________________^
50 | |             handle_connection(stream, &amp;sender, &amp;receiver);
   | |                                                 -------- use occurs due to use in generator
51 | |         });
   | |_________^ value moved here, in previous iteration of loop

Could you explain me why this happens? How could it be solved?

答案1

得分: 1

async move块中捕获某些变量时,它们会被移动到该块内。因此,senderreceiver被移动到异步块中,不能在循环的下一次迭代中重复使用。

经典的解决方法是将它们包装在Arc中,以便进行共享所有权。这对于发送者来说可以工作,因为Sender::send()接受&self,所以即使使用Arc,你也可以发送。但是Receiver::recv()需要&mut self(因为这是一个多生产者单消费者通道),所以你不能从&ReceiverArc<Receiver>接收。

最简单的修复方法是使用broadcast通道,这是一个多生产者多消费者通道。尽管如此,这会更不高效,并且还存在语义差异:使用广播通道时,如果接收者在发送之后注册,它将不会收到消息,而使用mpsc通道时会收到。

另一种选项是使用Arc<Mutex<Option<T>>>来包装接收者,并使用.lock().unwrap().take().expect("two readers")来为读取者获取值。

英文:

When you capture some variable in an async move block, it is moved into the block. Because of that, sender and receiver are moved into the async block, and can't be reused for the next iteration of the loop.

The classic workaround is to wrap them in Arc, so that you have shared ownership. This will work with the sender, because Sender::send() takes &amp;self so you can send even with an Arc. But Receiver::recv() requires &amp;mut self (because this is a Multi Producer Single Consumer channel), so you cannot recieve from &amp;Receiver or Arc&lt;Receiver&gt;.

The simplest fix will be to use a broadcast channel, which is a Multi-Producer Multi-Consumer channel. This will be less efficient, though, and also has a semantic difference: with broadcast channels, if the receiver registers after the send it will not receive the message, while with mpsc channel it will.

Another option is to wrap the receiver with Arc&lt;Mutex&lt;Option&gt;&gt;&gt;, and .lock().unwrap().take().expect(&quot;two readers&quot;) it for the reader.

huangapple
  • 本文由 发表于 2023年7月10日 16:56:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/76652206.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定