英文:
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
问题
使用Kafka执行Spring Rest API的DELETE操作,但在返回NULL时出现KafkaReplyTimeoutException错误。
如果在Product POJO上返回一些值,操作可以正常进行,但如果返回NULL,则会出现错误。
# 监听器
/**
 * Handles a delete-product request received on the delete-product topic.
 *
 * <p>Looks the product up by id, deletes it when found and returns it as the reply payload.
 *
 * @param id identifier of the product to delete
 * @return the deleted product, or {@code null} when no product matches {@code id}
 */
@KafkaListener(id = ProductTopicConstants.DELETE_PRODUCT, topics = ProductTopicConstants.DELETE_PRODUCT,
        containerFactory = "getDeleteProductContainerFactory")
@SendTo
public Product DeleteProduct(String id) {
    // The "{}" placeholder is required; without it the id argument never reaches the log line.
    logger.info("Listening to delete product {}", id);
    Product product = productRepository.findByid(id);
    if (product == null) {
        // NOTE: with a null return there is no reply to send, so the requesting side times out.
        return null;
    }
    productRepository.delete(product);
    return product;
}
# 生产者
/**
 * Request/reply producer for product operations, backed by a {@link ReplyingKafkaTemplate}.
 *
 * <p>{@code LOG} and {@code mapper} are used below but are not declared in this snippet.
 */
public record ProductProducer(ReplyingKafkaTemplate<String, Object, Object> _replyTemplate) implements IProductProducer {

    /**
     * Sends a delete request for {@code id} and waits for the reply.
     *
     * <p>Waits up to 10 seconds for the send to complete and up to 10 seconds for the reply.
     *
     * @param id identifier of the product to delete
     * @return a view model of the deleted product, or {@code null} when the reply value is null
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the send or the reply completes exceptionally
     * @throws TimeoutException if either wait exceeds 10 seconds
     */
    @Override
    public ProductViewModel Delete(String id) throws InterruptedException, ExecutionException, TimeoutException {
        ProducerRecord<String, Object> request =
                new ProducerRecord<>(ProductTopicConstants.DELETE_PRODUCT, 0, null, id);
        RequestReplyFuture<String, Object, Object> pending = this._replyTemplate.sendAndReceive(request);

        // Log the broker metadata of the request once the send has completed.
        String sendMetadata = pending.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString();
        LOG.info(sendMetadata);

        Object payload = pending.get(10, TimeUnit.SECONDS).value();
        if (payload == null) {
            return null;
        }

        Product deleted = (Product) payload;
        Product mapped = mapper.convertValue(deleted, new TypeReference<Product>() {});
        return new ProductViewModel(
                mapped.getId(),
                mapped.getName(),
                mapped.getPrice(),
                mapped.getDescription(),
                mapped.getVersion());
    }
}
# 工厂容器
/**
 * Creates a {@link KafkaTemplate} on top of the supplied producer factory.
 *
 * @param pf producer factory used by the template
 * @return the new template
 */
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
    KafkaTemplate<String, Object> template = new KafkaTemplate<>(pf);
    return template;
}
/**
 * Creates the consumer factory for the delete-product listener: String keys,
 * values read through a {@link JsonDeserializer} targeting {@code String}.
 *
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactoryGetDeleteProduct() {
    var configs = consumerConfigs();
    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>(String.class);
    return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
}
/**
 * Creates the listener container factory for the delete-product listener and
 * attaches a reply template to it.
 *
 * @param pf producer factory used to build the reply template
 * @return the configured container factory
 */
@Bean
public KafkaListenerContainerFactory<?> getDeleteProductContainerFactory(ProducerFactory<String, Object> pf) {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();

    ConsumerFactory<String, String> consumers = consumerFactoryGetDeleteProduct();
    containerFactory.setConsumerFactory(consumers);

    KafkaTemplate<String, Object> replies = kafkaTemplate(pf);
    containerFactory.setReplyTemplate(replies);

    return containerFactory;
}
# 错误信息
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
英文:
I am using Kafka request/reply to perform a Spring REST API DELETE operation, but I get a KafkaReplyTimeoutException when the listener returns null.
If the listener returns a Product POJO, the operation works perfectly, but if it returns null I get the error.
Listener
/**
 * Handles a delete-product request received on the delete-product topic.
 *
 * <p>Looks the product up by id, deletes it when found and returns it as the reply payload.
 *
 * @param id identifier of the product to delete
 * @return the deleted product, or {@code null} when no product matches {@code id}
 */
@KafkaListener(id = ProductTopicConstants.DELETE_PRODUCT, topics = ProductTopicConstants.DELETE_PRODUCT,
        containerFactory = "getDeleteProductContainerFactory")
@SendTo
public Product DeleteProduct(String id) {
    // The "{}" placeholder is required; without it the id argument never reaches the log line.
    logger.info("Listening to delete product {}", id);
    Product product = productRepository.findByid(id);
    if (product == null) {
        // NOTE: with a null return there is no reply to send, so the requesting side times out.
        return null;
    }
    productRepository.delete(product);
    return product;
}
Producer
/**
 * Request/reply producer for product operations, backed by a {@link ReplyingKafkaTemplate}.
 *
 * <p>{@code LOG} and {@code mapper} are used below but are not declared in this snippet.
 */
public record ProductProducer(ReplyingKafkaTemplate<String, Object, Object> _replyTemplate) implements IProductProducer {

