如何在Rust中直接从curl读取缓冲行

huangapple go评论74阅读模式
英文:

How to read buffered lines directly from curl in Rust

问题

我正在使用curl库来下载一个经过gzip压缩的文件:

use curl::easy::Easy;

/// Downloads `https://example.com/file.txt.gz` into `file.txt.gz` in the working directory.
///
/// # Errors
/// Fails if the file cannot be created, the URL cannot be set, or the transfer fails.
/// A failed write to disk aborts the transfer, so it is reported as an error from `perform()`
/// rather than as a panic inside the curl callback.
fn download_file() -> MyResult<()> {
    let mut file = File::create("file.txt.gz")?;
    let mut easy = Easy::new();
    easy.url("https://example.com/file.txt.gz")?;
    easy.write_function(move |data| {
        // Reporting fewer bytes than were delivered makes libcurl abort the transfer with a
        // write error, which `perform()` then returns. The io::Error detail itself is lost.
        let written = match file.write_all(data) {
            Ok(()) => data.len(),
            Err(_) => 0,
        };
        Ok(written)
    })?;
    easy.perform()?;
    Ok(())
}

我正在使用flate2库来解压文件,并逐行读取:

use flate2::read::GzDecoder;

/// Opens `file.txt.gz`, decompresses it on the fly, and prints it line by line.
///
/// # Errors
/// Fails if the file cannot be opened, or if decompression or UTF-8 line decoding fails.
fn import_file() -> MyResult<()> {
    let file = File::open("file.txt.gz")?;
    let reader = BufReader::new(GzDecoder::new(file));
    for line in reader.lines() {
        let line = line?;
        println!("{}", line);
    }
    Ok(())
}

我想要将这些步骤合并,但是GzDecoder需要一个实现Read trait的对象,我不确定如何将其连接到Easy。我怀疑这需要在单独的线程上执行下载和导入步骤:

use curl::easy::Easy;
use flate2::read::GzDecoder;

// Question skeleton: combine the download and the import so lines are processed while the
// file is still downloading. The two "(what goes here?)" placeholders are intentionally left
// open, so this block does not compile as written.
fn download_and_import_file() -> MyResult<()> {
    // The curl transfer runs on its own thread so this thread can consume its output.
    let handle = thread::spawn(|| {
        let mut easy = Easy::new();
        easy.url("https://example.com/file.txt.gz").unwrap();
        easy.write_function(move |data| {
            // (what goes here?) -- the received chunk must be handed to the reader below.
            return Ok(data.len());
        }).unwrap();
        easy.perform().unwrap();
    });
    let reader = // (what goes here?) -- `GzDecoder` needs something implementing `Read`.
    let reader = BufReader::new(GzDecoder::new(reader));
    for line in reader.lines() {
        let line = line?;
        println!("{}", line);
    }
    // Wait for the download thread to finish.
    handle.join().unwrap();
    return Ok(());
}
英文:

I'm using the curl crate to download a gzipped file:

use curl::easy::Easy;

fn download_file() -&gt; MyResult&lt;()&gt; {
    let mut file = File::create(&quot;file.txt.gz&quot;)?;
    let mut easy = Easy::new();
    easy.url(&quot;https://example.com/file.txt.gz&quot;)?;
    easy.write_function(move |data| {
        file.write_all(data).unwrap();
        return Ok(data.len());
    })?;
    easy.perform()?;
    return Ok(());
}

I'm using the flate2 crate to decompress the file, and read one line at a time:

use flate2::read::GzDecoder;

fn import_file() -&gt; MyResult&lt;()&gt; {
    let file = File::open(&quot;file.txt.gz&quot;)?;
    let reader = BufReader::new(GzDecoder::new(file));
    for line in reader.lines() {
        let line = line?;
        println!(&quot;{line}&quot;);
    }
    return Ok(());
}

I would like to combine these steps, but GzDecoder requires an object implementing the Read trait, and I'm not sure how to hook that up to Easy. I suspect this will require doing the download and import steps on separate threads:

use curl::easy::Easy;
use flate2::read::GzDecoder;

fn download_and_import_file() -&gt; MyResult&lt;()&gt; {
    let handle = thread::spawn(|| {
        let mut easy = Easy::new();
        easy.url(&quot;https://example.com/file.txt.gz&quot;).unwrap();
        easy.write_function(move |data| {
            // (what goes here?)
            return Ok(data.len());
        }).unwrap();
        easy.perform().unwrap();
    });
    let reader = // (what goes here?)
    let reader = BufReader::new(GzDecoder::new(reader));
    for line in reader.lines() {
        let line = line?;
        println!(&quot;{line}&quot;);
    }
    handle.join().unwrap();
    return Ok(());
}

答案1

得分: 2

您可以创建一个类型,作为可以向其中写入数据的内存 `Read` 实现:

use std::collections::VecDeque;
use std::io::{self, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};

/// An in-memory byte pipe: a producer pushes bytes with [`InMemoryStream::write`] and a
/// consumer pulls them through the [`Read`] implementation, blocking while nothing is buffered.
///
/// Clones share the same buffer, so one clone can be moved to a producer thread while another
/// is read elsewhere. It is designed for a single reader (`write` wakes only one waiter), and
/// the buffer is unbounded: a producer that outpaces the reader makes it grow without limit.
#[derive(Clone, Default)]
pub struct InMemoryStream {
    inner: Arc<InMemoryStreamInner>,
}

/// State shared by all clones of an [`InMemoryStream`].
#[derive(Default)]
struct InMemoryStreamInner {
    /// Bytes written but not yet read.
    data: Mutex<VecDeque<u8>>,
    /// Signalled whenever bytes are appended or the stream is closed.
    condvar: Condvar,
    /// Set by `close`; only stored while `data` is locked, so waiters cannot miss it.
    closed: AtomicBool,
}

impl InMemoryStream {
    /// Appends `data` to the buffer and wakes a blocked reader.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn write(&self, data: &[u8]) {
        self.inner.data.lock().unwrap().extend(data);
        self.inner.condvar.notify_one();
    }

    /// Marks the end of the stream: once the buffer is drained, reads return `Ok(0)`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn close(&self) {
        // Store the flag while holding the data lock. A reader checks `closed` and starts
        // waiting under that same lock, so it can never sit between its check and its wait
        // when this notification fires (a lost wake-up would leave it blocked forever).
        let _data = self.inner.data.lock().unwrap();
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.condvar.notify_all();
    }
}

