Webflux 使用 zip 处理的 DataBuffer 导致文件损坏。

huangapple go评论74阅读模式
英文:

Webflux streaming DataBuffer with zip resulting in corrupt file

问题

Currently I'm implementing a non-blocking I/O application with Spring Boot 2.7.12 and Webflux to download files with a WebClient, zip them, and stream the zip file to the browser.

Downloading and zipping is working fine if I write the ZipOutputStream to a local file. However, if I stream the zip back to the caller (Flux), it is corrupted.

I'm not sure if I misunderstand the concept of the DataBuffer or if it is a bug in the Spring framework.

I have created a small sample. If you download the zip file, each entry is duplicated multiple times, and the last entry is corrupted.

Thank you,
Roberto

英文:

Currently I'm implementing a non blocking i/o application with Spring Boot 2.7.12 and Webflux to download files with a webclient, zip them and stream the zip file to the browser.
Downloading and zipping is working fine if I write the ZipOutputStream to a local file.
However, if I stream the zip back to the caller (Flux<DataBuffer>) it is corrupted.

I'm not sure, if I misunderstand the concept of the DataBuffer, or if it is a bug in the spring framework.

I have created a small sample. If you download the zip file, each entry is duplicated multiple times and the last entry is corrupted.

Thank you
Roberto

@GetMapping(value = &quot;/zip&quot;, produces = &quot;application/zip&quot;)
  public Flux&lt;DefaultDataBuffer&gt; zip() {
    var files = Arrays.asList(&quot;File1&quot;, &quot;File2&quot;, &quot;File3&quot;, &quot;File4&quot;, &quot;File5&quot;);
    var responseDataBuffer = new DefaultDataBufferFactory().allocateBuffer();
    ZipOutputStream zipOutputStream = new ZipOutputStream(responseDataBuffer.asOutputStream());
    return Flux.fromStream(files.stream())
        .map(file -&gt; putZipEntry(file, zipOutputStream))
        .map(x -&gt; responseDataBuffer)
        .doOnComplete(() -&gt; closeZipOutputStream(zipOutputStream));
  }

  private void closeZipOutputStream(ZipOutputStream zipOutputStream) {
    try {
    zipOutputStream.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private ZipOutputStream putZipEntry(String file, ZipOutputStream zipOutputStream) {
    try {
      zipOutputStream.putNextEntry(new ZipEntry(file + &quot;.txt&quot;));
      zipOutputStream.write(file.getBytes());
      zipOutputStream.closeEntry();
      return zipOutputStream;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

答案1

得分: 1

以下是翻译好的代码部分:

private static final String ZIP_FILE = "UEsDBAoAAAAAAN1s5lYZ+CeZBgAAAAYAAAAJAAAAdGVzdDEudHh0VGVzdCAxUEsDBAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJAAAAdGVzdDIudHh0VGVzdCAyUEsDBAoAAAAAAPhs5lY1mSl3BgAAAAYAAAAJAAAAdGVzdDMudHh0VGVzdCAzUEsDBAoAAAAAAPts5laWDE3pBgAAAAYAAAAJAAAAdGVzdDQudHh0VGVzdCA0UEsDBAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJAAAAdGVzdDUudHh0VGVzdCA1UEsBAj8ACgAAAAAA3WzmVhn4J5kGAAAABgAAAAkAJAAAAAAAAAAgAAAAAAAAAHRlc3QxLnR4dAoAIAAAAAAAAQAYAN4h/3L+r9kBCLkhXwKw2QEkqm9t/q/ZAVBLAQI/AAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJACQAAAAAAAAAIAAAAC0AAAB0ZXN0Mi50eHQKACAAAAAAAAEAGAD2Z+2M/q/ZAVPgIV8CsNkBbwSLdP6v2QFQSwECPwAKAAAAAAD4bOZWNZkpdwYAAAAGAAAACQAkAAAAAAAAACAAAABaAAAAdGVzdDMudHh0CgAgAAAAAAABABgAGsOxkP6v2QHcByJfArDZAW2Je3b+r9kBUEsBAj8ACgAAAAAA+2zmVpYMTekGAAAABgAAAAkAJAAAAAAAAAAgAAAAhwAAAHRlc3Q0LnR4dAoAIAAAAAAAAQAYAKkIkZT+r9kBWlUiXwKw2QGkwld4/q/ZAVBLAQI/AAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJACQAAAAAAAAAIAAAALQAAAB0ZXN0NS50eHQKACAAAAAAAAEAGADtvU2Y/q/ZAZ/xIl8CsNkBEj5/d/6v2QFQSwUGAAAAAAUABQDHAQAA4QAAAAAA";
private static final int BUFFER_SIZE = 10;

@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DataBuffer> test() {
    return DataBufferUtils.readInputStream(() -> new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)), new DefaultDataBufferFactory(), BUFFER_SIZE);
}

请注意,代码中的HTML转义字符(例如&quot;)已被还原为双引号。

英文:

Why all the nonsense?

private static final String ZIP_FILE = &quot;UEsDBAoAAAAAAN1s5lYZ+CeZBgAAAAYAAAAJAAAAdGVzdDEudHh0VGVzdCAxUEsDBAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJAAAAdGVzdDIudHh0VGVzdCAyUEsDBAoAAAAAAPhs5lY1mSl3BgAAAAYAAAAJAAAAdGVzdDMudHh0VGVzdCAzUEsDBAoAAAAAAPts5laWDE3pBgAAAAYAAAAJAAAAdGVzdDQudHh0VGVzdCA0UEsDBAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJAAAAdGVzdDUudHh0VGVzdCA1UEsBAj8ACgAAAAAA3WzmVhn4J5kGAAAABgAAAAkAJAAAAAAAAAAgAAAAAAAAAHRlc3QxLnR4dAoAIAAAAAAAAQAYAN4h/3L+r9kBCLkhXwKw2QEkqm9t/q/ZAVBLAQI/AAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJACQAAAAAAAAAIAAAAC0AAAB0ZXN0Mi50eHQKACAAAAAAAAEAGAD2Z+2M/q/ZAVPgIV8CsNkBbwSLdP6v2QFQSwECPwAKAAAAAAD4bOZWNZkpdwYAAAAGAAAACQAkAAAAAAAAACAAAABaAAAAdGVzdDMudHh0CgAgAAAAAAABABgAGsOxkP6v2QHcByJfArDZAW2Je3b+r9kBUEsBAj8ACgAAAAAA+2zmVpYMTekGAAAABgAAAAkAJAAAAAAAAAAgAAAAhwAAAHRlc3Q0LnR4dAoAIAAAAAAAAQAYAKkIkZT+r9kBWlUiXwKw2QGkwld4/q/ZAVBLAQI/AAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJACQAAAAAAAAAIAAAALQAAAB0ZXN0NS50eHQKACAAAAAAAAEAGADtvU2Y/q/ZAZ/xIl8CsNkBEj5/d/6v2QFQSwUGAAAAAAUABQDHAQAA4QAAAAAA&quot;;

private static final int BUFFER_SIZE = 10;

@GetMapping(value = &quot;/zip&quot;, produces = &quot;application/zip&quot;)
public Flux&lt;DataBuffer&gt; test() {
    return DataBufferUtils.readInputStream(() -&gt; new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)), new DefaultDataBufferFactory(), BUFFER_SIZE);
}

