英文:
Adding a values to Mutex in tokio is causing moving error
问题
在我的代码中,我创建了一个类型为 use tokio::sync::Mutex;
的共享向量 shared_vec
,如下所示:
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
我试图在 tokio::spawn
内部获取锁并向列表中添加值,这应该是异步的,但我遇到了以下错误:
value moved here, in previous iteration of loop
here
在 tokio::spawn(async move {...
的末尾。
有人可以告诉我如何解决这个问题吗?以下是我的代码:
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpListener,
sync::broadcast,
};
use serde_json::json;
use serde_json::Value;
use tokio::sync::Mutex;
use std::sync::Arc;
mod user;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("localhost:8080").await.unwrap();
let (tx, _rx) = broadcast::channel(10);
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
loop {
let (mut socket, addr) = listener.accept().await.unwrap();
let tx = tx.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move {
let (reader, mut writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
tokio::select! {
result = reader.read_line(&mut line) => {
if result.unwrap() == 0 {
break;
}
tx.send((line.clone(), addr)).unwrap();
line.clear();
}
result = rx.recv() => {
let (msg, other_addr) = result.unwrap();
let json_msg: Value = json!(msg);
let mut vec = shared_vec.lock().await;
let mut indices_to_update = Vec::new();
for (index, usr) in vec.iter().enumerate() {
if usr.get_ip() != &addr {
indices_to_update.push(index);
}
}
for _index in indices_to_update {
let user = user::User::new(json_msg["username"].to_string(), addr);
vec.push(user);
}
if addr == other_addr {
writer.write_all(msg.as_bytes()).await.unwrap();
}
}
}
}
});
}
}
英文:
In my code I'm creating
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
which is type use tokio::sync::Mutex;
.
I'm trying to acquire lock and add values to the list inside of tokio::spawn
, which should work async, but I'm getting an error
value moved here, in previous iteration of loop
here
is at the end of tokio::spawn(async move {...
Could someone tell me workaround for this?
Here is my code
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpListener,
sync::broadcast,
};
use serde_json::json;
use serde_json::Value;
use tokio::sync::Mutex;
use std::sync::Arc;
mod user;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("localhost:8080").await.unwrap();
let (tx, _rx) = broadcast::channel(10);
let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
loop {
let (mut socket, addr) = listener.accept().await.unwrap();
let tx = tx.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move {
let (reader, mut writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
tokio::select! {
result = reader.read_line(&mut line) => {
if result.unwrap() == 0 {
break;
}
tx.send((line.clone(), addr)).unwrap();
line.clear();
}
result = rx.recv() => {
let (msg, other_addr) = result.unwrap();
let json_msg: Value = json!(msg);
let mut vec = shared_vec.lock().await;
let mut indices_to_update = Vec::new();
for (index, usr) in vec.iter().enumerate() {
if usr.get_ip() != &addr {
indices_to_update.push(index);
}
}
for _index in indices_to_update {
let user = user::User::new(json_msg["username"].to_string(), addr);
vec.push(user);
}
if addr == other_addr {
writer.write_all(msg.as_bytes()).await.unwrap();
}
}
}
}
});
}
}
答案1
得分: 3
你必须使用 Arc::clone(&shared_vec)
来克隆 Arc
,然后将克隆的对象传递给 move block
,以便在循环块中使用。如果不克隆,共享的引用将传递给其他线程/块,并且不能再使用。
英文:
You have to clone
the Arc
to be passed to the async move block
using Arc::clone(&shared_vec)
in the loop block. You can then pass the new clone to the move block
.
Without the clone the shared referenced is passed to the other thread / block and is not allowed to be used anymore.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论