Webflux : Difference between .map() and .flatMap() behavior in a kafka receiver
Question
I have gone through the Project Reactor documentation for the map() and flatMap() methods, and also a good explanation in this answer.
But my question is about using them with the Reactor KafkaReceiver. I have the following example code:
// start of consumption
public Disposable consumeMessage() {
    return processKafkaRecord().subscribe(
            record -> log.info("success"),
            error -> log.error("error logged" + error));
}

public Flux<String> processKafkaRecord() {
    Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
    return receiverRecord
            .doOnNext(record -> log.info("Input Event receiver record {}", record.toString()))
            .flatMap(this::processMessage)
            .doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={}" +"for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
}

private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
    // logic
    .flatMap(this::processOne)
    .flatMap(this::processTwo)
    .flatMap(this::processThree);
}
In short, my question is: if I use map() in processMessage instead of flatMap(), will it make any difference in performance for the KafkaReceiver?
Question explained: when consuming a stream of data from the KafkaReceiver we are already using a Flux, and in processKafkaRecord the call goes through flatMap(), so each individual record should already be processed asynchronously at that point.
Once we reach processMessage(), it is actually processing a single record. If processOne, processTwo and processThree have to happen synchronously for each individual event, does it make sense to use flatMap() instead of map()?
Once flatMap() has been called in processKafkaRecord(), the inner method is already asynchronous for each event. Would map() make more sense if each step in processMessage has to happen synchronously?
Or am I wrong in this conclusion, and should we use flatMap() even in the inner method for performance?
Answer 1
Score: 1
It really depends on what your processing logic is, but it looks like you are mixing up concurrency, parallelism and asynchronous/non-blocking execution. Asynchronous execution doesn't mean fire and forget; it is about non-blocking execution. The logic can still be sequential.
map vs flatMap
- Use flatMap to execute async/reactive logic such as HTTP requests, DB reads/writes and other I/O-bound operations that return a Mono or Flux.
- Use map to execute sync logic such as object mapping.
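The distinction in the list above can be sketched with reactor-core alone. This is only an illustration: normalize and enrich are hypothetical names, and enrich simulates a non-blocking call with a short delay rather than doing real I/O.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMap {

    // Synchronous, in-memory transformation: a plain function, so map fits.
    static String normalize(String raw) {
        return raw.trim().toUpperCase();
    }

    // Hypothetical stand-in for a non-blocking call (HTTP, DB, ...).
    // It returns a Mono, so flatMap is needed to subscribe to it and flatten the result.
    static Mono<String> enrich(String value) {
        return Mono.just(value + "-enriched").delayElement(Duration.ofMillis(10));
    }

    static List<String> run() {
        return Flux.just(" a ", " b ")
                .map(MapVsFlatMap::normalize)   // sync step
                .flatMap(MapVsFlatMap::enrich)  // async step, inner Monos may complete in any order
                .collectSortedList()            // sort so the result does not depend on completion order
                .block();                       // blocking only to keep the sketch self-contained
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Using map with enrich would instead yield a Flux<Mono<String>> whose inner Monos are never subscribed, which is usually not what is wanted.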
Concurrency
By default, flatMap will process up to Queues.SMALL_BUFFER_SIZE = 256 in-flight inner sequences concurrently.
You can control the concurrency with flatMap(item -> process(item), concurrency), or use the concatMap operator if you want to process items sequentially. Check flatMap(..., int concurrency, int prefetch) for details.
There are different "flavors" of flatMap. If you need sequential processing, use concatMap, which is basically flatMap with concurrency = 1.
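As a rough illustration of the point above, the sketch below runs the same hypothetical process step through concatMap and through flatMap with an explicit concurrency. The delays are an assumption chosen so that later items finish sooner, which makes any reordering visible.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcurrencySketch {

    // Hypothetical async step: item 1 takes 300 ms, item 2 takes 200 ms, item 3 takes 100 ms.
    static Mono<Integer> process(int item) {
        return Mono.just(item).delayElement(Duration.ofMillis(100L * (4 - item)));
    }

    // concatMap subscribes to one inner publisher at a time, so source order is preserved.
    static List<Integer> sequential() {
        return Flux.just(1, 2, 3)
                .concatMap(ConcurrencySketch::process)
                .collectList()
                .block();
    }

    // flatMap subscribes to up to `concurrency` inner publishers at once;
    // results are emitted as they complete, so order is not guaranteed when concurrency > 1.
    static List<Integer> concurrent(int concurrency) {
        return Flux.just(1, 2, 3)
                .flatMap(ConcurrencySketch::process, concurrency)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("concatMap:  " + sequential());
        System.out.println("flatMap(1): " + concurrent(1));
        System.out.println("flatMap(3): " + concurrent(3));
    }
}
```

Both steps are still non-blocking in every variant; only the number of items in flight at once changes.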
Kafka ordered vs unordered processing
Depending on use case there are several options.
Ordered message processing
Use this when message order is important and messages should be processed in the same sequence in which the producer sent them. Kafka guarantees message order per partition.
In Reactor Kafka you can do this by grouping the data per partition and then processing each group sequentially:
kafkaReceiver.receive()
    .groupBy(message -> message.receiverOffset().topicPartition())
    .flatMap(partitions -> partitions.concatMap(this::process));
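The groupBy plus concatMap pattern can be tried without a broker. The sketch below is a simulation under stated assumptions: Msg is a hypothetical stand-in for ReceiverRecord that carries only a partition and an offset, and process is a hypothetical handler with uneven delays. It does not use reactor-kafka at all.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PartitionOrderSketch {

    // Hypothetical stand-in for ReceiverRecord: just a partition and an offset.
    static final class Msg {
        final int partition;
        final int offset;

        Msg(int partition, int offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical async handler: even offsets are slower, so unordered handling would reorder them.
    static Mono<Msg> process(Msg m) {
        return Mono.just(m).delayElement(Duration.ofMillis(m.offset % 2 == 0 ? 50 : 5));
    }

    // Returns the offsets seen downstream, per partition, in the order they were processed.
    static Map<Integer, List<Integer>> run() {
        Map<Integer, List<Integer>> seen = new ConcurrentHashMap<>();
        Flux.just(new Msg(0, 0), new Msg(1, 0), new Msg(0, 1), new Msg(1, 1), new Msg(0, 2))
                .groupBy(m -> m.partition)                                          // one group per partition
                .flatMap(group -> group.concatMap(PartitionOrderSketch::process))  // groups run concurrently, each group sequentially
                .doOnNext(m -> seen.computeIfAbsent(m.partition, p -> new CopyOnWriteArrayList<>()).add(m.offset))
                .blockLast();                                                       // blocking only to keep the sketch self-contained
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Partitions are interleaved with each other, but within a partition the offsets stay in source order because each group goes through concatMap.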
Unordered message processing
When order is not important and messages can be processed in any order, we can increase throughput by processing multiple messages in parallel:
kafkaReceiver.receive()
    .flatMap(message -> process(message), concurrency);
Unordered message processing supports much higher throughput on a small number of partitions. For ordered message processing, you would need to increase the number of partitions to increase throughput.