    /**
     * Sends a delete request for {@code id} and waits for the reply.
     *
     * <p>Waits up to 10 seconds for the send to complete and up to 10 seconds for the reply.
     *
     * @param id identifier of the product to delete
     * @return a view model of the deleted product, or {@code null} when the reply value is null
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the send or the reply completes exceptionally
     * @throws TimeoutException if either wait exceeds 10 seconds
     */
    @Override
    public ProductViewModel Delete(String id) throws InterruptedException, ExecutionException, TimeoutException {
        ProducerRecord<String, Object> request =
                new ProducerRecord<>(ProductTopicConstants.DELETE_PRODUCT, 0, null, id);
        RequestReplyFuture<String, Object, Object> pending = this._replyTemplate.sendAndReceive(request);

        // Log the broker metadata of the request once the send has completed.
        String sendMetadata = pending.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString();
        LOG.info(sendMetadata);

        Object payload = pending.get(10, TimeUnit.SECONDS).value();
        if (payload == null) {
            return null;
        }

        Product deleted = (Product) payload;
        Product mapped = mapper.convertValue(deleted, new TypeReference<Product>() {
        });
        return new ProductViewModel(
                mapped.getId(),
                mapped.getName(),
                mapped.getPrice(),
                mapped.getDescription(),
                mapped.getVersion());
    }
}
Factory container
/**
 * Creates a {@link KafkaTemplate} on top of the supplied producer factory.
 *
 * @param pf producer factory used by the template
 * @return the new template
 */
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
    KafkaTemplate<String, Object> template = new KafkaTemplate<>(pf);
    return template;
}
/**
 * Creates the consumer factory for the delete-product listener: String keys,
 * values read through a {@link JsonDeserializer} targeting {@code String}.
 *
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactoryGetDeleteProduct() {
    var configs = consumerConfigs();
    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>(String.class);
    return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
}
/**
 * Creates the listener container factory for the delete-product listener and
 * attaches a reply template to it.
 *
 * @param pf producer factory used to build the reply template
 * @return the configured container factory
 */
@Bean
public KafkaListenerContainerFactory<?> getDeleteProductContainerFactory(ProducerFactory<String, Object> pf) {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();

    ConsumerFactory<String, String> consumers = consumerFactoryGetDeleteProduct();
    containerFactory.setConsumerFactory(consumers);

    KafkaTemplate<String, Object> replies = kafkaTemplate(pf);
    containerFactory.setReplyTemplate(replies);

    return containerFactory;
}
Error
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
答案1
得分: 2
为什么你会期望有什么不同?如果你返回 `null`,就不会有回复可以发送,因此客户端会超时。
要返回一个 `null` 值,请返回 `KafkaNull.INSTANCE`,客户端的 `ConsumerRecord` 将包含 `null`。
/**
 * Prints the incoming value and produces the reply payload.
 *
 * @param in the received record value
 * @return the upper-cased value when {@code in} equals "foo"; otherwise
 *         {@code KafkaNull.INSTANCE}, which stands in for a null reply value
 */
@KafkaListener(id = "so63583664", topics = "topic1")
@SendTo
public Object listen(String in) {
    System.out.println(in);
    if (in.equals("foo")) {
        return in.toUpperCase();
    }
    return KafkaNull.INSTANCE;
}
/**
 * Sends two request/reply messages to "topic1" at startup and prints each reply value,
 * waiting up to 10 seconds for each reply.
 *
 * @param replyer template used to send the requests and receive the replies
 * @return the runner that performs the two exchanges
 */
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> replyer) {
    return args -> {
        // First exchange: value "foo".
        RequestReplyFuture<String, String, String> fooReply =
                replyer.sendAndReceive(new ProducerRecord<>("topic1", "foo", "foo"));
        System.out.println(fooReply.get(10, TimeUnit.SECONDS).value());

        // Second exchange: same key, value "bar".
        RequestReplyFuture<String, String, String> barReply =
                replyer.sendAndReceive(new ProducerRecord<>("topic1", "foo", "bar"));
        System.out.println(barReply.get(10, TimeUnit.SECONDS).value());
    };
}
foo
FOO
bar
null
英文:
Why would you expect anything different? If you return `null`, there is no reply to send, so we get a timeout on the client side.
To return a `null` value, return `KafkaNull.INSTANCE`, and the `ConsumerRecord` on the client side will contain `null`.
/**
 * Prints the incoming value and produces the reply payload.
 *
 * @param in the received record value
 * @return the upper-cased value when {@code in} equals "foo"; otherwise
 *         {@code KafkaNull.INSTANCE}, which stands in for a null reply value
 */
@KafkaListener(id = "so63583664", topics = "topic1")
@SendTo
public Object listen(String in) {
    System.out.println(in);
    if (in.equals("foo")) {
        return in.toUpperCase();
    }
    return KafkaNull.INSTANCE;
}
/**
 * Sends two request/reply messages to "topic1" at startup and prints each reply value,
 * waiting up to 10 seconds for each reply.
 *
 * @param replyer template used to send the requests and receive the replies
 * @return the runner that performs the two exchanges
 */
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> replyer) {
    return args -> {
        // First exchange: value "foo".
        RequestReplyFuture<String, String, String> fooReply =
                replyer.sendAndReceive(new ProducerRecord<>("topic1", "foo", "foo"));
        System.out.println(fooReply.get(10, TimeUnit.SECONDS).value());

        // Second exchange: same key, value "bar".
        RequestReplyFuture<String, String, String> barReply =
                replyer.sendAndReceive(new ProducerRecord<>("topic1", "foo", "bar"));
        System.out.println(barReply.get(10, TimeUnit.SECONDS).value());
    };
}
foo
FOO
bar
null
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论