Failed to start bean 'replyingTemplate'; nested exception is java.lang.IllegalStateException: Error handler is not compatible with the message listener
Question
I am trying to use ReplyingKafkaTemplate, following this example: https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#replying-template
public List<ProductViewModel> GetProducts() {
    ProducerRecord<String, String> record = new ProducerRecord<>(ProductTopicConstants.GET_PRODUCTS, null);
    RequestReplyFuture<String, String, String> replyFuture = this._replyingKafkaTemplate.sendAndReceive(record);
    SendResult<String, String> sendResult = null;
    sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    System.out.println("Sent ok: " + sendResult.getRecordMetadata());
    ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    System.out.println("Return value: " + consumerRecord.value());
    return List.of(new ProductViewModel("", "", 0, ""));
}

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}

@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("replies");
    repliesContainer.getContainerProperties().setGroupId("repliesGroup");
    repliesContainer.setAutoStartup(false);
    return repliesContainer;
}
Error
org.springframework.context.ApplicationContextException: Failed to start bean 'replyingTemplate'; nested exception is java.lang.IllegalStateException: Error handler is not compatible with the message listener, expecting an instance of BatchErrorHandler not org.springframework.kafka.listener.SeekToCurrentErrorHandler
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at fete.bird.fetebirdproduct.FeteBirdProductApplication.main(FeteBirdProductApplication.java:13) ~[main/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.1.RELEASE.jar:2.3.1.RELEASE]
Caused by: java.lang.IllegalStateException: Error handler is not compatible with the message listener, expecting an instance of BatchErrorHandler not org.springframework.kafka.listener.SeekToCurrentErrorHandler
at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.validateErrorHandler(KafkaMessageListenerContainer.java:947) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:629) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:295) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.start(ReplyingKafkaTemplate.java:280) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
... 19 common frames omitted
Answer 1
Score: 2
It appears that you have defined a bean of type SeekToCurrentErrorHandler and Boot has auto-configured it into the reply container.
Since the replying template is a BatchMessageListener, it is not compatible with that error handler.
You need to configure the container yourself with a suitable error handler, such as the BatchLoggingErrorHandler.
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> repliesContainer =
            containerFactory.createContainer("replies");
    repliesContainer.getContainerProperties().setGroupId("repliesGroup");
    repliesContainer.setAutoStartup(false);
    repliesContainer.setBatchErrorHandler(new BatchLoggingErrorHandler());
    return repliesContainer;
}
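
To make the startup failure easier to picture, here is a simplified plain-Java model of the kind of check that the stack trace points to (validateErrorHandler in KafkaMessageListenerContainer). It is only a sketch: the names ErrorHandlerCheckDemo, RecordHandler, BatchHandler, SeekToCurrentLike and BatchLoggingLike are hypothetical stand-ins invented for illustration, not Spring Kafka types, and the real implementation may differ in detail.

```java
// Simplified model of a listener/error-handler compatibility check.
// All types here are hypothetical stand-ins, NOT the Spring Kafka classes.
class ErrorHandlerCheckDemo {

    interface GenericHandler {}
    interface RecordHandler extends GenericHandler {} // stands in for a record-level error handler
    interface BatchHandler extends GenericHandler {}  // stands in for a batch-level error handler

    static final class SeekToCurrentLike implements RecordHandler {}
    static final class BatchLoggingLike implements BatchHandler {}

    /** Throws IllegalStateException when the handler kind does not match the listener kind. */
    static void validate(boolean batchListener, GenericHandler handler) {
        if (handler == null) {
            return; // no handler configured, nothing to validate
        }
        Class<?> expected = batchListener ? BatchHandler.class : RecordHandler.class;
        if (!expected.isInstance(handler)) {
            throw new IllegalStateException(
                    "Error handler is not compatible with the message listener, expecting an instance of "
                            + expected.getSimpleName() + " not " + handler.getClass().getName());
        }
    }

    public static void main(String[] args) {
        // A batch listener paired with a batch handler passes the check.
        validate(true, new BatchLoggingLike());

        // A batch listener paired with a record-level handler is rejected.
        try {
            validate(true, new SeekToCurrentLike());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the replying template corresponds to batchListener = true, which is why a record-level handler inherited from the container factory is rejected, while a batch handler set directly on the reply container is accepted.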