英文:
How to start a Kafka-Streams Pipeline by Topology using Quarkus
问题
以下是您要翻译的内容:
我正在按照Quarkus Kafka-Streams教程进行操作,但不太理解如何启动流水线。
在教程中,使用org.apache.kafka.streams.StreamsBuilder
来构建描述流水线的org.apache.kafka.streams.Topology
。构建Topology的方法带有@Produces
注解。在这个速查表中,描述了这就足以运行Kafka-Streams流水线。在教程中,此外还公开了一个HTTP端点。在我目前正在实现的服务中,不需要这样做。另外,在示例中,提供者方法从未被显式地调用过。当我在没有端点的情况下启动应用程序时,流水线不会启动。
在这个教程中,流水线明确地实例化为一个拓扑结构。但在这里,必须手动设置属性,而且配置不能从quarkus.kafka-streams.<something>
属性中获取。
问题是:我如何使用第一个教程中的Topology构建器来启动所描述的流水线?最理想的情况是,从quarkus.kafka-streams.<something>
中自动应用配置。
使用:
Java OpenJDK 11.0.8
Quarkus版本:1.8.0.Final
英文:
I am following the Quarkus Kafka-Streams tutorial and can't quite understand how a pipeline can be started.
In the tutorial the org.apache.kafka.streams.StreamsBuilder
is used to build a org.apache.kafka.streams.Topology
that describes the pipeline. The method that builds the Topology is annotated with @Produces
. In this cheat sheet it's described that this is sufficient to run a Kafka-Streams pipeline. In the tutorial an http-endpoint is exposed additionally. That's not required in the service I'm currently implementing. Also in the example the provider method is never called explicitly. When I start the application without the endpoint the pipeline is not started.
In this tutorial the pipeline is instantiated explicitly with a topology. But here properties have to be set manually and configuration is not taken from the quarkus.kafka-streams.<something>
properties.
The question is: How can I use Topology builder from the first tutorial to start the pipeline described by it? Optimal case would be that the configuration from quarkus.kafka-streams.<something>
is automatically applied.
Using:
Java OpenJDK 11.0.8
Quarkus Version: 1.8.0.Final
答案1
得分: 2
问题找到了:不能保证完全正确,只想分享解决我的问题的方法。
最重要的是使用正确的 @Produces
。必须使用 javax.enterprise.inject.Produces
来对生成 Topology
的方法进行注解。可以另外使用 javax.ws.rs.Produces
来定义输出的 MediaType,但不是必需的:
@javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
@Produces
@AlternativePriority( 1 )
public Topology buildTopology() {
...
}
在启动时,框架会自动构建一个 Kafka Streams
的实例。您只需要运行以下内容来运行您的流水线:
@ApplicationScoped
public class YourApplication {
private final KafkaStreams streams;
public NasDistributorApplication( final KafkaStreams streams ) {
this.streams = streams;
}
public void onStart( @Observes final StartupEvent startupEvent ) {
streams.start();
}
public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
streams.close();
}
}
通过 @AlternativePriority( 1 )
对生成 Topology
的方法进行注解可能是必要的,以告诉 quarkus
使用此方法来构建 Topology
。我不了解框架的内部情况,但我怀疑默认使用了一个 Topology
,而 @AlternativePriority
为自定义方法提供了比默认 Topology
更高的优先级。
英文:
Figured out the problem: No guarantee for complete correctness. Just want to share what solved my problem.
Most important is to use the right @Produces
. javax.enterprise.inject.Produces
is necessary to be used for the Topology
producing method. javax.ws.rs.Produces
can be used additionally to define the MediaType of the output, but is not mandatory:
@javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
@Produces
@AlternativePriority( 1 )
public Topology buildTopology() {
...
}
An instance of Kafka Streams
is automatically built by the framework on start-up. All you need to run your pipeline is the following:
@ApplicationScoped
public class YourApplication {
private final KafkaStreams streams;
public NasDistributorApplication( final KafkaStreams streams ) {
this.streams = streams;
}
public void onStart( @Observes final StartupEvent startupEvent ) {
streams.start();
}
public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
streams.close();
}
}
Annotating the Topology
producing method by @AlternativePriority( 1 )
might be necessary to tell quarkus
to use this method to build the Topology
. I don't know the internals of the framework but what I suspect that a default Topology is used and @AlternativePriority
gives a higher priority to the custom method than the default Topology
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论