英文:
How do I implement in memory or embedded kafka not for testing purposes?
问题
我正在寻求在仅允许由上述应用程序运行的环境中部署Spring Boot应用。我的应用将是Kafka的生产者和消费者。是否有一种方法在启动时运行内存中的实例,可用于除测试以外的其他目的?或者,是否有一种在启动时启动Spring Boot应用的方式,如果它无法连接到Kafka作为生产者和消费者也不会失败?
编辑:这是一个临时解决方案,直到我们能够在此环境中部署Kafka。该应用程序不生成和消费自己的记录。它是多应用程序部署的一部分,每个应用程序既为其他应用程序生成,也消费其他应用程序的Kafka主题。我看到了很多关于在消费者无法使用Kafka的情况下应用程序启动的信息,但在生产者方面却没有太多相关信息。我的应用程序将同时执行这两个角色。
英文:
I am looking to deploy a spring boot app in an environment that only allows Kafka to be run by the aforementioned app. My app will be a Kafka producer and consumer. Is there a way to run an in memory instance on startup that can be used for purposes other than testing? Alternatively, is there a way to startup a spring boot app that will not fail if it cannot connect to Kafka as a producer and consumer?
edit: it is a temporary solution until we are able to deploy Kafka in this environment. The app does not produce and consume its own records. It is one part of a multi app deployment where each app both produces for other apps and consumes other apps Kafka topics. I see a lot of info around app startup when Kafka is not available for a consumer, but not much is out there with regards to producers. My app will be doing both.
答案1
得分: 7
以下是翻译好的内容:
这样的应用程序(生成和消耗自己的记录)的目的是什么?嵌入式代理并不适用于生产环境。
自版本2.3.4起,容器属性missingTopicsFatal
默认为false,这将允许容器在代理不可用时启动。在早期版本中,您可以将其设置为false以获得相同的效果。
当它为true时,容器在启动期间连接到代理以验证主题是否存在。
您还可以将容器的autoStartup=false
来完全阻止容器启动。
编辑
我不建议在生产环境中使用这个,但是您可以从spring-kafka-test
中删除test
作用域,并将代理声明为@Bean
...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<!-- <scope>test</scope> -->
</dependency>
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers"); // 覆盖应用程序属性
}
我刚刚用这个应用程序测试过...
@SpringBootApplication
public class So63812994Application {
public static void main(String[] args) {
SpringApplication.run(So63812994Application.class, args);
}
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63812994").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so63812994", topics = "so63812994")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so63812994", "foo");
};
}
}
spring.kafka.bootstrap-servers=realKafka:9092
spring.kafka.consumer.auto-offset-reset=earliest
编辑2
使用上述配置,同一主机上的其他应用程序可以使用localhost:9092
进行连接。
如果您需要远程访问此嵌入式代理,则需要一些额外的配置:
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerProperty("listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("advertised.listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,REMOTE:PLAINTEXT")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
然后您可以使用10.0.0.20:9093
从其他服务器进行连接。
英文:
What would be the purpose of such an application (produce and consume its own records)? The embedded broker is not designed for production use.
Since version 2.3.4, the container property missingTopicsFatal
is false by default, which will allow the container to start even if the broker is not available. With earlier versions, you can set it to false to get the same effect.
When it is true, the container connects to the broker during startup to verify that the topic(s) exist.
You can also set the container's autoStartup=false
to prevent the container from starting at all.
EDIT
I wouldn't recommend using this in production, but you can remove the test
scope from spring-kafka-test
and declare the broker as a @Bean
...
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<!-- <scope>test</scope> -->
</dependency>
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers"); // override application property
}
I just tested it with this app...
@SpringBootApplication
public class So63812994Application {
public static void main(String[] args) {
SpringApplication.run(So63812994Application.class, args);
}
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63812994").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so63812994", topics = "so63812994")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so63812994", "foo");
};
}
}
spring.kafka.bootstrap-servers=realKafka:9092
spring.kafka.consumer.auto-offset-reset=earliest
EDIT2
With the above configuration, other applications on the same host can connect with localhost:9092
.
If you need remote access to this embedded broker, you will need some additional configuration:
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerProperty("listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("advertised.listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,REMOTE:PLAINTEXT")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
You can then connect from other servers with 10.0.0.20:9093
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论