答案2

得分: 0

getFluxPublisherFunction方法不正确,逻辑不合理。它所做的事情是
>> DataBufferUtils.write(source, outputStream)

source是来自ZIP文件的DataBuffer的Flux。OutputStream是使用单独的缓冲区(命名为defaultDataBuffer)创建的另一个数据缓冲区。这个方法返回与source中相同的缓冲区

>> .map(buffer -> defaultDataBuffer)

对于每个缓冲区(ZIP文件的一部分),它返回一个完整的目标缓冲区,该缓冲区使用写操作进行填充。目标缓冲区在任何时候都可能包含ZIP文件的一个或多个部分。

  1. 代码之所以在有1000个缓冲区时有效,是因为这足以容纳整个ZIP文件的内容。在这种情况下,当订阅DataBufferUtils.write(source, outputStream)时,getRead方法返回单个DataBuffer。当缓冲区大小较小时,这不起作用,因为您返回了在defaultDataBuffer中累积的部分块。
  2. 要将ZIP返回给浏览器,只需返回由getRead方法创建的不同缓冲区的Flux<DataBuffer>。它适用于缓冲区大小为10、100或1000的情况。
英文:

getFluxPublisherFunction is incorrect and logic does not make sense. What it is doing is
>> DataBufferUtils.write(source, outputStream)

source is Flux of DataBuffer from ZIP_FILE. OutputStream is another data buffer created with separate buffer (named defaultDataBuffer). This method returns same buffers as in source

>> .map(buffer -> defaultDataBuffer)

For each buffer ( chunk of ZIP_file), it returns complete destimation buffer which is getting populated using write operation. Destination buffer at any point might contain one of more parts of ZIP_FILE.

  1. Why code works with 1000 buffer is because this is sufficient buffer to contain the entire contents of ZIP_FILE. getRead method in this case returns single DataBuffer when DataBufferUtils.write(source, outputStream) is subscribed. This does not work when buffer size is small because you are returning partial chunks accumulated in defaultDataBuffer.
  2. To return zip to browser, simply return the Flux<DataBuffer> created by getRead method with different buffers. It will work of buffer size of 10 or 100 or 1000.

答案3

得分: 0

I'm not sure how Spring handles that, but to my understanding it has no knowledge of when you finish the writing and you return the buffer as soon as you have the first chunk written to it. So I guess that is what the receiver sees too. In this case, what you need is to return the Data Buffer only when it is fully written.

Try this:

return Flux.fromStream(files.stream())
    .map(file -> putZipEntry(file, zipOutputStream))
    .then(Mono.fromCallable {
        closeZipOutputStream(zipOutputStream);
        responseDataBuffer
   })

PS in your case it doesn't make sense to use Reactor because you write it to a single buffer and wait until it finishes. It would be better if you produce the response in chunks, but I don't see how you can do that with the standard ZipOutputStream. However, since it already produces an OutputStream, it would be easier just to produce it as is (with InputStreamResource and ResponseEntity).

英文:

I'm not sure how Spring handles that, but to my understanding it has no knowledge of when you finish the writing and you return the buffer as soon as you have first chunk written to it. So I guess that is what the receiver sees too. In this case what you need is to return the Data Buffer only when it is fully written.

Try this:

return Flux.fromStream(files.stream())
    .map(file -&gt; putZipEntry(file, zipOutputStream))
    .then(Mono.fromCallable { 
        closeZipOutputStream(zipOutputStream);
        responseDataBuffer
   })

PS in your case it doesn't make sense to use Reactor because you write it to a single buffer and wait until it finishes. It would be better if you produce response in chunks, but I don't see how you can do that with standard ZipOutputStream. However, since it already produces an OutputStream it would be easier just to produce it as is (with InputStreamResource and ResponseEntity)

huangapple
  • 本文由 发表于 2023年7月6日 21:20:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/76629303.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定