Webflux streaming DataBuffer with zip resulting in corrupt file
Question
Currently I'm implementing a non-blocking I/O application with Spring Boot 2.7.12 and WebFlux to download files with a WebClient, zip them, and stream the zip file to the browser.
Downloading and zipping work fine if I write the ZipOutputStream to a local file. However, if I stream the zip back to the caller (Flux<DataBuffer>), the file is corrupted.
I'm not sure whether I misunderstand the concept of the DataBuffer or whether it is a bug in the Spring framework.
I have created a small sample. If you download the zip file, each entry is duplicated multiple times, and the last entry is corrupted.
Thank you,
Roberto
@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DefaultDataBuffer> zip() {
var files = Arrays.asList("File1", "File2", "File3", "File4", "File5");
var responseDataBuffer = new DefaultDataBufferFactory().allocateBuffer();
ZipOutputStream zipOutputStream = new ZipOutputStream(responseDataBuffer.asOutputStream());
return Flux.fromStream(files.stream())
.map(file -> putZipEntry(file, zipOutputStream))
.map(x -> responseDataBuffer)
.doOnComplete(() -> closeZipOutputStream(zipOutputStream));
}
private void closeZipOutputStream(ZipOutputStream zipOutputStream) {
try {
zipOutputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private ZipOutputStream putZipEntry(String file, ZipOutputStream zipOutputStream) {
try {
zipOutputStream.putNextEntry(new ZipEntry(file + ".txt"));
zipOutputStream.write(file.getBytes());
zipOutputStream.closeEntry();
return zipOutputStream;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Answer 1
Score: 1
Why all the nonsense?
private static final String ZIP_FILE = "UEsDBAoAAAAAAN1s5lYZ+CeZBgAAAAYAAAAJAAAAdGVzdDEudHh0VGVzdCAxUEsDBAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJAAAAdGVzdDIudHh0VGVzdCAyUEsDBAoAAAAAAPhs5lY1mSl3BgAAAAYAAAAJAAAAdGVzdDMudHh0VGVzdCAzUEsDBAoAAAAAAPts5laWDE3pBgAAAAYAAAAJAAAAdGVzdDQudHh0VGVzdCA0UEsDBAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJAAAAdGVzdDUudHh0VGVzdCA1UEsBAj8ACgAAAAAA3WzmVhn4J5kGAAAABgAAAAkAJAAAAAAAAAAgAAAAAAAAAHRlc3QxLnR4dAoAIAAAAAAAAQAYAN4h/3L+r9kBCLkhXwKw2QEkqm9t/q/ZAVBLAQI/AAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJACQAAAAAAAAAIAAAAC0AAAB0ZXN0Mi50eHQKACAAAAAAAAEAGAD2Z+2M/q/ZAVPgIV8CsNkBbwSLdP6v2QFQSwECPwAKAAAAAAD4bOZWNZkpdwYAAAAGAAAACQAkAAAAAAAAACAAAABaAAAAdGVzdDMudHh0CgAgAAAAAAABABgAGsOxkP6v2QHcByJfArDZAW2Je3b+r9kBUEsBAj8ACgAAAAAA+2zmVpYMTekGAAAABgAAAAkAJAAAAAAAAAAgAAAAhwAAAHRlc3Q0LnR4dAoAIAAAAAAAAQAYAKkIkZT+r9kBWlUiXwKw2QGkwld4/q/ZAVBLAQI/AAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJACQAAAAAAAAAIAAAALQAAAB0ZXN0NS50eHQKACAAAAAAAAEAGADtvU2Y/q/ZAZ/xIl8CsNkBEj5/d/6v2QFQSwUGAAAAAAUABQDHAQAA4QAAAAAA";
private static final int BUFFER_SIZE = 10;

@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DataBuffer> test() {
    return DataBufferUtils.readInputStream(
            () -> new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)),
            new DefaultDataBufferFactory(),
            BUFFER_SIZE);
}
Answer 2
Score: 0
getFluxPublisherFunction is incorrect and its logic does not make sense. What it is doing is:

>> DataBufferUtils.write(source, outputStream)

source is a Flux of DataBuffer read from ZIP_FILE. outputStream writes into another data buffer (named defaultDataBuffer) that was allocated separately. This method returns the same buffers as in source.

>> .map(buffer -> defaultDataBuffer)

For each buffer (a chunk of ZIP_FILE) it returns the complete destination buffer, which is being populated by the write operation. At any point the destination buffer may contain one or more parts of ZIP_FILE.

- The code works with a buffer size of 1000 because that buffer is large enough to hold the entire contents of ZIP_FILE. In that case getRead returns a single DataBuffer when DataBufferUtils.write(source, outputStream) is subscribed. It does not work when the buffer size is small, because you are returning partial chunks accumulated in defaultDataBuffer.
- To return the zip to the browser, simply return the Flux<DataBuffer> created by getRead, with its distinct buffers. It works for a buffer size of 10, 100, or 1000.
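
For illustration, a minimal sketch of that suggestion, assuming getRead wraps DataBufferUtils.readInputStream roughly as in the snippet from Answer 1 (ZIP_FILE and BUFFER_SIZE are the constants from that snippet; the method names are reconstructed here, not taken from the original sample):

// Assumed shape of the getRead method this answer refers to.
private Flux<DataBuffer> getRead() {
    return DataBufferUtils.readInputStream(
            () -> new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)),
            new DefaultDataBufferFactory(),
            BUFFER_SIZE);
}

@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DataBuffer> zip() {
    // Return the chunked buffers directly instead of writing them into a shared
    // defaultDataBuffer and re-emitting that single buffer once per chunk.
    return getRead();
}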
Answer 3
Score: 0
I'm not sure how Spring handles that, but to my understanding it has no knowledge of when you finish the writing and you return the buffer as soon as you have the first chunk written to it. So I guess that is what the receiver sees too. In this case, what you need is to return the Data Buffer only when it is fully written.
Try this:
return Flux.fromStream(files.stream())
        .map(file -> putZipEntry(file, zipOutputStream))
        .then(Mono.fromCallable(() -> {
            // Close (and thereby flush) the ZipOutputStream before the buffer is emitted.
            closeZipOutputStream(zipOutputStream);
            return responseDataBuffer;
        }));
// Note: the controller method would then return Mono<DefaultDataBuffer> instead of a Flux.
PS: In your case it doesn't make sense to use Reactor, because you write everything into a single buffer and wait until it finishes. It would be better to produce the response in chunks, but I don't see how you can do that with the standard ZipOutputStream. However, since it already produces an OutputStream, it would be easier just to produce it as is (with InputStreamResource and ResponseEntity).
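
For reference, a rough sketch of the InputStreamResource / ResponseEntity variant mentioned in the PS. It assumes the whole zip is first materialised in memory with the same ZipOutputStream logic as in the question; the endpoint and variable names here are illustrative only.

@GetMapping(value = "/zip", produces = "application/zip")
public ResponseEntity<InputStreamResource> zipAsResource() throws IOException {
    var files = Arrays.asList("File1", "File2", "File3", "File4", "File5");
    var byteStream = new ByteArrayOutputStream();
    // Write the whole archive up front, exactly as in the question's putZipEntry logic.
    try (var zipOutputStream = new ZipOutputStream(byteStream)) {
        for (var file : files) {
            zipOutputStream.putNextEntry(new ZipEntry(file + ".txt"));
            zipOutputStream.write(file.getBytes());
            zipOutputStream.closeEntry();
        }
    }
    byte[] zipBytes = byteStream.toByteArray();
    // Hand the finished archive to Spring as a blocking Resource instead of a Flux<DataBuffer>.
    return ResponseEntity.ok()
            .contentLength(zipBytes.length)
            .body(new InputStreamResource(new ByteArrayInputStream(zipBytes)));
}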