Spring WebFlux: convert flux.bufferTimeout(maxSize, duration), which gives Flux<List<T>>, to flux.windowTimeout(maxSize, duration).concatMap()

Question

What I am trying to achieve:

Spring WebFlux and Reactor offer a .bufferTimeout(maxSize, duration) operator, which turns a Flux<T> into a Flux<List<T>>.

I am trying to achieve the same, not by using .bufferTimeout(maxSize, duration), but using flux.windowTimeout(maxSize, duration).concatMap()

What I tried:

I tried using concatMap as follows: flux.windowTimeout(maxSize, duration).concatMap(f -> f)

Issue:

With the above code, I am getting a Flux<T>, but I would like a Flux<List<T>>, the same as bufferTimeout gives.

Question:

What is the correct combination of windowTimeout + concatMap that is equivalent to bufferTimeout?

Answer 1

Score: 1

To achieve the same behavior as bufferTimeout(maxSize, duration) using windowTimeout(maxSize, duration) and concatMap(), you can modify your code as follows:

flux.windowTimeout(maxSize, duration)
    .concatMap(f -> f.collectList())

Explanation:

  • windowTimeout(maxSize, duration) will create windows of elements based on the specified maxSize and duration.
  • concatMap() will subscribe to the publisher returned for each window one at a time, in order, and emit its results before moving on to the next window.
  • By calling collectList() on each window Flux, you can transform each window into a single List emission, which will be equivalent to the behavior of bufferTimeout(maxSize, duration).
  • This modification ensures that instead of emitting individual elements, you receive a Flux<List<T>> whose lists contain the elements gathered until maxSize is reached or duration elapses.
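
The points above can be put together in a small runnable sketch. It assumes reactor-core is on the classpath; the class name WindowTimeoutAsBuffer and the helper bufferViaWindow are hypothetical names chosen for illustration, and the sample source and sizes are made up. It prints the result of bufferTimeout and of the windowTimeout variant so they can be compared by eye.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class WindowTimeoutAsBuffer {

    // Hypothetical helper: groups a Flux<T> into lists via windowTimeout + concatMap.
    static <T> Flux<List<T>> bufferViaWindow(Flux<T> source, int maxSize, Duration maxTime) {
        return source
                .windowTimeout(maxSize, maxTime)             // Flux<Flux<T>>: one inner Flux per window
                .concatMap(window -> window.collectList());  // Flux<List<T>>: one List per window, in order
    }

    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 7);  // made-up sample data
        Duration maxTime = Duration.ofSeconds(1);

        // Built-in operator, for comparison.
        System.out.println(source.bufferTimeout(3, maxTime).collectList().block());
        // windowTimeout + concatMap(collectList) variant.
        System.out.println(bufferViaWindow(source, 3, maxTime).collectList().block());
    }
}
```

Returning window.collectList() from the lambda, rather than the window itself, is what changes the result type from Flux<T> to Flux<List<T>>.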

Please note that the two forms are not guaranteed to behave identically when the source Flux emits elements irregularly. For example, windowTimeout may close a window in which no element arrived, in which case collectList() produces an empty List, whereas bufferTimeout does not emit empty buffers.
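
If empty lists turn up during idle periods (windowTimeout can close a window that received nothing; treat this as an assumption worth checking against your Reactor version), they can be dropped with a filter. A minimal sketch, assuming reactor-core is on the classpath, with a hypothetical helper name and made-up timings:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NonEmptyWindows {

    // Hypothetical helper: same as windowTimeout + collectList, minus empty lists.
    static <T> Flux<List<T>> bufferViaWindowNonEmpty(Flux<T> source, int maxSize, Duration maxTime) {
        return source
                .windowTimeout(maxSize, maxTime)
                .concatMap(window -> window.collectList())
                .filter(list -> !list.isEmpty());  // skip windows that collected nothing
    }

    public static void main(String[] args) {
        // Made-up irregular source: two elements, a 300 ms pause, then one more element.
        Flux<Integer> source = Flux.just(1, 2)
                .concatWith(Mono.delay(Duration.ofMillis(300)).thenMany(Flux.just(3)));

        // The window duration (100 ms) is shorter than the pause, so idle windows are possible.
        System.out.println(
                bufferViaWindowNonEmpty(source, 10, Duration.ofMillis(100)).collectList().block());
    }
}
```

The filter only removes empty lists; element order and the maxSize bound are unchanged.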

Let me know if you need further assistance!

huangapple
  • Posted on May 30, 2023, 03:47:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76359983.html