向tokio的Mutex中添加值会引发移动错误。

huangapple go评论64阅读模式
英文:

Adding a values to Mutex in tokio is causing moving error

问题

在我的代码中,我创建了一个类型为 use tokio::sync::Mutex; 的共享向量 shared_vec,如下所示:

let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));

我试图在 tokio::spawn 内部获取锁并向列表中添加值,这应该是异步的,但我遇到了以下错误:

value moved here, in previous iteration of loop

heretokio::spawn(async move {... 的末尾。

有人可以告诉我如何解决这个问题吗?以下是我的代码:

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    sync::broadcast,
};

use serde_json::json;
use serde_json::Value;
use tokio::sync::Mutex;
use std::sync::Arc;
mod user;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("localhost:8080").await.unwrap();
    let (tx, _rx) = broadcast::channel(10);

    let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));

    loop {
        let (mut socket, addr) = listener.accept().await.unwrap();

        let tx = tx.clone();
        let mut rx = tx.subscribe();

        tokio::spawn(async move {
            let (reader, mut writer) = socket.split();

            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                tokio::select! {
                    result = reader.read_line(&mut line) => {
                        if result.unwrap() == 0 {
                            break;
                        }
                        tx.send((line.clone(), addr)).unwrap();
                        line.clear();
                    }
                    result = rx.recv() => {
                        let (msg, other_addr) = result.unwrap();

                        let json_msg: Value = json!(msg);

                        let mut vec = shared_vec.lock().await;

                        let mut indices_to_update = Vec::new();

                        for (index, usr) in vec.iter().enumerate() {
                            if usr.get_ip() != &addr {
                                indices_to_update.push(index);
                            }
                        }

                        for _index in indices_to_update {
                            let user = user::User::new(json_msg["username"].to_string(), addr);
                            vec.push(user);
                        }

                        if addr == other_addr {
                            writer.write_all(msg.as_bytes()).await.unwrap();
                        }
                    }
                }
            }
        });
    }
}
英文:

In my code I'm creating

let shared_vec: Arc&lt;Mutex&lt;Vec&lt;user::User&gt;&gt;&gt; = Arc::new(Mutex::new(Vec::new()));

which is type use tokio::sync::Mutex;.

I'm trying to acquire lock and add values to the list inside of tokio::spawn, which should work async, but I'm getting an error

value moved here, in previous iteration of loop

here is at the end of tokio::spawn(async move {...

Could someone tell me workaround for this?
Here is my code

use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    sync::broadcast,
};

use serde_json::json;
use serde_json::Value;
use tokio::sync::Mutex;
use std::sync::Arc;
mod user;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind(&quot;localhost:8080&quot;).await.unwrap();
    let (tx, _rx) = broadcast::channel(10);


    let shared_vec: Arc&lt;Mutex&lt;Vec&lt;user::User&gt;&gt;&gt; = Arc::new(Mutex::new(Vec::new()));

    loop {
        let (mut socket, addr) = listener.accept().await.unwrap();
        
        let tx = tx.clone();
        let mut rx = tx.subscribe();

        tokio::spawn(async move {
            let (reader, mut writer) = socket.split();

            let mut reader = BufReader::new(reader);
            let mut line = String::new();

            loop {
                tokio::select! {
                    result = reader.read_line(&amp;mut line) =&gt; {
                        if result.unwrap() == 0 {
                            break;
                        }
                        tx.send((line.clone(), addr)).unwrap();
                        line.clear();
                    }
                    result = rx.recv() =&gt; {
                        let (msg, other_addr) = result.unwrap();

                        let json_msg: Value = json!(msg);
                        
                        let mut vec = shared_vec.lock().await;

                        let mut indices_to_update = Vec::new();

                        for (index, usr) in vec.iter().enumerate() {
                            if usr.get_ip() != &amp;addr {
                                indices_to_update.push(index);
                            }
                        }

                        for _index in indices_to_update {
                            let user = user::User::new(json_msg[&quot;username&quot;].to_string(), addr);
                            vec.push(user);
                        }

                        if addr == other_addr {
                            writer.write_all(msg.as_bytes()).await.unwrap();
                        }
                    }
                }
            }
        });
    }
}

答案1

得分: 3

你必须使用 Arc::clone(&amp;shared_vec) 来克隆 Arc,然后将克隆的对象传递给 move block,以便在循环块中使用。如果不克隆,共享的引用将传递给其他线程/块,并且不能再使用。

英文:

You have to clone the Arc to be passed to the async move block using Arc::clone(&amp;shared_vec) in the loop block. You can then pass the new clone to the move block.

Without the clone the shared referenced is passed to the other thread / block and is not allowed to be used anymore.

huangapple
  • 本文由 发表于 2023年6月15日 15:53:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/76480258.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定