Compressing multiple files from S3 using WebFlux on the fly

Question

I need to create a zip file on the fly from a set of objects in S3 and return it as a Flux&lt;ByteBuffer&gt; (or equivalent). The zip can be large (hundreds of MB, possibly several GB), so downloading all the objects to either RAM or disk is not an option.
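For reference, the endpoint shape being discussed is roughly the following (a minimal sketch only; the controller class, mapping path and helper method are illustrative names, not from the actual service):

```java
import java.nio.ByteBuffer;

import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class BatchDownloadController {

    @GetMapping("/download")
    public ResponseEntity<Flux<ByteBuffer>> download(@RequestParam String batchId) {
        // buildZipStream stands in for the on-the-fly zip assembly discussed in this question
        Flux<ByteBuffer> zipStream = buildZipStream(batchId);
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_DISPOSITION,
                        "attachment; filename=\"" + batchId + ".zip\"")
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .body(zipStream);
    }

    private Flux<ByteBuffer> buildZipStream(String batchId) {
        return Flux.empty(); // placeholder: the real implementation streams zipped S3 objects
    }
}
```

WebFlux can write a Flux&lt;ByteBuffer&gt; body chunk by chunk with its built-in ByteBuffer codec, so the framework side does not need to buffer the whole archive.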

The current implementation, while complicated (using PipedInputStream/PipedOutputStream), works on smaller files but crashes on larger inputs (tens of input files totalling ~1 GB of raw data).

Is there any implementation of a fully streamed zip library that works with WebFlux? I can switch to tar/gzip if a more idiomatic implementation than ZipOutputStream exists for tgz.

TIA

Edit: stack trace

	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
reactor.core.publisher.Mono.flatMap
org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.handleResult(ResponseEntityResultHandler.java:132)
Error has been observed at the following site(s):
*____________Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.handleResult(ResponseEntityResultHandler.java:132)
|_             checkpoint ⇢ Exception handler com.refinitiv.itop.controller.advice.GlobalExceptionHandler#handleNotFoundException(RuntimeException), error="The downloading file='batch-conversion/EES_PDFTRON/results/5f0cd366-5fed-42ab-934a-cafca73f7423/21927289.pdf.json' was failed, message: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.
Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate.
Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout.
If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." [DispatcherHandler]
*____________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.lambda$handleResult$5(DispatcherHandler.java:182)
*______Mono.onErrorResume ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResult(DispatcherHandler.java:181)
|_                        ⇢ at org.springframework.web.reactive.DispatcherHandler.lambda$handle$2(DispatcherHandler.java:154)
*____________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:154)
|_                        ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:120)
*______________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
|_                        ⇢ at com.refinitiv.itop.web.IndexFilter.filter(IndexFilter.java:23)
|_             checkpoint ⇢ com.refinitiv.itop.web.IndexFilter [DefaultWebFilterChain]
|_                        ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:120)
*______________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
|_          Mono.doOnEach ⇢ at org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:87)
|_        Mono.doOnCancel ⇢ at org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:88)
*__Mono.transformDeferred ⇢ at org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:82)
|_             checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_                        ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:120)
*______________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
|_                        ⇢ at org.springframework.web.server.handler.FilteringWebHandler.handle(FilteringWebHandler.java:59)
|_                        ⇢ at org.springframework.web.server.handler.WebHandlerDecorator.handle(WebHandlerDecorator.java:56)
|_     Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:77)
|_     Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:77)
|_     Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:77)
|_       Mono.doOnSuccess ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:249)
|_     Mono.onErrorResume ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:250)
*______________Mono.error ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler$CheckpointInsertingHandler.handle(ExceptionHandlingWebHandler.java:98)
|_             checkpoint ⇢ HTTP GET "/api/v1/batch-conversion/download?batchId=5f0cd366-5fed-42ab-934a-cafca73f7423&includeResults=true&includeErrors=false" [ExceptionHandlingWebHandler]
|_                        ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.lambda$handle$0(ExceptionHandlingWebHandler.java:77)
*______________Mono.error ⇢ at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.handle(AbstractErrorWebExceptionHandler.java:306)
|_                        ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.lambda$handle$0(ExceptionHandlingWebHandler.java:77)
*______________Mono.error ⇢ at org.springframework.web.server.handler.ResponseStatusExceptionHandler.handle(ResponseStatusExceptionHandler.java:68)
|_                        ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.lambda$handle$0(ExceptionHandlingWebHandler.java:77)
*______________Mono.error ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handleUnresolvedError(HttpWebHandlerAdapter.java:310)
|_                        ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.lambda$handle$3(HttpWebHandlerAdapter.java:250)
*_______________Mono.then ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:251)
|_                        ⇢ at org.springframework.boot.web.reactive.context.WebServerManager$DelayedInitializationHttpHandler.handle(WebServerManager.java:98)
|_         Mono.doOnError ⇢ at org.springframework.http.server.reactive.ReactorHttpHandlerAdapter.apply(ReactorHttpHandlerAdapter.java:66)
|_       Mono.doOnSuccess ⇢ at org.springframework.http.server.reactive.ReactorHttpHandlerAdapter.apply(ReactorHttpHandlerAdapter.java:67)
|_        Mono.fromDirect ⇢ at reactor.netty.http.server.HttpServer$HttpServerHandle.lambda$onStateChange$0(HttpServer.java:962)
*____Mono.deferContextual ⇢ at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:960)
Original Stack Trace:
at org.springframework.web.reactive.result.method.annotation.AbstractMessageWriterResultHandler.writeBody(AbstractMessageWriterResultHandler.java:183)
at org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.lambda$handleResult$1(ResponseEntityResultHandler.java:168)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:325)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:414)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:207)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:351)
at reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:422)
at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:586)
at reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:117)
at io.netty.util.concurrent.PromiseCombiner.tryPromise(PromiseCombiner.java:170)
at io.netty.util.concurrent.PromiseCombiner.access$600(PromiseCombiner.java:35)
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete0(PromiseCombiner.java:62)
at io.netty.util.concurrent.PromiseCombiner$1.operationComplete(PromiseCombiner.java:44)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
at io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytesMultiple(AbstractEpollStreamChannel.java:305)
at io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteMultiple(AbstractEpollStreamChannel.java:510)
at io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:422)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:570)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
"}

Answer 1

Score: 0

Update: Fixed!

The answer was two-pronged:

1. Limit all flatMaps in the relevant code to parallelism = 1 and prefetch = 1, to cap the number of open S3 handles (see the sketch right after this list).
2. Break the Flux into two Fluxes:
   1. S3 reader - .subscribe()d on a separate thread; writes to a ZipOutputStream wrapping a PipedOutputStream.
   2. Web response - created on the main thread; reads from the PipedInputStream.
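Prong 1 is just the three-argument overload of flatMap, which caps both concurrency and prefetch (a sketch; keysFlux and downloadObject are illustrative names, not from the original code, with downloadObject assumed to return a Mono per key):

```java
// At most one S3 download is in flight and at most one is prefetched,
// so a burst of keys can no longer exhaust the SDK's connection pool.
Flux<byte[]> s3DownloadsFlux = keysFlux
        .flatMap(key -> downloadObject(key), /* concurrency */ 1, /* prefetch */ 1);
```

Prong 2 is the original bridging code: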
```java
PipedInputStream pis = new PipedInputStream(streamBufferSize);
PipedOutputStream pos = new PipedOutputStream(pis);
ZipOutputStream zos = new ZipOutputStream(pos);
Flux<ByteBuffer> resultFlux = Flux.create(sink -> {
    while (true) {
        var bytes = new byte[streamBufferSize];
        try {
            int read = pis.read(bytes);
            if (read > 0) {
                sink.next(ByteBuffer.wrap(bytes, 0, read));
            } else {
                sink.complete();
                break;
            }
        } catch (IOException e) {
            log.error("zip output stream crashed for {}", batchId, e);
            sink.error(e);
            break;
        }
    }
});
var reader = new Runnable() {
    @Override
    public void run() {
        s3DownloadsFlux.map(chunk -> {
            // snipped: minutiae of actually writing to the `ZipOutputStream` and error handling
            return new Object(); // doesn't matter what is returned
        }).subscribe();
    }
};
new Thread(reader).start();
return resultFlux;
```
