Rust异步循环函数阻塞了另一个未来的执行。

huangapple go评论70阅读模式
英文:

Rust async loop function blocks another futures execution

问题

我有我的应用程序,其中包括服务和HTTP服务器。

  • 服务使用OS API执行一些操作,等待事件等等,并且它具有用于此目的的循环以及异步数据库写入。我使用异步函数启动此服务。
  • 服务器(使用Rocket编写)也使用异步请求处理程序,因为我当前使用SeaORM,它使用异步。

问题:
当我向服务器发出请求时,处理程序内部的异步任务永远不会启动,除非在我的服务循环中触发事件。当我的服务中触发事件时,处理程序就会成功完成,但在下一个请求中仍然是相同的情况。

我尝试过:

  • 使用 tokio::spawntokio::task::spawn,它们都以完全相同的方式工作(阻塞执行)。
  • 据我所知,我不能生成常规线程,因为无论如何我都不能使用 .await
  • 我还尝试将 main 标记为 #[rocket::main(worker_threads = 4)],这应该创建更多的异步线程?但结果仍然相同。

我该如何克服这个问题?
我可以考虑只使用不是异步的其他ORM,比如 diesel,因为除了ORM之外,我当前没有在任何其他地方使用异步,这将起作用,但我认为这不是一个好的解决方案。
另一个想法是在我的循环中添加滴答声,这样它就不会一直停在服务事件触发之前,但这看起来也很奇怪,处理延迟仍然会取决于这个。

英文:

I have my app consisting of service and http server.

  • Service has some things going on with OS api, waiting for events, etc. and it has loop for this purposes as well as async db writes. I start this service with async function.
  • Server(written with rocket) uses async request hadnles as well because I'm currently using SeaORM which uses async.

Problem:
When I hit my server with request, it never starts async tasks within handler, unless event in my service loop is fired. When event is fired in my service, handler finishes well but on the next request its the same.

I tried to:

  • Use tokio::spawn, tokio::task::spawn, both of them were working exactly the same way (blocked execution)
  • As far as I know I can't spawn regular thread, because I wouldn't be able to .await anyways.
  • Also I tried to mark main with #[rocket::main(worker_threads = 4)] which should make more async threads? But It's still the same.

How can I overcome this?
I can think of is to just use another ORM like diesel which is not async, and as I don't currently use async anywhere else beside ORM it will work, but I don't think this is a good solution.
And another thought is to add ticks to my loop, so it won't be stuck until service event is fired, but this also looks weird and handles latency will still depend on this.

Minimal reproducible example:

extern crate rocket;

use std::sync::mpsc::{channel, Sender};

use once_cell::sync::OnceCell;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::State;
use sea_orm::{entity::prelude::*, Database, Set};
use sea_orm::{DbBackend, Schema};
use tokio::join;
use windows::{
    w,
    Win32::Foundation::HWND,
    Win32::UI::{
        Accessibility::{SetWinEventHook, HWINEVENTHOOK},
        WindowsAndMessaging::{MessageBoxW, EVENT_SYSTEM_FOREGROUND, MB_OK},
    },
};

thread_local! {
    static TX: OnceCell<Sender<RawWindowEvent>>= OnceCell::new()
}

#[rocket::main]
async fn main() {
    let db = Database::connect("sqlite://data.db?mode=rwc")
        .await
        .unwrap();

    let builder = db.get_database_backend();

    let stmt = builder.build(
        Schema::new(DbBackend::Sqlite)
            .create_table_from_entity(Entity)
            .if_not_exists(),
    );

    db.execute(stmt).await.unwrap();

    let server = rocket::build()
        .manage(db.clone())
        .mount("/", routes![get_events])
        .launch();

    let service = tokio::spawn(service(db.clone()));

    join!(server, service);
}

#[get("/event")]
async fn get_events(db: &State<DatabaseConnection>) -> Json<Vec<Model>> {
    let db = db as &DatabaseConnection;

    let events = Entity::find().all(db).await.unwrap();

    Json(events)
}

extern "system" fn win_event_hook_callback(
    child_id: HWINEVENTHOOK,
    hook_handle: u32,
    event_id: HWND,
    window_handle: i32,
    object_id: i32,
    thread_id: u32,
    timestamp: u32,
) -> () {
    let event = RawWindowEvent {
        child_id,
        hook_handle,
        event_id,
        window_handle,
        object_id,
        thread_id,
        timestamp,
    };

    TX.with(|f| {
        let tx: &Sender<RawWindowEvent> = f.get().unwrap();

        tx.send(event).unwrap();
    });
}

async fn service(db: DatabaseConnection) {
    let (tx, cx) = channel::<RawWindowEvent>();

    std::thread::spawn(move || {
        TX.with(|f| f.set(tx)).unwrap();

        let hook = unsafe {
            SetWinEventHook(
                EVENT_SYSTEM_FOREGROUND,
                EVENT_SYSTEM_FOREGROUND,
                None,
                Some(win_event_hook_callback),
                0,
                0,
                0,
            )
        };

        let _ = unsafe { MessageBoxW(None, w!("Text"), w!("Text"), MB_OK) };
    });

    loop {
        let event = cx.recv();

        if (event.is_err()) {
            break;
        }

        let event = event.unwrap();

        // There goes some event processing with another windows api calls or simple calculations...

        let record = ActiveModel {
            timestamp: Set(event.timestamp),
            ..Default::default()
        };

        Entity::insert(record).exec(&db).await.unwrap();
    }
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub timestamp: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Debug, Clone, Copy)]
pub struct RawWindowEvent {
    pub child_id: HWINEVENTHOOK,
    pub hook_handle: u32,
    pub event_id: HWND,
    pub window_handle: i32,
    pub object_id: i32,
    pub thread_id: u32,
    pub timestamp: u32,
}

Dependencies in Cargo.toml:

[dependencies]
dotenv = "0.15.0"

tokio = { version = "1.28.2", features = ["full"] }

sea-orm = { version = "^0.11", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", "macros" ] }

rocket = {version = "0.5.0-rc.3", features = ["json"]}

once_cell = "1.17.1"

[dependencies.windows]
version = "0.48.0"
features = [
    "Win32_Foundation",
    "Win32_UI_Accessibility",
    "Win32_UI_WindowsAndMessaging",
    "Win32_System_Threading",
    "Win32_System_ProcessStatus"
]

答案1

得分: 3

你正在使用一个同步通道,因此会阻塞运行时。请使用在tokio中定义的通道:tokio::sync::mpsc

英文:

You're using a synchronous channel, and by that you're blocking the runtime. Use the channel defined in tokio: tokio::sync::mpsc.

huangapple
  • 本文由 发表于 2023年6月25日 20:29:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76550386-2.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定