如何在Rust中直接从curl读取缓冲行

huangapple go评论98阅读模式
英文:

How to read buffered lines directly from curl in Rust

问题

我正在使用curl库来下载一个经过gzip压缩的文件:

  1. use curl::easy::Easy;
  2. fn download_file() -> MyResult<()> {
  3. let mut file = File::create("file.txt.gz")?;
  4. let mut easy = Easy::new();
  5. easy.url("https://example.com/file.txt.gz")?;
  6. easy.write_function(move |data| {
  7. file.write_all(data).unwrap();
  8. return Ok(data.len());
  9. })?;
  10. easy.perform()?;
  11. return Ok(());
  12. }

我正在使用flate2库来解压文件,并逐行读取:

  1. use flate2::read::GzDecoder;
  2. fn import_file() -> MyResult<()> {
  3. let file = File::open("file.txt.gz")?;
  4. let reader = BufReader::new(GzDecoder::new(file));
  5. for line in reader.lines() {
  6. let line = line?;
  7. println!("{}", line);
  8. }
  9. return Ok(());
  10. }

我想要将这些步骤合并,但是GzDecoder需要一个实现Read trait的对象,我不确定如何将其连接到Easy。我怀疑这需要在单独的线程上执行下载和导入步骤:

  1. use curl::easy::Easy;
  2. use flate2::read::GzDecoder;
  3. fn download_and_import_file() -> MyResult<()> {
  4. let handle = thread::spawn(|| {
  5. let mut easy = Easy::new();
  6. easy.url("https://example.com/file.txt.gz").unwrap();
  7. easy.write_function(move |data| {
  8. // (这里应该放什么?)
  9. return Ok(data.len());
  10. }).unwrap();
  11. easy.perform().unwrap();
  12. });
  13. let reader = // (这里应该放什么?)
  14. let reader = BufReader::new(GzDecoder::new(reader));
  15. for line in reader.lines() {
  16. let line = line?;
  17. println!("{}", line);
  18. }
  19. handle.join().unwrap();
  20. return Ok(());
  21. }
英文:

I'm using the curl crate to download a gzipped file:

  1. use curl::easy::Easy;
  2. fn download_file() -&gt; MyResult&lt;()&gt; {
  3. let mut file = File::create(&quot;file.txt.gz&quot;)?;
  4. let mut easy = Easy::new();
  5. easy.url(&quot;https://example.com/file.txt.gz&quot;)?;
  6. easy.write_function(move |data| {
  7. file.write_all(data).unwrap();
  8. return Ok(data.len());
  9. })?;
  10. easy.perform()?;
  11. return Ok(());
  12. }

I'm using the flate2 crate to decompress the file, and read one line at a time:

  1. use flate2::read::GzDecoder;
  2. fn import_file() -&gt; MyResult&lt;()&gt; {
  3. let file = File::open(&quot;file.txt.gz&quot;)?;
  4. let reader = BufReader::new(GzDecoder::new(file));
  5. for line in reader.lines() {
  6. let line = line?;
  7. println!(&quot;{line}&quot;);
  8. }
  9. return Ok(());
  10. }

I would like to combine these steps, but GzDecoder requires an object implementing the Read trait, and I'm not sure how to hook that up to Easy. I suspect this will require doing the download and import steps on separate threads:

  1. use curl::easy::Easy;
  2. use flate2::read::GzDecoder;
  3. fn download_and_import_file() -&gt; MyResult&lt;()&gt; {
  4. let handle = thread::spawn(|| {
  5. let mut easy = Easy::new();
  6. easy.url(&quot;https://example.com/file.txt.gz&quot;).unwrap();
  7. easy.write_function(move |data| {
  8. // (what goes here?)
  9. return Ok(data.len());
  10. }).unwrap();
  11. easy.perform().unwrap();
  12. });
  13. let reader = // (what goes here?)
  14. let reader = BufReader::new(GzDecoder::new(reader));
  15. for line in reader.lines() {
  16. let line = line?;
  17. println!(&quot;{line}&quot;);
  18. }
  19. handle.join().unwrap();
  20. return Ok(());
  21. }

答案1

得分: 2

你可以创建一个类型,作为可以向其中写入数据的内存中 Read 实现:

  1. use std::collections::VecDeque;
  2. use std::io::{self, Read};
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::sync::{Arc, Condvar, Mutex};
  /// A cloneable in-memory byte pipe: any clone may append bytes with
  /// [`InMemoryStream::write`], and any clone may consume them through [`Read`].
  ///
  /// All clones share one buffer, so one clone can be handed to a producer
  /// thread while another is read on the consumer thread.
  #[derive(Clone, Default)]
  pub struct InMemoryStream {
      inner: Arc<InMemoryStreamInner>,
  }

  /// State shared by every clone of an [`InMemoryStream`].
  #[derive(Default)]
  struct InMemoryStreamInner {
      /// Bytes that have been written but not yet read.
      data: Mutex<VecDeque<u8>>,
      /// Signalled whenever bytes are appended or the stream is closed.
      condvar: Condvar,
      /// Set by `close`; once the buffer is drained, reads report end-of-stream.
      closed: AtomicBool,
  }

  impl InMemoryStream {
      /// Appends `data` to the buffer and wakes one waiting reader.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      pub fn write(&self, data: &[u8]) {
          self.inner.data.lock().unwrap().extend(data);
          self.inner.condvar.notify_one();
      }

      /// Marks the stream as finished and wakes every waiting reader.
      ///
      /// Bytes already buffered can still be read; afterwards reads return `Ok(0)`.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      pub fn close(&self) {
          {
              // Set the flag while holding the buffer lock: a reader that has
              // checked `closed` but not yet started waiting still holds the lock,
              // so it cannot miss the notification below and block forever.
              let _queue = self.inner.data.lock().unwrap();
              self.inner.closed.store(true, Ordering::SeqCst);
          }
          self.inner.condvar.notify_all();
      }
  }

  impl Read for InMemoryStream {
      /// Copies buffered bytes into `buf`, blocking until at least one byte is
      /// available or the stream has been closed.
      ///
      /// Returns `Ok(0)` only when `buf` is empty or the stream is closed and drained.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
          if buf.is_empty() {
              return Ok(0);
          }
          let mut queue = self.inner.data.lock().unwrap();
          loop {
              let copied = queue.read(buf)?;
              if copied > 0 {
                  return Ok(copied);
              }
              if self.inner.closed.load(Ordering::SeqCst) {
                  return Ok(0);
              }
              // Condition variables may wake spuriously, so re-check the buffer
              // after every wakeup instead of reporting a false end-of-stream.
              queue = self.inner.condvar.wait(queue).unwrap();
          }
      }
  }

