向tokio的Mutex中添加值会引发移动错误。

huangapple go评论97阅读模式
英文:

Adding a values to Mutex in tokio is causing moving error

问题

在我的代码中,我创建了一个类型为 use tokio::sync::Mutex; 的共享向量 shared_vec,如下所示:

  1. let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));

我试图在 tokio::spawn 内部获取锁并向列表中添加值,这应该是异步的,但我遇到了以下错误:

  1. value moved here, in previous iteration of loop

heretokio::spawn(async move {... 的末尾。

有人可以告诉我如何解决这个问题吗?以下是我的代码:

  1. use tokio::{
  2. io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
  3. net::TcpListener,
  4. sync::broadcast,
  5. };
  6. use serde_json::json;
  7. use serde_json::Value;
  8. use tokio::sync::Mutex;
  9. use std::sync::Arc;
  10. mod user;
  11. #[tokio::main]
  12. async fn main() {
  13. let listener = TcpListener::bind("localhost:8080").await.unwrap();
  14. let (tx, _rx) = broadcast::channel(10);
  15. let shared_vec: Arc<Mutex<Vec<user::User>>> = Arc::new(Mutex::new(Vec::new()));
  16. loop {
  17. let (mut socket, addr) = listener.accept().await.unwrap();
  18. let tx = tx.clone();
  19. let mut rx = tx.subscribe();
  20. tokio::spawn(async move {
  21. let (reader, mut writer) = socket.split();
  22. let mut reader = BufReader::new(reader);
  23. let mut line = String::new();
  24. loop {
  25. tokio::select! {
  26. result = reader.read_line(&mut line) => {
  27. if result.unwrap() == 0 {
  28. break;
  29. }
  30. tx.send((line.clone(), addr)).unwrap();
  31. line.clear();
  32. }
  33. result = rx.recv() => {
  34. let (msg, other_addr) = result.unwrap();
  35. let json_msg: Value = json!(msg);
  36. let mut vec = shared_vec.lock().await;
  37. let mut indices_to_update = Vec::new();
  38. for (index, usr) in vec.iter().enumerate() {
  39. if usr.get_ip() != &addr {
  40. indices_to_update.push(index);
  41. }
  42. }
  43. for _index in indices_to_update {
  44. let user = user::User::new(json_msg["username"].to_string(), addr);
  45. vec.push(user);
  46. }
  47. if addr == other_addr {
  48. writer.write_all(msg.as_bytes()).await.unwrap();
  49. }
  50. }
  51. }
  52. }
  53. });
  54. }
  55. }
英文:

In my code I'm creating

  1. let shared_vec: Arc&lt;Mutex&lt;Vec&lt;user::User&gt;&gt;&gt; = Arc::new(Mutex::new(Vec::new()));

which is type use tokio::sync::Mutex;.

I'm trying to acquire lock and add values to the list inside of tokio::spawn, which should work async, but I'm getting an error

  1. value moved here, in previous iteration of loop

here is at the end of tokio::spawn(async move {...

Could someone tell me workaround for this?
Here is my code

  1. use tokio::{
  2. io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
  3. net::TcpListener,
  4. sync::broadcast,
  5. };
  6. use serde_json::json;
  7. use serde_json::Value;
  8. use tokio::sync::Mutex;
  9. use std::sync::Arc;
  10. mod user;
  11. #[tokio::main]
  12. async fn main() {
  13. let listener = TcpListener::bind(&quot;localhost:8080&quot;).await.unwrap();
  14. let (tx, _rx) = broadcast::channel(10);
  15. let shared_vec: Arc&lt;Mutex&lt;Vec&lt;user::User&gt;&gt;&gt; = Arc::new(Mutex::new(Vec::new()));
  16. loop {
  17. let (mut socket, addr) = listener.accept().await.unwrap();
  18. let tx = tx.clone();
  19. let mut rx = tx.subscribe();
  20. tokio::spawn(async move {
  21. let (reader, mut writer) = socket.split();
  22. let mut reader = BufReader::new(reader);
  23. let mut line = String::new();
  24. loop {
  25. tokio::select! {
  26. result = reader.read_line(&amp;mut line) =&gt; {
  27. if result.unwrap() == 0 {
  28. break;
  29. }
  30. tx.send((line.clone(), addr)).unwrap();
  31. line.clear();
  32. }
  33. result = rx.recv() =&gt; {
  34. let (msg, other_addr) = result.unwrap();
  35. let json_msg: Value = json!(msg);
  36. let mut vec = shared_vec.lock().await;
  37. let mut indices_to_update = Vec::new();
  38. for (index, usr) in vec.iter().enumerate() {
  39. if usr.get_ip() != &amp;addr {
  40. indices_to_update.push(index);
  41. }
  42. }
  43. for _index in indices_to_update {
  44. let user = user::User::new(json_msg[&quot;username&quot;].to_string(), addr);
  45. vec.push(user);
  46. }
  47. if addr == other_addr {
  48. writer.write_all(msg.as_bytes()).await.unwrap();
  49. }
  50. }
  51. }
  52. }
  53. });
  54. }
  55. }

答案1

得分: 3

你必须使用 Arc::clone(&amp;shared_vec) 来克隆 Arc,然后将克隆的对象传递给 move block,以便在循环块中使用。如果不克隆,共享的引用将传递给其他线程/块,并且不能再使用。

英文:

You have to clone the Arc to be passed to the async move block using Arc::clone(&amp;shared_vec) in the loop block. You can then pass the new clone to the move block.

Without the clone the shared referenced is passed to the other thread / block and is not allowed to be used anymore.

huangapple
  • 本文由 发表于 2023年6月15日 15:53:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/76480258.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定