std mpsc 发送者通道在闭包中使用时关闭

huangapple go评论85阅读模式
英文:

std mpsc sender channel closed when used in closure

问题

我正在尝试创建一个在Rust中监视文件更改的线程。我正在使用 notify crate 来获取文件更改事件。

以下代码显示了在监视线程中运行的内容:

use std::{path::PathBuf, thread};

use log::{error, info};
use notify::Watcher;

fn main() {
    env_logger::init();

    let (tx, rx) = std::sync::mpsc::channel();

    thread::spawn({
        let config_path = PathBuf::from("config.toml");

        move || {
            let mut watcher =
                notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
                    match res {
                        Ok(event) => match event.kind {
                            notify::event::EventKind::Modify(notify::event::ModifyKind::Data(
                                notify::event::DataChange::Any,
                            )) => {
                                info!("Reloading config file");
                                tx.send(true).ok();
                            }
                            _ => (),
                        },
                        Err(e) => {
                            error!("Notify Error: {}", e);
                        }
                    }
                })
                .unwrap();
            watcher.watch(&config_path, notify::RecursiveMode::NonRecursive)
        }
    });

    loop {
        match rx.recv() {
            Ok(_) => {
                // 重新加载配置
            }
            Err(e) => {
                error!("Error while watching config file: {}", e);
                break;
            }
        }
    }
}
# Cargo.toml

[dependencies]
env_logger = "0.10.0"
log = "0.4.19"
notify = "6.0.1"

问题是当尝试从通道接收时,我得到了输出 Error while watching config file: receiving on a closed channel

我期望发送器 tx 被移动到闭包中而不关闭通道。但不知何故它被关闭了。

我已经查看了 notify crate 的 已知问题,但我认为没有与我的问题相关的内容。

英文:

I'm trying to create a thread watching for file changes in rust. I'm using the notify crate to get file change events.

The following code shows what is run in the watch thread:

use std::{path::PathBuf, thread};

use log::{error, info};
use notify::Watcher;

fn main() {
    env_logger::init();

    let (tx, rx) = std::sync::mpsc::channel();

    thread::spawn({
        let config_path = PathBuf::from(&quot;config.toml&quot;);

        move || {
            let mut watcher =
                notify::recommended_watcher(move |res: Result&lt;notify::Event, notify::Error&gt;| {
                    match res {
                        Ok(event) =&gt; match event.kind {
                            notify::event::EventKind::Modify(notify::event::ModifyKind::Data(
                                notify::event::DataChange::Any,
                            )) =&gt; {
                                info!(&quot;Reloading config file&quot;);
                                tx.send(true).ok();
                            }
                            _ =&gt; (),
                        },
                        Err(e) =&gt; {
                            error!(&quot;Notify Error: {}&quot;, e);
                        }
                    }
                })
                .unwrap();
            watcher.watch(&amp;config_path, notify::RecursiveMode::NonRecursive)
        }
    });

    loop {
        match rx.recv() {
            Ok(_) =&gt; {
                // reload config
            }
            Err(e) =&gt; {
                error!(&quot;Error while watching config file: {}&quot;, e);
                break;
            }
        }
    }
}
# Cargo.toml

[dependencies]
env_logger = &quot;0.10.0&quot;
log = &quot;0.4.19&quot;
notify = &quot;6.0.1&quot;

The problem is that I get the output Error while watching config file: receiving on a closed channel when trying to receive from the channel.

I expected that the sender tx is moved into the closure without the channel being closed. But somehow it gets closed.

I already looked through known problems of the notify crate, but I think nothing relates to my problem.

答案1

得分: 0

watch() 是非阻塞的。根据文档(重点在于):

开始监视新路径。

因此,它是“发射并忘记”的。您无需为其启动新线程。实际上,在执行watch后,线程立即终止,观察者被丢弃,通道被关闭。

以下是您应该执行的操作:

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();

    let config_path = PathBuf::from("config.toml");

    let mut watcher =
        notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| match res {
            Ok(event) => match event.kind {
                notify::event::EventKind::Modify(notify::event::ModifyKind::Data(
                    notify::event::DataChange::Any,
                )) => {
                    info!("重新加载配置文件");
                    tx.send(true).ok();
                }
                _ => (),
            },
            Err(e) => {
                error!("通知错误:{}", e);
            }
        })
        .unwrap();
    watcher.watch(&config_path, notify::RecursiveMode::NonRecursive);

    loop {
        match rx.recv() {
            Ok(_) => {
                // 重新加载配置
            }
            Err(e) => {
                error!("监视配置文件时出错:{}", e);
                break;
            }
        }
    }
}

请注意,代码中的 HTML 实体已被替换为相应的字符。

英文:

watch() is non-blocking. From the documentation (emphasis mine):

> Begin watching a new path.

So it is fire-and-forget. You don't need to spawn a new thread for it. And in fact, when you do, the thread terminates immediately after the watch, the watcher is dropped and the channel is closed.

Here's what you should do:

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();

    let config_path = PathBuf::from(&quot;config.toml&quot;);

    let mut watcher =
        notify::recommended_watcher(move |res: Result&lt;notify::Event, notify::Error&gt;| match res {
            Ok(event) =&gt; match event.kind {
                notify::event::EventKind::Modify(notify::event::ModifyKind::Data(
                    notify::event::DataChange::Any,
                )) =&gt; {
                    info!(&quot;Reloading config file&quot;);
                    tx.send(true).ok();
                }
                _ =&gt; (),
            },
            Err(e) =&gt; {
                error!(&quot;Notify Error: {}&quot;, e);
            }
        })
        .unwrap();
    watcher.watch(&amp;config_path, notify::RecursiveMode::NonRecursive);

    loop {
        match rx.recv() {
            Ok(_) =&gt; {
                // reload config
            }
            Err(e) =&gt; {
                error!(&quot;Error while watching config file: {}&quot;, e);
                break;
            }
        }
    }
}

huangapple
  • 本文由 发表于 2023年7月27日 19:58:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/76779505.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定