然后按如下方式使用它:

  1. fn download_and_import_file() -> MyResult<()> {
  2. let stream = InMemoryStream::default();
  3. let handle = thread::spawn({
  4. let stream = stream.clone();
  5. let mut stream2 = stream.clone();
  6. || {
  7. let mut easy = Easy::new();
  8. easy.url("https://example.com/file.txt.gz").unwrap();
  9. easy.write_function(move |data| {
  10. stream.write(data);
  11. return Ok(data.len());
  12. })
  13. .unwrap();
  14. easy.perform().unwrap();
  15. stream2.close();
  16. }
  17. });
  18. let reader = BufReader::new(GzDecoder::new(stream));
  19. for line in reader.lines() {
  20. let line = line?;
  21. println!("{line}");
  22. }
  23. handle.join().unwrap();
  24. return Ok(());
  25. }

请注意,我已经将HTML编码符号(例如<和>)翻译回原始的Rust代码中的角括号。

英文:

You can create a type that serves as an in-memory Read implementation which you can write into:

  1. use std::collections::VecDeque;
  2. use std::io::{self, Read};
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::sync::{Arc, Condvar, Mutex};
  /// A cloneable in-memory byte pipe: any clone may append bytes with
  /// [`InMemoryStream::write`], and any clone may consume them through [`Read`].
  ///
  /// All clones share one buffer, so one clone can be handed to a producer
  /// thread while another is read on the consumer thread.
  #[derive(Clone, Default)]
  pub struct InMemoryStream {
      inner: Arc<InMemoryStreamInner>,
  }

  /// State shared by every clone of an [`InMemoryStream`].
  #[derive(Default)]
  struct InMemoryStreamInner {
      /// Bytes that have been written but not yet read.
      data: Mutex<VecDeque<u8>>,
      /// Signalled whenever bytes are appended or the stream is closed.
      condvar: Condvar,
      /// Set by `close`; once the buffer is drained, reads report end-of-stream.
      closed: AtomicBool,
  }

  impl InMemoryStream {
      /// Appends `data` to the buffer and wakes one waiting reader.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      pub fn write(&self, data: &[u8]) {
          self.inner.data.lock().unwrap().extend(data);
          self.inner.condvar.notify_one();
      }

      /// Marks the stream as finished and wakes every waiting reader.
      ///
      /// Bytes already buffered can still be read; afterwards reads return `Ok(0)`.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      pub fn close(&self) {
          {
              // Set the flag while holding the buffer lock: a reader that has
              // checked `closed` but not yet started waiting still holds the lock,
              // so it cannot miss the notification below and block forever.
              let _queue = self.inner.data.lock().unwrap();
              self.inner.closed.store(true, Ordering::SeqCst);
          }
          self.inner.condvar.notify_all();
      }
  }

  impl Read for InMemoryStream {
      /// Copies buffered bytes into `buf`, blocking until at least one byte is
      /// available or the stream has been closed.
      ///
      /// Returns `Ok(0)` only when `buf` is empty or the stream is closed and drained.
      ///
      /// # Panics
      /// Panics if the internal mutex is poisoned.
      fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
          if buf.is_empty() {
              return Ok(0);
          }
          let mut queue = self.inner.data.lock().unwrap();
          loop {
              let copied = queue.read(buf)?;
              if copied > 0 {
                  return Ok(copied);
              }
              if self.inner.closed.load(Ordering::SeqCst) {
                  return Ok(0);
              }
              // Condition variables may wake spuriously, so re-check the buffer
              // after every wakeup instead of reporting a false end-of-stream.
              queue = self.inner.condvar.wait(queue).unwrap();
          }
      }
  }

Then use it as follows:

  1. fn download_and_import_file() -&gt; MyResult&lt;()&gt; {
  2. let stream = InMemoryStream::default();
  3. let handle = thread::spawn({
  4. let stream = stream.clone();
  5. let mut stream2 = stream.clone();
  6. || {
  7. let mut easy = Easy::new();
  8. easy.url(&quot;https://example.com/file.txt.gz&quot;).unwrap();
  9. easy.write_function(move |data| {
  10. stream.write(data);
  11. return Ok(data.len());
  12. })
  13. .unwrap();
  14. easy.perform().unwrap();
  15. stream2.close();
  16. }
  17. });
  18. let reader = BufReader::new(GzDecoder::new(stream));
  19. for line in reader.lines() {
  20. let line = line?;
  21. println!(&quot;{line}&quot;);
  22. }
  23. handle.join().unwrap();
  24. return Ok(());
  25. }

huangapple
  • 本文由 发表于 2023年7月23日 18:06:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/76747671.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定