英文:
Flux.range waits to emit more element once 256 elements are reached
问题
我写了这段代码:
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000))
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
运行时,第一个 System.out.println
显示,Flux 在第 256 个元素时停止发出数字,然后等待较旧的元素完成后再发出新的。
为什么会发生这种情况?
为什么是 256?
英文:
I wrote this code:
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000))
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
When running it, the first System.out.println
shows that the Flux stop emitting numbers at the 256th element, then it waits for the older to be completed before emitting new ones.
Why is this happening?
Why 256?
答案1
得分: 4
为什么会发生这种情况?
flatMap
操作符可以被描述为一种操作符,它(从javadoc重新表述):
- 急切地订阅其内部元素。
- 不保留元素的顺序。
- 允许来自不同内部元素的值交错。
对于这个问题,第一个点很重要。Project Reactor通过concurrency
参数限制了正在进行中的_内部_序列的数量。
而 flatMap(mapper)
使用默认参数,而 flatMap(mapper, concurrency)
重载明确接受此参数。
flatMap
的javadoc将参数描述为:
> 并发参数允许控制可以并行订阅和合并多少个发布者
考虑以下使用 concurrency
= 500
的代码示例
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000)),
500
// ^^^^^^^^^^
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
在这种情况下,没有等待:
i = 297
i = 298
i = 299
end 0
end 1
end 2
相反,如果将**1
作为concurrency
**传递,输出将类似于:
i = 0
end 0
i = 1
end 1
在发射下一个元素之前等待一秒钟。
为什么是256?
256是flatMap
并发的_默认_值。
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
英文:
Why this happening?
The flatMap
operator can be characterized as operator that (rephrased from javadoc):
- subscribes to its inners eagerly
- does not preserve ordering of elements.
- lets values from different inners interleave.
For this question the first point is important. Project Reactor restricts the
number of in-flight inner sequences via concurrency
parameter.
While flatMap(mapper)
uses the default parameter the flatMap(mapper, concurrency)
overload accepts this parameter explicitly.
The flatMap
s javadoc describes the parameter as:
> The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel
Consider the following code using concurrency
= 500
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000)),
500
// ^^^^^^^^^^
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
In this case there is no waiting:
i = 297
i = 298
i = 299
end 0
end 1
end 2
In contrast if you pass 1
as concurrency
the output will be similar to:
i = 0
end 0
i = 1
end 1
Awaiting one second before emitting the next element.
Why 256?
256 is the default value for concurrency of flatMap
.
Take a look at Queues.SMALL_BUFFER_SIZE
:
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论