Reactor Kafka: message consumption always on one thread, no matter the number of CPUs on the machine

Question

A small question regarding Reactor Kafka, please.

I have a very straightforward Reactor Kafka project:

package com.example.micrometer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).run(args);
    }

    @Override
    public void run(String... args) {
    }

    @Bean
    Consumer<Flux<Message<String>>> consume() {
        return flux -> flux.flatMap(one -> myHandle(one)).subscribe();
    }

    private Mono<String> myHandle(Message<String> one) {
        log.info("<==== look at this thread " + "\u001B[32m" + one.getPayload() + "\u001B[0m");
        String payload = one.getPayload();
        String decryptedPayload = complexInMemoryDecryption(payload); // this is NON-blocking, takes 1 second
        String complexMatrix = convertDecryptedPayloadToGiantMatrix(decryptedPayload); // this is NON-blocking, takes 1 second
        String newMatrix = matrixComputation(complexMatrix); // this is NON-blocking, takes 1 second
        return myNonBlockingReactiveRepository.save(complexMatrix);
    }
}

And here is the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.2</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

(Note: this is not a Spring Kafka project, nor a Spring Cloud Stream project.)

I am consuming from a topic with 3 partitions. Messages are sent at a rate of one message per second.

The consumption and processing of each message takes about 3 seconds.

Important: please note that the processing does not contain any blocking operation. It is a giant in-memory decryption plus a giant matrix computation. It is BlockHound-tested NON blocking.
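
For reference, here is a minimal sketch of how such a BlockHound check can be wired up (this is not the project's actual test; it assumes the io.projectreactor.tools:blockhound dependency is on the classpath, and on recent JDKs the -XX:+AllowRedefinitionToAddDeleteMethods JVM flag may be needed):

import reactor.blockhound.BlockHound;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class NonBlockingCheck {
    public static void main(String[] args) {
        BlockHound.install(); // instruments blocking calls made on "non-blocking" threads

        Mono.fromCallable(() -> "payload")
            .map(String::toUpperCase)           // stand-in for the in-memory decryption / matrix work
            .subscribeOn(Schedulers.parallel()) // parallel threads are non-blocking-sensitive
            .block();                           // block() on the main thread is allowed; a blocking call
                                                // inside the chain above would make BlockHound throw
    }
}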

Actual:

When I consume the messages with Reactor Kafka, the whole consumption happens on one thread only. Everything happens on container-0-C-1.

Everything runs on container-0-C-1, tested on hardware with 2 CPUs, 4 CPUs and 8 CPUs:

2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :

Expected:

We migrated from an HTTP WebFlux-based application to Kafka-based consumption. The business logic did not change one bit.

In the Reactor Netty Spring WebFlux application, we could see processing happening on multiple threads, corresponding to the Reactor cores. On a machine with many cores, this could keep up easily:

[or-http-epoll-1] [or-http-epoll-2] [or-http-epoll-3] [or-http-epoll-4]

The processing would simply switch between those reactor-http-epoll-N threads. I could see that while reactor-http-epoll-1 was handling the complex in-memory computation for the first message, reactor-http-epoll-3 would handle the computation for the second message, and so on. The parallelism was clear.

I understand there are ways to "scale" this application, but this is a question about Reactor Kafka itself.

I expected the messages to be handled in parallel: some kind of container-0-C-1 for the first message, container-0-C-2 for the second message, and so on.

How can I achieve that, please? What am I missing?

Thank you.


Answer 1

Score: 1

In Kafka consumers it is typically a good idea to separate the polling cycle from the processing logic (the KafkaConsumer also has its own native I/O thread). This architecture is sometimes called a "consumer with pipelining": a polling thread continuously fetches records from Kafka and "feeds" them into a bounded buffer/queue (for example an ArrayBlockingQueue or a LinkedBlockingQueue), while processing threads take records from the queue and process them. This decouples the polling logic from the processing logic and provides buffering and backpressure.
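
As an illustration only (this is not the question's or the answer's code), a bare-bones version of that pipelining idea in plain Java could look like the sketch below; consumer stands for an already-configured KafkaConsumer, handleRecord is a placeholder for the processing step, and imports, offset commits and shutdown handling are omitted:

BlockingQueue<ConsumerRecord<String, String>> buffer = new ArrayBlockingQueue<>(1_000);
ExecutorService workers =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// Polling thread: keeps fetching and enqueues the records; put() blocks when the
// buffer is full, which is what gives the poll loop its backpressure.
Thread poller = new Thread(() -> {
    try {
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                buffer.put(rec);
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Worker threads: drain the buffer and do the heavy processing in parallel.
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
    workers.submit(() -> {
        try {
            while (true) {
                handleRecord(buffer.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
poller.start();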

Reactor Kafka is built on top of the KafkaConsumer API and uses a similar architecture to implement reactive streams with backpressure. The KafkaReceiver runs the polling cycle and, by default, publishes the fetched records on a Schedulers.single thread.
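
For context, a minimal KafkaReceiver setup looks roughly like this (a sketch, not the question's code; it assumes a local broker, String keys/values, and the topic name taken from the question's logs; the classes come from org.apache.kafka.clients.consumer, org.apache.kafka.common.serialization and reactor.kafka.receiver):

Map<String, Object> props = Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",           // assumed broker address
        ConsumerConfig.GROUP_ID_CONFIG, "demo-group",                         // assumed group id
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
        .subscription(Collections.singleton("prod_audit_hdfs"));

// receive() runs the poll loop internally and emits the records as a Flux;
// by default they are all published on a single thread.
Flux<ReceiverRecord<String, String>> inbound = KafkaReceiver.create(options).receive();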

Now, depending on your logic, you can process the data and commit offsets either sequentially or in parallel. For concurrent processing use flatMap, which by default processes up to 256 records in parallel and can be controlled with the concurrency parameter:

kafkaReceiver.receive()
    .flatMap(rec -> process(rec), concurrency)
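
A slightly fuller sketch of the same idea (my own illustration, reusing the options from the snippet above; expensiveCpuWork is a placeholder for the question's decryption + matrix computation and log is an SLF4J logger). Note that the subscribeOn is what actually moves the synchronous, CPU-bound step off the single receiving thread; flatMap only controls how many of these inner pipelines are in flight at the same time:

int concurrency = Runtime.getRuntime().availableProcessors();

KafkaReceiver.create(options).receive()
        .flatMap(rec ->
                Mono.fromCallable(() -> expensiveCpuWork(rec.value()))        // placeholder CPU-bound step
                        .subscribeOn(Schedulers.parallel())                   // run it on a parallel-N thread
                        .doOnNext(r -> log.info("processed on {}", Thread.currentThread().getName()))
                        .doOnNext(r -> rec.receiverOffset().acknowledge()),   // mark the offset as processed
                concurrency)
        .subscribe();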

If you add logging, you will see that all records are received on kafka-receiver-2 but processed on different parallel-# threads. Note that records are received in order per partition:

12:50:08.347  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
12:50:08.349  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 0
12:50:08.351  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 0
12:50:08.353  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
12:50:08.354  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 2
12:50:08.355  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 1
12:50:08.356  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 1
12:50:08.358  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 1
12:50:09.353  [parallel-3] INFO [c.e.d.KafkaConsumerTest] - process: value-2, partition: 0
12:50:09.353  [parallel-6] INFO [c.e.d.KafkaConsumerTest] - process: value-6, partition: 0
12:50:09.353  [parallel-4] INFO [c.e.d.KafkaConsumerTest] - process: value-3, partition: 0
12:50:09.353  [parallel-5] INFO [c.e.d.KafkaConsumerTest] - process: value-4, partition: 0
12:50:09.355  [parallel-7] INFO [c.e.d.KafkaConsumerTest] - process: value-9, partition: 0
12:50:09.360  [parallel-10] INFO [c.e.d.KafkaConsumerTest] - process: value-1, partition: 1
12:50:09.360  [parallel-9] INFO [c.e.d.KafkaConsumerTest] - process: value-8, partition: 2
12:50:09.360  [parallel-8] INFO [c.e.d.KafkaConsumerTest] - process: value-0, partition: 2
12:50:09.361  [parallel-11] INFO [c.e.d.KafkaConsumerTest] - process: value-5, partition: 1
12:50:09.361  [parallel-12] INFO [c.e.d.KafkaConsumerTest] - process: value-7, partition: 1

In other words, this is by design and you should not worry about the polling logic. You can scale the processing by increasing the concurrency of flatMap.
