Flux.range 在达到 256 个元素后等待发出更多元素。

huangapple go评论85阅读模式
英文:

Flux.range waits to emit more element once 256 elements are reached

问题

我写了这段代码:

Flux.range(0, 300)
            .doOnNext(i -> System.out.println("i = " + i))
            .flatMap(i -> Mono.just(i)
                            .subscribeOn(Schedulers.elastic())
                            .delayElement(Duration.ofMillis(1000))
            )
            .doOnNext(i -> System.out.println("end " + i))
            .blockLast();

运行时,第一个 System.out.println 显示,Flux 在第 256 个元素时停止发出数字,然后等待较旧的元素完成后再发出新的。

为什么会发生这种情况?
为什么是 256?

英文:

I wrote this code:

Flux.range(0, 300)
            .doOnNext(i -> System.out.println("i = " + i))
            .flatMap(i -> Mono.just(i)
                            .subscribeOn(Schedulers.elastic())
                            .delayElement(Duration.ofMillis(1000))
            )
            .doOnNext(i -> System.out.println("end " + i))
            .blockLast();

When running it, the first System.out.println shows that the Flux stop emitting numbers at the 256th element, then it waits for the older to be completed before emitting new ones.

Why is this happening?
Why 256?

答案1

得分: 4

为什么会发生这种情况?

flatMap操作符可以被描述为一种操作符,它(从javadoc重新表述):

  1. 急切地订阅其内部元素。
  2. 不保留元素的顺序。
  3. 允许来自不同内部元素的值交错。

对于这个问题,第一个点很重要。Project Reactor通过concurrency参数限制了正在进行中的_内部_序列的数量。

flatMap(mapper) 使用默认参数,而 flatMap(mapper, concurrency) 重载明确接受此参数。

flatMap 的javadoc将参数描述为:

> 并发参数允许控制可以并行订阅和合并多少个发布者

考虑以下使用 concurrency = 500 的代码示例

Flux.range(0, 300)
        .doOnNext(i -> System.out.println("i = " + i))
        .flatMap(i -> Mono.just(i)
                        .subscribeOn(Schedulers.elastic())
                        .delayElement(Duration.ofMillis(1000)),
                500
//         ^^^^^^^^^^
        )
        .doOnNext(i -> System.out.println("end " + i))
        .blockLast();

在这种情况下,没有等待:

i = 297
i = 298
i = 299
end 0
end 1
end 2

相反,如果将**1作为concurrency**传递,输出将类似于:

i = 0
end 0
i = 1
end 1

在发射下一个元素之前等待一秒钟。

为什么是256?

256是flatMap并发的_默认_值。

看一下Queues.SMALL_BUFFER_SIZE

public static final int SMALL_BUFFER_SIZE = Math.max(16,
		Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
英文:

Why this happening?

The flatMap operator can be characterized as operator that (rephrased from javadoc):

  1. subscribes to its inners eagerly
  2. does not preserve ordering of elements.
  3. lets values from different inners interleave.

For this question the first point is important. Project Reactor restricts the
number of in-flight inner sequences via concurrency parameter.

While flatMap(mapper) uses the default parameter the flatMap(mapper, concurrency) overload accepts this parameter explicitly.

The flatMaps javadoc describes the parameter as:

> The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel

Consider the following code using concurrency = 500

Flux.range(0, 300)
        .doOnNext(i -> System.out.println("i = " + i))
        .flatMap(i -> Mono.just(i)
                        .subscribeOn(Schedulers.elastic())
                        .delayElement(Duration.ofMillis(1000)),
                500
//         ^^^^^^^^^^
        )
        .doOnNext(i -> System.out.println("end " + i))
        .blockLast();

In this case there is no waiting:

i = 297
i = 298
i = 299
end 0
end 1
end 2

In contrast if you pass 1 as concurrency the output will be similar to:

i = 0
end 0
i = 1
end 1

Awaiting one second before emitting the next element.

Why 256?

256 is the default value for concurrency of flatMap.

Take a look at Queues.SMALL_BUFFER_SIZE:

public static final int SMALL_BUFFER_SIZE = Math.max(16,
		Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

huangapple
  • 本文由 发表于 2020年8月12日 01:33:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/63363495.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定