这是因为互斥锁没有被释放吗?

huangapple go评论48阅读模式
英文:

Is this because the mutex was not released?

问题

我已经阅读了将单线程服务器转换为多线程服务器这篇文章,并尝试实现它。

我编写了以下代码:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Task = dyn FnOnce() + Send + 'static;

pub struct Threadpool {
    threads: Vec<thread::JoinHandle<()>>,
    rx: Arc<Mutex<Receiver<Box<Task>>>,
    tx: Sender<Box<Task>>,
}

impl Threadpool {
    pub fn new(size: usize) -> Threadpool {
        let mut tasks = Vec::with_capacity(size);

        let (tx, rx): (Sender<Box<Task>>, Receiver<Box<Task>>) = channel();

        let rx = Arc::new(Mutex::new(rx));

        for _ in 0..size {
            let rx = rx.clone();

            let task = thread::spawn(move || {
                loop {
                    let job = rx.lock().unwrap().recv().unwrap();
                    job();
                }
            });

            tasks.push(task);
        }

        Threadpool {
            threads: tasks,
            rx,
            tx,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.tx.send(Box::new(f)).unwrap();
    }
}

它可以工作。

但是,当我将以下部分的代码:

let job = rx.lock().unwrap().recv().unwrap();
job();

更改为:

rx.lock().unwrap().recv().unwrap()();

当我打开localhost:port/sleep,然后打开localhost:port时,会花费5秒钟。

我在主函数中设置了以下内容:

"GET /sleep HTTP/1.1" => {
    thread::sleep(Duration::from_secs(5));
    ("HTTP/1.1 200 OK", "hello.html")
}

我已经知道while let会导致这种情况。

但我无法弄清楚为什么我的上面的代码也会导致这种情况。

有人可以给我答案吗?

英文:

I've read this Turning Our Single-Threaded Server into a Multithreaded Server.
And tried to implement it.

I wrote this:

use std::sync::mpsc::{channel, Receiver, Sender};

use std::sync::{Arc, Mutex};

use std::thread;

type task = dyn FnOnce() + Send + &#39;static;

pub struct Threadpool {
    threads: Vec&lt;thread::JoinHandle&lt;()&gt;&gt;,

    rx: Arc&lt;Mutex&lt;Receiver&lt;Box&lt;task&gt;&gt;&gt;&gt;,

    tx: Sender&lt;Box&lt;task&gt;&gt;,
}

impl Threadpool {
    pub fn new(size: usize) -&gt; Threadpool {
        let mut tasks = Vec::with_capacity(size);

        let (tx, rx): (Sender&lt;Box&lt;task&gt;&gt;, Receiver&lt;Box&lt;task&gt;&gt;) = channel();

        let rx = Arc::new(Mutex::new(rx));

        for _ in 0..size {
            let rx = rx.clone();

            let task = thread::spawn(move || {
                loop {
                   let job= rx.lock().unwrap().recv().unwrap();
                   job();
                }
            });

            tasks.push(task);
        }

        Threadpool {
            threads: tasks,
            rx,
            tx,
        }
    }

    pub fn execute&lt;F&gt;(&amp;self, f: F)
    where
        F: FnOnce() + Send + &#39;static,
    {
        self.tx.send(Box::new(f)).unwrap();
    }
}

It works.

But when I change

let job= rx.lock().unwrap().recv().unwrap();
job();

to

rx.lock().unwrap().recv().unwrap()();

When I open localhost:port/sleep, and then open localhost:port, it will takes 5 seconds.

I set this in main

&quot;GET /sleep HTTP/1.1&quot; =&gt; {
            thread::sleep(Duration::from_secs(5));
            (&quot;HTTP/1.1 200 OK&quot;, &quot;hello.html&quot;)
        }

I already knew that while let will cause that.

But I can't figure out why my code above will also lead to that.

Can anybody give me the answer.

答案1

得分: 3

在Rust中,临时对象会在包含它们的表达式结束时被销毁(有一些不相关的注意事项)。

而我们感兴趣的临时对象是互斥锁的保护者,其drop方法负责释放互斥锁。

因此,显式编写drop的第一个代码示例:

let job = rx.lock().unwrap().recv().unwrap();
job();

等同于:

let mut guard = rx.lock().unwrap();
let job = guard.recv().unwrap();
drop(guard);
job();

而你的第二个代码示例:

rx.lock().unwrap().recv().unwrap()();

等同于:

let mut guard = rx.lock().unwrap();
let job = guard.recv().unwrap();
job();
drop(guard);

正如你所看到的,现在你在互斥锁仍然被锁住的情况下调用了job()函数。

英文:

In Rust temporary objects are dropped at the end of the expression that contains them (with a few caveats not relevant here).

And the temporary we are interested in is the guard of the mutex, whose drop is responsible of releasing the mutex lock.

So, writingh the drop explicitly, your first code:

let job = rx.lock().unwrap().recv().unwrap();
job();

Is equivalent to:

let mut guard = rx.lock().unwrap();
let job = guard.recv().unwrap();
drop(guard);
job();

And your second code:

rx.lock().unwrap().recv().unwrap()();

Is equivalent to:

let mut guard = rx.lock().unwrap();
let job = guard.recv().unwrap()
job();
drop(guard);

As you can see, now you are calling the job() function with the mutex still locked.

答案2

得分: 1

是因为互斥锁没有释放吗?

是的,你基本上正在做这个操作

{
    let rx = rx.lock().unwrap(); // 获得了锁
    let job = rx.recv().unwrap(); // 获得了作业
    // 在仍然持有互斥锁的情况下进入睡眠
    job(); // std::thread::sleep(Duration::from_secs(5))
    drop(rx); // 释放锁
}

由于所有线程共享互斥锁,并尝试获取锁,它们实际上会被阻塞,直到持有锁的睡眠线程唤醒。这就是为什么在请求sleep端点后,其他线程无法执行任何其他作业的原因。

然而,还有另一个问题。即使它不进入睡眠,它仍然在持有锁的情况下调用Receiver::recv(),这将阻塞当前线程(进入睡眠状态),直到有东西通过通道发送。但考虑到只有在通道上没有作业时一个线程才会阻塞其他线程,我想这应该是有意设计的。

英文:

> Is this because the mutex was not released?

Yes, you are basically doing this

{
    let rx = rx.lock().unwrap(); // got the lock
    let job = rx.recv().unwrap(); // got the job
    // going to sleep while still holding mutex lock
    job(); // std::thread::sleep(Duration::from_secs(5))
    drop(rx); // lock is released
}

As all the threads share the mutex, and will try to acquire lock, they are effectively blocked until sleeping thread with lock wakes up. Which is why after requesting sleep endpoint, other threads are unable to perform any other job.

There's another issue however. Even if it didn't sleep, it still calls Receiver::recv() while still holding the lock, which blocks the current thread (goes to sleep) until something is sent down the channel. But considering that one thread will only block others if there's no jobs on the channel, I guess that is by design.

huangapple
  • 本文由 发表于 2023年2月7日 01:21:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/75364568.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定