impl Read for InMemoryStream {
    /// Blocks until at least one byte is buffered or the stream is closed, then moves as many
    /// buffered bytes as fit into `buf`. Returns `Ok(0)` only when `buf` is empty or the stream
    /// is closed and fully drained.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let inner = &*self.inner;
        let guard = inner.data.lock().unwrap();
        // `wait_while` re-checks the condition after every wake-up, so a spurious wake-up is
        // never mistaken for end of stream.
        let mut data = inner
            .condvar
            .wait_while(guard, |pending| {
                pending.is_empty() && !inner.closed.load(Ordering::SeqCst)
            })
            .unwrap();
        data.read(buf)
    }
}

然后使用它如下:

fn download_and_import_file() -> MyResult<()> {
    let stream = InMemoryStream::default();
    let handle = thread::spawn({
        let stream = stream.clone();
        let mut stream2 = stream.clone();
        || {
            let mut easy = Easy::new();
            easy.url("https://example.com/file.txt.gz").unwrap();
            easy.write_function(move |data| {
                stream.write(data);
                return Ok(data.len());
            })
            .unwrap();
            easy.perform().unwrap();
            stream2.close();
        }
    });
    let reader = BufReader::new(GzDecoder::new(stream));
    for line in reader.lines() {
        let line = line?;
        println!("{line}");
    }
    handle.join().unwrap();
    return Ok(());
}

请注意,以上代码中的HTML转义符号(例如 `&lt;` 和 `&gt;`)已还原为Rust源码中的尖括号 `<` 和 `>`。

英文:

You can create a type that serves as an in-memory Read implementation which you can write into:

use std::collections::VecDeque;
use std::io::{self, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};

/// An in-memory byte pipe: a producer pushes bytes with [`InMemoryStream::write`] and a
/// consumer pulls them through the [`Read`] implementation, blocking while nothing is buffered.
///
/// Clones share the same buffer, so one clone can be moved to a producer thread while another
/// is read elsewhere. It is designed for a single reader (`write` wakes only one waiter), and
/// the buffer is unbounded: a producer that outpaces the reader makes it grow without limit.
#[derive(Clone, Default)]
pub struct InMemoryStream {
    inner: Arc<InMemoryStreamInner>,
}

/// State shared by all clones of an [`InMemoryStream`].
#[derive(Default)]
struct InMemoryStreamInner {
    /// Bytes written but not yet read.
    data: Mutex<VecDeque<u8>>,
    /// Signalled whenever bytes are appended or the stream is closed.
    condvar: Condvar,
    /// Set by `close`; only stored while `data` is locked, so waiters cannot miss it.
    closed: AtomicBool,
}

impl InMemoryStream {
    /// Appends `data` to the buffer and wakes a blocked reader.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn write(&self, data: &[u8]) {
        self.inner.data.lock().unwrap().extend(data);
        self.inner.condvar.notify_one();
    }

    /// Marks the end of the stream: once the buffer is drained, reads return `Ok(0)`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn close(&self) {
        // Store the flag while holding the data lock. A reader checks `closed` and starts
        // waiting under that same lock, so it can never sit between its check and its wait
        // when this notification fires (a lost wake-up would leave it blocked forever).
        let _data = self.inner.data.lock().unwrap();
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.condvar.notify_all();
    }
}

impl Read for InMemoryStream {
    /// Blocks until at least one byte is buffered or the stream is closed, then moves as many
    /// buffered bytes as fit into `buf`. Returns `Ok(0)` only when `buf` is empty or the stream
    /// is closed and fully drained.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let inner = &*self.inner;
        let guard = inner.data.lock().unwrap();
        // `wait_while` re-checks the condition after every wake-up, so a spurious wake-up is
        // never mistaken for end of stream.
        let mut data = inner
            .condvar
            .wait_while(guard, |pending| {
                pending.is_empty() && !inner.closed.load(Ordering::SeqCst)
            })
            .unwrap();
        data.read(buf)
    }
}

Then use it as follows:

fn download_and_import_file() -&gt; MyResult&lt;()&gt; {
    let stream = InMemoryStream::default();
    let handle = thread::spawn({
        let stream = stream.clone();
        let mut stream2 = stream.clone();
        || {
            let mut easy = Easy::new();
            easy.url(&quot;https://example.com/file.txt.gz&quot;).unwrap();
            easy.write_function(move |data| {
                stream.write(data);
                return Ok(data.len());
            })
            .unwrap();
            easy.perform().unwrap();
            stream2.close();
        }
    });
    let reader = BufReader::new(GzDecoder::new(stream));
    for line in reader.lines() {
        let line = line?;
        println!(&quot;{line}&quot;);
    }
    handle.join().unwrap();
    return Ok(());
}

huangapple
  • 本文由 发表于 2023年7月23日 18:06:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/76747671.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定