如何使用 Quarkus 按照拓扑结构启动 Kafka Streams 流程。

huangapple go评论77阅读模式
英文:

How to start a Kafka-Streams Pipeline by Topology using Quarkus

问题

以下是您要翻译的内容:

我正在按照Quarkus Kafka-Streams教程进行操作,但不太理解如何启动流水线。

在教程中,使用org.apache.kafka.streams.StreamsBuilder来构建描述流水线的org.apache.kafka.streams.Topology。构建Topology的方法带有@Produces注解。在这个速查表中,描述了这就足以运行Kafka-Streams流水线。在教程中,此外还公开了一个HTTP端点。在我目前正在实现的服务中,不需要这样做。另外,在示例中,提供者方法从未被显式地调用过。当我在没有端点的情况下启动应用程序时,流水线不会启动。

这个教程中,流水线明确地实例化为一个拓扑结构。但在这里,必须手动设置属性,而且配置不能从quarkus.kafka-streams.<something>属性中获取。

问题是:我如何使用第一个教程中的Topology构建器来启动所描述的流水线?最理想的情况是,从quarkus.kafka-streams.<something>中自动应用配置。

使用:

  • Java OpenJDK 11.0.8
  • Quarkus版本:1.8.0.Final
英文:

I am following the Quarkus Kafka-Streams tutorial and can't quite understand how a pipeline can be started.

In the tutorial the org.apache.kafka.streams.StreamsBuilder is used to build a org.apache.kafka.streams.Topology that describes the pipeline. The method that builds the Topology is annotated with @Produces. In this cheat sheet it's described that this is sufficient to run a Kafka-Streams pipeline. In the tutorial an http-endpoint is exposed additionally. That's not required in the service I'm currently implementing. Also in the example the provider method is never called explicitly. When I start the application without the endpoint the pipeline is not started.

In this tutorial the pipeline is instantiated explicitly with a topology. But here properties have to be set manually and configuration is not taken from the quarkus.kafka-streams.&lt;something&gt; properties.

The question is: How can I use Topology builder from the first tutorial to start the pipeline described by it? Optimal case would be that the configuration from quarkus.kafka-streams.&lt;something&gt; is automatically applied.

Using:

  • Java OpenJDK 11.0.8
  • Quarkus Version: 1.8.0.Final

答案1

得分: 2

问题找到了:不能保证完全正确,只想分享解决我的问题的方法。

最重要的是使用正确的 @Produces。必须使用 javax.enterprise.inject.Produces 来对生成 Topology 的方法进行注解。可以另外使用 javax.ws.rs.Produces 来定义输出的 MediaType,但不是必需的:

    @javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
    @Produces
    @AlternativePriority( 1 )
    public Topology buildTopology() {
        ...
    }

在启动时,框架会自动构建一个 Kafka Streams 的实例。您只需要运行以下内容来运行您的流水线:

@ApplicationScoped
public class YourApplication {
    private final KafkaStreams streams;

    public NasDistributorApplication( final KafkaStreams streams ) {
        this.streams = streams;
    }

    public void onStart( @Observes final StartupEvent startupEvent ) {
        streams.start();
    }

    public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
        streams.close();
    }

}

通过 @AlternativePriority( 1 ) 对生成 Topology 的方法进行注解可能是必要的,以告诉 quarkus 使用此方法来构建 Topology。我不了解框架的内部情况,但我怀疑默认使用了一个 Topology,而 @AlternativePriority 为自定义方法提供了比默认 Topology 更高的优先级。

英文:

Figured out the problem: No guarantee for complete correctness. Just want to share what solved my problem.

Most important is to use the right @Produces. javax.enterprise.inject.Produces is necessary to be used for the Topology producing method. javax.ws.rs.Produces can be used additionally to define the MediaType of the output, but is not mandatory:


    @javax.ws.rs.Produces( MediaType.TEXT_PLAIN )
    @Produces
    @AlternativePriority( 1 )
    public Topology buildTopology() {
        ...
    }

An instance of Kafka Streams is automatically built by the framework on start-up. All you need to run your pipeline is the following:


@ApplicationScoped
public class YourApplication {
    private final KafkaStreams streams;

    public NasDistributorApplication( final KafkaStreams streams ) {
        this.streams = streams;
    }

    public void onStart( @Observes final StartupEvent startupEvent ) {
        streams.start();
    }

    public void onStop( @Observes final ShutdownEvent shutdownEvent ) {
        streams.close();
    }

}

Annotating the Topology producing method by @AlternativePriority( 1 ) might be necessary to tell quarkus to use this method to build the Topology. I don't know the internals of the framework but what I suspect that a default Topology is used and @AlternativePriority gives a higher priority to the custom method than the default Topology

huangapple
  • 本文由 发表于 2020年9月17日 06:28:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/63928751.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定