Failed to start bean 'replyingTemplate'; nested exception is java.lang.IllegalStateException: Error handler is not compatible with the message listener

Question


I am trying to use ReplyingKafkaTemplate, following this example: https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#replying-template

    public List<ProductViewModel> GetProducts() {
        ProducerRecord<String, String> record = new ProducerRecord<>(ProductTopicConstants.GET_PRODUCTS, null);
        RequestReplyFuture<String, String, String> replyFuture = this._replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = null;
        sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        System.out.println("Sent ok: " + sendResult.getRecordMetadata());
        ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
        System.out.println("Return value: " + consumerRecord.value());
        return List.of(new ProductViewModel("", "", 0, ""));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

#Error

    org.springframework.context.ApplicationContextException: Failed to start bean 'replyingTemplate'; nested exception is java.lang.IllegalStateException: Error handler is not compatible with the message listener, expecting an instance of BatchErrorHandler not org.springframework.kafka.listener.SeekToCurrentErrorHandler
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
        at fete.bird.fetebirdproduct.FeteBirdProductApplication.main(FeteBirdProductApplication.java:13) ~[main/:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
        at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    Caused by: java.lang.IllegalStateException: Error handler is not compatible with the message listener, expecting an instance of BatchErrorHandler not org.springframework.kafka.listener.SeekToCurrentErrorHandler
        at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.validateErrorHandler(KafkaMessageListenerContainer.java:947) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:629) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:295) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.start(ReplyingKafkaTemplate.java:280) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
        ... 19 common frames omitted

Answer 1

Score: 2


It appears that you have defined a bean of type SeekToCurrentErrorHandler and Boot has auto-configured it into the reply container.

Since the replying template is a BatchMessageListener, it is not compatible with that error handler.

You need to configure the container yourself with a suitable error handler, such as the BatchLoggingErrorHandler.

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        repliesContainer.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return repliesContainer;
    }
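
To make the cause of the startup failure more concrete, here is a simplified plain-Java model of the kind of check the stack trace points at (validateErrorHandler). It is only a sketch: every type name in it (ErrorHandlerCheckSketch, RecordListener, BatchListener, ReplyListener, SeekToCurrentHandler, BatchLoggingHandler and so on) is a hypothetical stand-in, not a Spring Kafka class, and the real container logic may well differ in detail. The idea it illustrates is just that a batch-style listener is paired with a batch-style error handler, and a mismatch is rejected when the container starts.

```java
// Simplified model of the startup check. All type names are illustrative
// stand-ins and are NOT Spring Kafka classes.
public class ErrorHandlerCheckSketch {

    interface RecordListener {}      // would receive one record at a time
    interface BatchListener {}       // would receive a whole batch of records
    interface RecordErrorHandler {}  // stand-in for a record-level error handler
    interface BatchErrorHandler {}   // stand-in for a batch-level error handler

    // The replying template plays the role of a batch listener in this model.
    static class ReplyListener implements BatchListener {}
    static class SeekToCurrentHandler implements RecordErrorHandler {}
    static class BatchLoggingHandler implements BatchErrorHandler {}

    // Rejects an error handler whose kind does not match the listener's kind.
    static void validate(Object listener, Object errorHandler) {
        Class<?> expected = (listener instanceof BatchListener)
                ? BatchErrorHandler.class
                : RecordErrorHandler.class;
        if (!expected.isInstance(errorHandler)) {
            throw new IllegalStateException(
                    "Error handler is not compatible with the message listener, "
                    + "expecting an instance of " + expected.getSimpleName()
                    + " not " + errorHandler.getClass().getName());
        }
    }

    public static void main(String[] args) {
        // A batch listener with a batch error handler is accepted.
        validate(new ReplyListener(), new BatchLoggingHandler());

        // A batch listener with a record-level handler is rejected.
        try {
            validate(new ReplyListener(), new SeekToCurrentHandler());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, setting a batch-level handler on the reply container, as the answer does with setBatchErrorHandler, corresponds to the first call to validate, which passes.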

Posted by huangapple on July 24, 2020 at 20:39:39
Source link: https://go.coder-hub.com/63073761.html