Webflux: Difference between .map() and .flatMap() behavior in a Kafka receiver

Question

I have gone through the Project Reactor documentation for the map() and flatMap() methods, and also a good explanation in this answer.

But my question is about using them with the Reactor KafkaReceiver. This is the code example I have:

    // start of consumption
    public Disposable consumeMessage() {
        return processKafkaRecord().subscribe(record -> log.info("success"),
                error -> log.error("error logged" + error));
    }

    public Flux<String> processKafkaRecord() {
        Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
        return receiverRecord.doOnNext(record -> log.info("Input Event receiver record {}", record.toString()))
            .flatMap(this::processMessage)
            .doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={} for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
    }

    private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
        // logic (elided in the original); assumed here to start from the record value
        return Flux.just(receiverRecord.value())
            .flatMap(this::processOne)
            .flatMap(this::processTwo)
            .flatMap(this::processThree);
    }

My question in short: if I use .map() in processMessage instead of .flatMap(), will it make any difference to the performance of the KafkaReceiver?

Question explained: when consuming a stream of data with KafkaReceiver we are already using a Flux for consumption, and in the processKafkaRecord method the call goes through flatMap(), so each individual record should already be processed asynchronously because of that.

Once we reach the processMessage() method, it is actually processing a single record. Now, if my processOne, processTwo and processThree methods have to run synchronously for each individual event, does it make sense to use flatMap() instead of map()?

Once flatMap() has been called in processKafkaRecord(), the inner method will already run asynchronously for each event. Would map make more sense if each step in processMessage has to happen synchronously? Or am I wrong with this conclusion, and should we use flatMap even in the inner method for performance?

Answer 1

Score: 1

It really depends on your processing logic, but it looks like you are mixing up concurrency, parallelism and asynchronous/non-blocking execution. Asynchronous execution doesn't mean fire and forget. It is more about non-blocking execution, and the logic can still be sequential.
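
To make the "asynchronous but still sequential" point concrete, here is a minimal sketch using only Reactor core. The `step` helper and the class name are hypothetical stand-ins for the question's `processOne`, `processTwo` and `processThree`; a timer simulates non-blocking I/O. Each step starts only after the previous one has emitted, even though no thread is blocked while waiting.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class SequentialAsyncSteps {
    // Records the order in which the steps actually ran.
    static final List<String> trace = new CopyOnWriteArrayList<>();

    // Hypothetical non-blocking step: waits on a timer instead of blocking a thread.
    static Mono<String> step(String name, String input) {
        return Mono.delay(Duration.ofMillis(20))
                .map(tick -> {
                    trace.add(name);
                    return input + ">" + name;
                });
    }

    static String run() {
        trace.clear();
        return Mono.just("record")
                .flatMap(v -> step("one", v))   // runs first
                .flatMap(v -> step("two", v))   // starts only after "one" has emitted
                .flatMap(v -> step("three", v)) // starts only after "two" has emitted
                .block(); // blocking here only so the demo can return a value
    }

    public static void main(String[] args) {
        System.out.println(run());  // record>one>two>three
        System.out.println(trace);  // [one, two, three]
    }
}
```

For a single record, chaining flatMap calls therefore keeps the steps in order; the choice between map and flatMap inside processMessage is about whether a step returns a plain value or a Mono/Flux, not about ordering.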

map vs flatMap

  • use flatMap to execute async/reactive logic that returns a Mono or Flux, such as HTTP requests, database reads/writes and other I/O-bound operations.
  • use map to execute synchronous logic such as object mapping.
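
A small sketch of that rule of thumb, assuming Reactor core only. `normalize` and `enrich` are hypothetical helpers: the first is a plain in-memory function, the second returns a Mono and stands in for an HTTP or database call.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMap {
    // Synchronous, in-memory transformation: a plain function, so map fits.
    static String normalize(String value) {
        return value.trim().toUpperCase();
    }

    // Hypothetical non-blocking call (simulated with a timer): returns a Mono, so flatMap fits.
    static Mono<String> enrich(String value) {
        return Mono.delay(Duration.ofMillis(10)).map(tick -> value + "-enriched");
    }

    static List<String> run() {
        return Flux.just(" a ", " b ")
                .map(MapVsFlatMap::normalize)   // String -> String
                .flatMap(MapVsFlatMap::enrich)  // String -> Mono<String>, flattened
                .collectSortedList()            // flatMap may reorder, so sort for a stable result
                .block();                       // blocking only for the demo
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A-enriched, B-enriched]
    }
}
```

Using map with `enrich` would compile but yield a `Flux<Mono<String>>` whose inner Monos are never subscribed, which is usually not what is intended.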

Concurrency

By default, flatMap processes up to Queues.SMALL_BUFFER_SIZE (256) in-flight inner sequences concurrently.

You can control this with flatMap(item -> process(item), concurrency), or use the concatMap operator if you want to process sequentially. See flatMap(..., int concurrency, int prefetch) for details.

There are different "flavors" of flatMap. If you need sequential processing, use concatMap, which is essentially flatMap with concurrency = 1.
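
The effect of the concurrency argument can be observed with a rough sketch like the one below. It assumes Reactor core only; `process` is a hypothetical slow step that counts how many invocations overlap, and `Flux.range` stands in for a real source.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedConcurrency {
    // Hypothetical slow step that tracks the number of overlapping invocations.
    static Mono<Integer> process(int item, AtomicInteger inFlight, AtomicInteger peak) {
        return Mono.fromCallable(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulated blocking work, kept off the caller by subscribeOn
                    inFlight.decrementAndGet();
                    return item;
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    // flatMap subscribes to at most `concurrency` inner publishers at a time.
    static int peakWithFlatMap(int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux.range(1, 20)
                .flatMap(i -> process(i, inFlight, peak), concurrency)
                .blockLast();
        return peak.get();
    }

    // concatMap subscribes to the next inner publisher only after the previous one completes.
    static int peakWithConcatMap() {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux.range(1, 20)
                .concatMap(i -> process(i, inFlight, peak))
                .blockLast();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap(…, 4) peak: " + peakWithFlatMap(4));
        System.out.println("concatMap peak:     " + peakWithConcatMap());
    }
}
```

The flatMap peak never exceeds the configured limit, while concatMap never has more than one call in flight.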

Kafka ordered vs unordered processing

Depending on the use case, there are several options.

Ordered message processing
Use this when message order is important and messages must be processed in the same sequence in which the producer sent them. Kafka guarantees message order per partition.

In Reactor Kafka you can do this by grouping data per partition and then processing each group sequentially:

kafkaReceiver.receive()
        .groupBy(message -> message.receiverOffset().topicPartition())
        .flatMap(partitions -> partitions.concatMap(this::process));
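
The same groupBy + concatMap shape can be tried without a broker. In this sketch, which assumes Reactor core only, `Flux.range` stands in for `kafkaReceiver.receive()`, the partition is simulated as `offset % 3`, and `process` is a hypothetical step whose later items finish faster, so any reordering inside a partition would be visible.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderedPerPartition {
    static final int PARTITIONS = 3;

    // Hypothetical processing step: later offsets complete sooner than earlier ones.
    static Mono<Integer> process(int offset) {
        return Mono.delay(Duration.ofMillis(40 - 3L * offset)).map(tick -> offset);
    }

    static List<Integer> run() {
        return Flux.range(0, 12)                        // stand-in for the receiver's Flux
                .groupBy(offset -> offset % PARTITIONS) // one group per simulated partition
                .flatMap(partition -> partition.concatMap(OrderedPerPartition::process))
                .collectList()
                .block();                               // blocking only for the demo
    }

    // True if, within each simulated partition, offsets appear in increasing order.
    static boolean orderedWithinPartitions(List<Integer> processed) {
        int[] last = new int[PARTITIONS];
        java.util.Arrays.fill(last, -1);
        for (int offset : processed) {
            int p = offset % PARTITIONS;
            if (offset < last[p]) {
                return false;
            }
            last[p] = offset;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> processed = run();
        System.out.println(processed);
        System.out.println(orderedWithinPartitions(processed));
    }
}
```

Groups run concurrently with each other, so the overall interleaving can vary between runs, but the order inside each group is preserved.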

Unordered message processing

If the sequence is not important and messages can be processed in any order, we can increase throughput by processing multiple messages in parallel.

kafkaReceiver.receive()
        .flatMap(message -> process(message), concurrency);

Unordered processing supports much higher throughput on a small number of partitions, because concurrency is not tied to the partition count. With ordered processing, each partition is handled sequentially, so you need to increase the number of partitions to increase throughput.

huangapple
  • Posted on February 26, 2023, 20:49:18
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75572080.html