我们可以使用CompletableFutures进行并行的Kafka Streams处理吗?

huangapple go评论72阅读模式
英文:

Can we do parallel Kafka Streams processing with CompletableFutures

问题

可以使用Java的CompletableFuture在Kafka流应用程序中执行并行工作吗?

我想从一个Kafka主题中读取数据,然后创建两个窗口计数,一个用于分钟级别,另一个用于小时级别,但要并行执行它们。

我编写了一些示例代码。我能够使其正常工作,但根据Kafka流文档的描述,由于KafkaStreams为每个分区分配一个任务,而且不能超过一个线程,我不确定这段代码是否会产生期望的效果。

CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -> {
    inputStream.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "minute-store")
                    .withRetention(Duration.ofMinutes(1)))
            .toStream()
            .to("result-topic");
});

CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -> {
    inputStream.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
                    "hour-store")
                    .withRetention(Duration.ofHours(1)))
            .toStream()
            .to("result-topic-2", produced);
});

final CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
        completableFutureOfHour);

try {
    combinedFutures.get();
} catch (final Exception ex) {

}

注意:我已经省略了代码中的注释以保持简洁。

英文:

Is it possible to do parallel work within a Kafka stream application using Java CompletableFutures?

I want to read from 1 Kafka topic, create two windowed counts, 1 for minute and another for hour but do them in parallel.

I wrote some sample code. I am able to get this to work but looking at the Kafka stream documentation, since KafkaStreams assigns 1 task per partition and it can't go beyond one-thread I'm not sure if this code will have desired effect.

CompletableFuture completableFutureOfMinute = CompletableFuture.runAsync(() -&gt; {
        inputStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
                .count(Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as(
                        &quot;minute-store&quot;)
                        .withRetention(Duration.ofMinutes(1)))
                .toStream()
                .to(&quot;result-topic&quot;);
    });

    CompletableFuture completableFutureOfHour = CompletableFuture.runAsync(() -&gt; {
        inputStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(1)))
                .count(Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as(
                        &quot;hour-store&quot;)
                        .withRetention(Duration.ofHours(1)))
                .toStream()
                .to(&quot;result-topic-2&quot;, produced);
    });

    final CompletableFuture&lt;Void&gt; combinedFutures = CompletableFuture.allOf(completableFutureOfMinute,
            completableFutureOfHour);

    try {
        combinedFutures.get();
    } catch (final Exception ex) {

    }

答案1

得分: 0

你的程序似乎不正确。

请注意,使用DSL时,基本上是在组装数据流程序,只有在调用KafkaStreams#start()时才会开始数据处理。因此,在指定处理逻辑时使用Futures是无助的,因为尚未处理任何数据。

Kafka Streams基于任务并行化。因此,如果您想并行处理两个窗口,您需要将输入主题“复制”(通过调用Topology)以将程序拆分为独立部分(称为SubTopology):

KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1分钟 */).count(...);
input.repartition().groupByKey().windowBy(/* 1小时 */).count();

使用repartition(),您的程序将被拆分为两个子拓扑,您将获得每个子拓扑的任务,可以由不同线程并行处理。

然而,我实际上怀疑这个程序是否会增加您的吞吐量。如果您真的想要增加吞吐量,您应该增加输入主题分区的数量,以获得更多的并行任务。

英文:

Your program does not seem to be right.

Note, that using the DSL, you basically assemble a dataflow program and data processing only starts when you call KafkaStreams#start(). Thus, using Futures while specifying your processing logic does not help, as no data is processed yet.

Kafka Streams parallelizes based on tasks. Thus, if you want to process both windows in parallel, you would need to "replicate" the input topic to split your program (called Topology) into independent parts (call SubTopology):

<!-- language java -->

KStream input = builder.stream(...);
input.groupByKey().windowBy(/* 1 min */).count(...);
input.repartition().groupByKey().windowBy(/* 1 hour */).count();

Using repartition() your program will be split into two sub-topologies and you will get tasks for each sub-topology that can be processed by different threads in parallel.

However, I actually doubt if this program will increase your throughput. If you really want to increase your throughput, you should increase the number of input topic partitions to get more parallel tasks.

huangapple
  • 本文由 发表于 2020年7月25日 01:38:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/63078748.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定