Is tokio::time::sleep producing deadlocks in Rust?


Question

It seems to me that having too many concurrent tokio::time::sleep calls produces a deadlock. In the example code below, I wanted to simulate using an event bus to execute code in parallel on the tokio runtime.

use eventador::{Eventador, SinkExt};
use tokio::time::{sleep, Duration, Instant};
use once_cell::sync::Lazy;

static INSTANT: Lazy<Instant> = Lazy::new(|| Instant::now());

pub struct Subscriber {
    eventbus: Eventador,
    i: u16,
}

impl Subscriber {
    pub fn new(i: u16, eventbus: Eventador) -> Self {
        Self { i, eventbus }
    }

    pub async fn start(self) {
        let subscription = self.eventbus.subscribe::<Event>();
        let value = subscription.recv().value;
        println!("pre sleep {} - {}ms since start", self.i, INSTANT.elapsed().as_millis());
        let now = Instant::now();
        sleep(Duration::from_millis(1000)).await;
        println!("{}: {:?} - {}ms - {}ms since start", self.i, value, now.elapsed().as_millis(), INSTANT.elapsed().as_millis());
    }
}

#[derive(Debug)]
pub struct Event {
    pub value: u16,
}

#[tokio::main]
async fn main() {
    let eventbus = Eventador::new(1024).unwrap();

    let mut publisher = eventbus.async_publisher::<Event>(512);

    for i in 1..8 {
        let subscriber = Subscriber::new(i, eventbus.clone());
        println!("spawn {}", i);
        tokio::spawn(subscriber.start());
    }

    println!("sending at {}", INSTANT.elapsed().as_millis());
    publisher.send(Event { value: 1234 }).await.expect("Something went wrong");
    println!("send finished");

    sleep(Duration::from_millis(10000)).await;
    println!("sleep finished");
}

The above code produces this output:

spawn 1
spawn 2
spawn 3
spawn 4
spawn 5
spawn 6
spawn 7
sending at 0
pre sleep 3 - 1ms since start
pre sleep 4 - 1ms since start
pre sleep 1 - 1ms since start
pre sleep 5 - 1ms since start
send finished
pre sleep 2 - 1ms since start
pre sleep 6 - 1ms since start
pre sleep 7 - 1ms since start
5: 1234 - 1000ms - 1002ms since start
1: 1234 - 1000ms - 1002ms since start
4: 1234 - 1000ms - 1002ms since start
3: 1234 - 1000ms - 1002ms since start
7: 1234 - 1001ms - 1003ms since start
6: 1234 - 1001ms - 1004ms since start
2: 1234 - 1002ms - 1004ms since start
sleep finished

This is what I wanted to see. But when I increase the number of subscribers to, say, 10 (in the for loop), I get this output:

spawn 1
spawn 2
spawn 3
spawn 4
spawn 5
spawn 6
spawn 7
spawn 8
spawn 9
spawn 10
sending at 0
pre sleep 4 - 3ms since start
pre sleep 3 - 3ms since start
pre sleep 1 - 3ms since start
pre sleep 5 - 3ms since start
pre sleep 6 - 3ms since start
pre sleep 2 - 3ms since start
pre sleep 7 - 3ms since start
send finished
pre sleep 8 - 3ms since start
7: 1234 - 1001ms - 1004ms since start
3: 1234 - 1001ms - 1005ms since start
6: 1234 - 1001ms - 1005ms since start
4: 1234 - 1002ms - 1005ms since start
1: 1234 - 1002ms - 1005ms since start
2: 1234 - 1002ms - 1005ms since start
8: 1234 - 1001ms - 1005ms since start
5: 1234 - 1001ms - 1005ms since start
sleep finished

Additionally, the program never stops. Why does that matter when I am just running a simulation? Because I want to be sure that in production, when I read, say, 100 files concurrently, I do not run into this deadlock; that would completely invalidate my approach.


Answer 1

Score: 2

You are using eventador's synchronous APIs, which block the runtime's worker threads, and that, not the sleep, is what can cause deadlocks. Use the async versions (async_subscriber() and async_publisher()) instead.
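A minimal sketch of what the subscriber side might look like against eventador's async API, per the answer above. The method name async_subscriber() is taken from the answer; the turbofish form mirrors the sync subscribe::<Event>() and is an assumption, as is the detail that the async subscriber implements futures::Stream (so next().await suspends the task instead of parking a worker thread) and the exact item type it yields.

```rust
use eventador::Eventador;
use futures::StreamExt; // assumption: the async subscriber implements Stream
use tokio::time::{sleep, Duration};

#[derive(Debug)]
pub struct Event {
    pub value: u16,
}

pub struct Subscriber {
    eventbus: Eventador,
    i: u16,
}

impl Subscriber {
    pub async fn start(self) {
        // async_subscriber() instead of subscribe(): awaiting the next event
        // yields the task back to the runtime rather than blocking the
        // worker thread, so other tasks (and their sleeps) keep running.
        let mut subscription = self.eventbus.async_subscriber::<Event>();
        if let Some(event) = subscription.next().await {
            sleep(Duration::from_millis(1000)).await;
            println!("{}: {:?}", self.i, event.value);
        }
    }
}
```

The underlying point is general: any call that parks the OS thread (a blocking recv, std::thread::sleep, synchronous file I/O) can starve tokio's worker pool once more tasks are blocked than there are worker threads. If a blocking call is unavoidable, tokio::task::spawn_blocking moves it onto a dedicated blocking-thread pool instead.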


huangapple
  • Posted on 2023-06-29 14:27:06
  • Please retain this link when reposting: https://go.coder-hub.com/76578512.html