英文:
How to emit cumulative sum only from a reactive stream?
问题
我有一个使用情况,其中流只在累积的 "sum" 等于或超过给定值 n 时才发出。让我们以 n 为 5 的六个整数的示例来说明。
+---+------+---------+
| i | 发出 | 总和 |
+---+------+---------+
| 1 | - | 1 |
| 2 | - | 3 |
| 3 | 5 | 1 |
| 4 | 5 | 0 |
| 5 | 5 | 0 |
| 2 | 2 | 0 (终止) |
+---+------+---------+
如您所见,只有在总和等于或超过 5 时才发出任何内容,除了最后一个元素,无论如何都会发出它。
一旦发出一个项目,总和就会减少该值 (n)。实际上,我正在从网络调用中读取数据,然后将它们发送给下游消费者,后者只接受固定大小的块,当然最后一个除外 (上游已完成)。
我正在使用 Reactor 项目 Flux 作为 Publisher
;我在其上找不到任何允许我执行上述操作的方法。scan
方法最接近,但它还会发出需要被过滤掉的中间元素。
英文:
I've a use case where the stream should only emit when the cumulative "sum" equals or exceeds a given value, n. Let's take the example of six integers with n = 5.
+---+------+---------+
| i | Emit | Sum |
+---+------+---------+
| 1 | - | 1 |
| 2 | - | 3 |
| 3 | 5 | 1 |
| 4 | 5 | 0 |
| 5 | 5 | 0 |
| 2 | 2 | 0 (end) |
+---+------+---------+
As you can see, nothing is emitted unless the sum equals or exceeds 5, except for the last element, which is emitted anyway.
Once an item is emitted, the sum gets reduced by that value (n). In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).
I'm using project Reactor Flux as the Publisher
; I couldn't find any method on it that allows me do what is shown above. scan
comes closest, but it also emits intermediate elements that need to be filtered out.
答案1
得分: 1
在现实中,我正在从网络调用中读取数据,随后将它们发送给下游的消费者,该消费者仅接受固定大小的数据块,当然最后一个除外(上游已完成)。
我想到自己尝试拆分响应的 `Flux` 可能有点晚,并且相当困难;相反,我可以使用类似 Netty 的 [FixedLengthFrameDecoder](https://netty.io/4.1/api/io/netty/handler/codec/FixedLengthFrameDecoder.html),它恰好满足了我的需求。
这让我想到了 [reactor-netty](https://github.com/reactor/reactor-netty) 的源代码,经过深入挖掘,我找到了我需要的东西。
```kotlin
fun get(url: String, maxChunkSize: Int): List<ByteArray> {
return HttpClient.create()
.httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
.get()
.uri(url)
.responseContent()
.asByteArray()
.collectList()
.block()!!
}
关键部分是 httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
;一个单元测试证明了它的工作原理:
@Test
fun testHonorsMaxChunkSize() {
val maxChunkSize = 4096
val chunks = FixedLengthResponseFrameClient.get(
"http://doesnotexist.nowhere/binary", maxChunkSize
)
assertThat(chunks.subList(0, chunks.size - 1))
.allMatch { it.size == maxChunkSize}
assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}
可以通过以下方式使用 WebClient
配置自定义的 HttpClient
(使用了 httpResponseDecoder
):
WebClient
.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
.get()
.uri("uri")
.exchange()
.flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
...
这些缓冲区的大小将是 HttpClient.httpResponseDecoder
中设置的大小(默认为 8192 Kb)。
<details>
<summary>英文:</summary>
> In reality, I'm reading data from a network call, and subsequently
> sending them to a downstream consumer who only accepts fixed size
> chunks, except for the last one, of course (upstream completed).
It occurred to me that trying to split the response `Flux` myself is probably little late and quite difficult; instead, I could use something like Netty [FixedLengthFrameDecoder](https://netty.io/4.1/api/io/netty/handler/codec/FixedLengthFrameDecoder.html), which does exactly what I'm looking for.
That led me to [reactor-netty](https://github.com/reactor/reactor-netty) source code, and after extensive digging, I found exactly what I needed.
fun get(url: String, maxChunkSize: Int): List<ByteArray> {
return HttpClient.create()
.httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
.get()
.uri(url)
.responseContent()
.asByteArray()
.collectList()
.block()!!
}
The crucial part is `httpResponseDecoder { it.maxChunkSize(maxChunkSize) }`; a unit test proves this to be working:
@Test
fun testHonorsMaxChunkSize() {
val maxChunkSize = 4096
val chunks = FixedLengthResponseFrameClient.get(
"http://doesnotexist.nowhere/binary", maxChunkSize
)
assertThat(chunks.subList(0, chunks.size - 1))
.allMatch { it.size == maxChunkSize}
assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}
`WebClient` can be configured with a custom `HttpClient` (configured with `httpResponseDecoder`) as shown below:
WebClient
.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
.get()
.uri("uri")
.exchange()
.flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
...
The size of these buffers would be what's set in the `HttpClient.httpResponseDecoder` (8192 Kb by default).
</details>
# 答案2
**得分**: 0
这是无法直接在 `Flux` 对象上完成的操作,但如果您可以访问创建 `Flux` 对象的资源,那么您可能可以实现解决方案。由于在流(Flux)内部,您无法访问先前元素,您可以创建一个索引的 `Flux`,然后直接从该索引的 `Flux` 中访问此资源(因为它是只读操作)。例如,像这样:
```java
List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
.flatMap(i -> {
int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
if (sum >= 5) {
atomicSum.updateAndGet(integer -> integer - 5);
return Flux.just(5);
}
return (i.equals(list.size() - 1))
? Flux.just(list.get(i)) // emit last element even if sum was not 5
: Flux.empty();
}); // emitted element's
请注意,这不是一个良好的做法,我不建议使用这种解决方案。Flux
对象的处理可能会在不同线程之间切换,因此如果在 Flux
外部修改对象,您应该以同步的方式进行操作(因此使用了 AtomicReference
)。列表仅用于只读操作,因此是可以的。此外,我不知道该代码的这一部分是否实际可行,但我想向您展示,如果您可以访问创建 Flux
对象的资源,可以如何找到解决方案。
编辑:即使这种解决方案也不会起作用。我弄错了,Flux
对象不会在线程之间切换,但可能会由多个线程处理,导致单个原子引用处于无效状态。这仍然可以通过一些同步机制来解决,例如锁,而不是原子引用,但这远远超出了普通开发者的经验。您确定不能使用 scan()
函数吗?因为您可以提供自己的累加器函数作为参数。
英文:
This is not possible to do directly on Flux
object, but you might achieve solution if you have access to resource from which the Flux
object is created. Since inside stream (Flux) you are not able access previous element's you can create Flux
over indices to you resource and access this resource (since its read only operation) from that Flux
of indices directly. For example something like this:
List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
.flatMap(i -> {
int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
if (sum >= 5) {
atomicSum.updateAndGet(integer -> integer - 5);
return Flux.just(5);
}
return (i.equals(list.size() -1))
? Flux.just(list.get(i)) // emit last element even if sum was not 5
: Flux.empty();
}); // emitted element's
Note that this is not good practice to do and i don't advice such solution. Flux
objects processing might skip between thread's, so if you modify object outside of Flux
you should do it in synchronized way (therefore usage of AtomicReference
). List is used only for read-only operation's and therefore it's OK. Also i don't know if that part of the code will actually work but i wanted to show you how you might find solution if you have access to resource over which your Flux
object is created.
Edit: even such solution would not work. I have mistaken myslef, Flux
object don't skip between threads but might be processed by multiple threads leading single atomic reference to invalid state. This cloud still be solved with some synchronizing mechanism like lock's instead of Atomic reference but is far beyond average developer experience. Are you sure you cannot use scan()
function since you can provided your own accumulator function as argument?
答案3
得分: 0
如果您需要保持一个持续的总计或以其他方式维护源自于您的 Flux 的状态,一种方法是创建一个新的 Flux,订阅第一个 Flux,并使用订阅来维护状态,例如:
Flux<Long> flux = Flux.just(1L, 2L, 3L, 4L, 5L);
Sinks.Many<Long> runningTotalSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Long> runningTotalFlux = runningTotalSink.asFlux()
.doOnSubscribe(subscription -> {
AtomicLong runningTotal = new AtomicLong();
flux
.doOnCancel(subscription::cancel)
.doOnError(runningTotalSink::tryEmitError)
.doOnComplete(runningTotalSink::tryEmitComplete)
.subscribe(i -> {
runningTotalSink.tryEmitNext(runningTotal.accumulateAndGet(i, Long::sum));
});
});
runningTotalFlux.toStream().forEach(i -> {
System.out.println(i);
});
英文:
If you need to keep a running total or otherwise maintain state from which your flux is derived, a way to go about it is to create a new flux that subscribes to the first flux, and maintains the state with the subscription, e.g
Flux<Long> flux = Flux.just(1L, 2L, 3L, 4L, 5L);
Sinks.Many<Long> runningTotalSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Long> runningTotalFlux = runningTotalSink.asFlux()
.doOnSubscribe(subscription -> {
AtomicLong runningTotal = new AtomicLong();
flux
.doOnCancel(subscription::cancel)
.doOnError(runningTotalSink::tryEmitError)
.doOnComplete(runningTotalSink::tryEmitComplete)
.subscribe(i -> {
runningTotalSink.tryEmitNext(runningTotal.accumulateAndGet(i, Long::sum));
});
});
runningTotalFlux.toStream().forEach(i -> {
System.out.println(i);
});
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论