Understanding DataBufferUtils.write to OutputStream

huangapple go评论70阅读模式
英文:

Understanding DataBufferUtils.write to OutputStream

问题

我正在从内部服务器下载大型文档,使用一种代理方式。简而言之,我有一个Flux<DataBuffer>和一个OutputStream。所以我想我可以使用

org.springframework.core.io.buffer.DataBufferUtils#write(
    org.reactivestreams.Publisher<org.springframework.core.io.buffer.DataBuffer>,
    java.io.OutputStream
)

但是它返回一个Flux,我不确定该怎么处理它。

Flux.blockFirst()只会处理文件的开头。

Flux.blockLast()会记录数千个资源泄漏。我认为这是有道理的,因为特定的write()方法明确说明调用者负责释放DataBuffer。

当我从单元测试中运行代码时,Flux.subscribe(DataBufferUtils.releaseConsumer())没有任何作用。我猜测是因为在Spring有机会启动处理之前,虚拟机就终止了。

来自这个答案的代码可以工作,但我不明白为什么:

Mono.create(sink ->
    DataBufferUtils
        .write(myFlux, outputStream)
        .subscribe(
            DataBufferUtils::release,
            sink::error,
            sink::success
        )
)
    .block()

为什么这样可以下载文档而不会有资源泄漏?将Flux包装在Mono中有什么区别?

英文:

I'm downloading huge documents from internal servers in a kind of proxy. In a nutshell, I have a Flux&lt;DataBuffer&gt; and an OutputStream. So I thought I could use

org.springframework.core.io.buffer.DataBufferUtils#write(
    org.reactivestreams.Publisher&lt;org.springframework.core.io.buffer.DataBuffer&gt;,
    java.io.OutputStream
)

but that returns a Flux<DataBuffer> and I'm not sure what to do with it.

Flux.blockFirst() will just process the start of the file.

Flux.blockLast() logs thousands of resource leaks. I think that makes sense because the specific write() method clearly states that the caller is responsible for releasing the DataBuffers.

Flux.subscribe(DataBufferUtils.releaseConsumer()) doesn't do anything when I run the code from a unit test. My guess is that the VM terminates before Spring has a chance to start the processing.

The code from this answer works but I don't understand why:

Mono.create(sink -&gt;
    DataBufferUtils
        .write(myFlux, outputStream)
        .subscribe(
            DataBufferUtils::release,
            sink::error,
            sink::success
        )
)
    .block()

Why does this download the document without resource leaks? How does wrapping the Flux in a Mono make a difference?

答案1

得分: 0

那段代码能够根据给定的Flux进行写入,并释放资源:

DataBufferUtils
    .write(myFlux, outputStream)
    .subscribe(
        DataBufferUtils::release,
        // ...
    );

write(myFlux, outputStream) 执行写入操作,DataBufferUtils::release 作为subscribe的第一个参数提供,它是一个函数,Reactor会在每个缓冲区上调用该函数。这就是如何避免内存泄漏的方法。

其余的代码,包括将其包装为Mono,只是一种阻塞的方法,直到你的flux完成,即写入完成:

Mono.create(sink ->
    // ...
        sink::error,
        sink::success
    )
)
    .block();

sink::errorsink::successsubscribe的第二个和第三个参数,它们是函数,Reactor会在原始的Flux完成(正常或异常)时调用它们。由于这里的sinkMono的sink,调用它们会完成手动创建的Mono。这就是将Flux转换为Mono的方法,当Flux完成时,Mono也完成。

最后,调用.block方法会阻塞线程,直到Mono(因此也是原始的Flux)完成。这就是实际的阻塞发生的地方。

我认为这里有一个更简单的解决方案:

DataBufferUtils.write(myFlux, outputStream).doOnNext(DataBufferUtils::release).blockLast();

实际上不需要额外的Mono,你只需要在每个缓冲区写入后调用DataBufferUtils::release,并阻塞直到Flux结束,即使用blockLast方法。你可以将doOnNext(添加副作用)替换为map(将项目转换为可能具有副作用的项目),这将把Flux<DataBuffer>转换为Flux<Boolean>,因为release方法返回布尔值。由于你既不使用缓冲区也不使用布尔值,只是阻塞直到结束,这两种方法都适合你在这里使用。

英文:

That piece of code is capable of writing given Flux and releasing resources:

DataBufferUtils
        .write(myFlux, outputStream)
        .subscribe(
            DataBufferUtils::release,

write(myFlux, outputStream) do the writing and DataBufferUtils::release is provided as a subscribe's first argument, which is a function that reactor will call on each buffer. So that's how you avoid memory leak.

Rest of code including wrapping to Mono is just an approach to block until your flux is finished, i.e. written:

Mono.create(sink -&gt;
    // ...
            sink::error,
            sink::success
        )
)
    .block()

sink::error and sink::success is 2nd and 3rd arguments of subscribe which are functions that reactor will call when original Flux is finished (normally or exceptionally), i.e. written. Since sink here is Mono's sink calling them finishes the manually created Mono. That's how you turn a Flux to a Mono, which is completed when the Flux is completed.

Finally, calling .block on that Mono blocks a thread until Mono's (and hence the original Flux) is finished. That is were the actual block happens.

I expect here is a simpler solution:

DataBufferUtils.write(myFlux, outputStream).doOnNext(DataBufferUtils::release).blockLast()

You don't actually need extra Mono, all you need is to call DataBufferUtils::release on each buffer after is was written and block until end of Flux, i.e. blockLast. You can replace doOnNext (adds a side-effect) with map (converts items with a possible side-effect), what will turn Flux&lt;DataBuffer&gt; to a Flux&lt;Boolean&gt;, since release returns boolean. Inasmuch as you don't use neither buffer's nor booleans and just block until the end, both methods are suitable for you here.

huangapple
  • 本文由 发表于 2023年8月8日 21:11:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/76859907.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定