org.springframework.kafka.requestreply.KafkaReplyTimeoutException: 回复超时

huangapple go评论82阅读模式
英文:

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out

问题

使用Kafka执行Spring Rest API的DELETE操作,但在返回NULL时出现KafkaReplyTimeoutException错误。

如果在Product POJO上返回一些值,操作可以正常进行,但如果返回NULL,则会出现错误。

# 监听器

@KafkaListener(id = ProductTopicConstants.DELETE_PRODUCT, topics = ProductTopicConstants.DELETE_PRODUCT,
            containerFactory = "getDeleteProductContainerFactory")
@SendTo
public Product DeleteProduct(String id) {
    logger.info("Listening to delete product", id);
    Product product = productRepository.findByid(id);
    if (product == null)
        return null;
    productRepository.delete(product);
    return product;
}

# 生产者

public record ProductProducer(ReplyingKafkaTemplate<String, Object, Object> _replyTemplate) implements IProductProducer {

    @Override
    public ProductViewModel Delete(String id) throws InterruptedException, ExecutionException, TimeoutException {
        RequestReplyFuture<String, Object, Object> future =
                this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.DELETE_PRODUCT, 0, null, id));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        Product deletedProduct = (Product) future.get(10, TimeUnit.SECONDS).value();
        if (deletedProduct == null)
            return null;
        Product mappedProducts = mapper.convertValue(deletedProduct, new TypeReference<Product>() {});
        return new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription(), mappedProducts.getVersion());
    }
}

# 工厂容器

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
    return new KafkaTemplate<>(pf);
}
@Bean
public ConsumerFactory<String, String> consumerFactoryGetDeleteProduct() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(String.class));
}

@Bean
public KafkaListenerContainerFactory<?> getDeleteProductContainerFactory(ProducerFactory<String, Object> pf) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryGetDeleteProduct());
    factory.setReplyTemplate(kafkaTemplate(pf));
    return factory;
}

# 错误信息

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
	at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
英文:

Using Kafka to perform the Spring Rest API DELETE operation, but getting the KafkaReplyTimeoutException on return NULL.

If I return some value on the Product POJO, the operation works perfectly, but if I return NULL getting the error.

Listener

    @KafkaListener(id = ProductTopicConstants.DELETE_PRODUCT, topics = ProductTopicConstants.DELETE_PRODUCT,
containerFactory = &quot;getDeleteProductContainerFactory&quot;)
@SendTo
public Product DeleteProduct(String id) {
logger.info(&quot;Listening to delete product&quot;, id);
Product product = productRepository.findByid(id);
if (product == null)
return null;
productRepository.delete(product);
return product;
}

Producer

public record ProductProducer(ReplyingKafkaTemplate&lt;String, Object, Object&gt; _replyTemplate) implements IProductProducer {
@Override
public ProductViewModel Delete(String id) throws InterruptedException, ExecutionException, TimeoutException {
RequestReplyFuture&lt;String, Object, Object&gt; future =
this._replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(ProductTopicConstants.DELETE_PRODUCT, 0, null, id));
LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Product deletedProduct = (Product) future.get(10, TimeUnit.SECONDS).value();
if (deletedProduct == null)
return null;
Product mappedProducts = mapper.convertValue(deletedProduct, new TypeReference&lt;Product&gt;() {
});
return new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription(), mappedProducts.getVersion());
}}

Factory container

@Bean
public KafkaTemplate&lt;String, Object&gt; kafkaTemplate(ProducerFactory&lt;String, Object&gt; pf) {
return new KafkaTemplate&lt;&gt;(pf);
}
@Bean
public ConsumerFactory&lt;String, String&gt; consumerFactoryGetDeleteProduct() {
return new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer&lt;&gt;(String.class));
}
@Bean
public KafkaListenerContainerFactory&lt;?&gt; getDeleteProductContainerFactory(ProducerFactory&lt;String, Object&gt; pf) {
ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory =
new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
factory.setConsumerFactory(consumerFactoryGetDeleteProduct());
factory.setReplyTemplate(kafkaTemplate(pf));
return factory;
}

Error

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

答案1

得分: 2

为什么你会期望有什么不同?如果你返回 null,就不会有回复可以发送,因此客户端会超时。

要返回一个 null 值,请返回 KafkaNull.INSTANCE,客户端的 ConsumerRecord 将包含 null

@KafkaListener(id = "so63583664", topics = "topic1")
@SendTo
public Object listen(String in) {
    System.out.println(in);
    return in.equals("foo") ? in.toUpperCase() : KafkaNull.INSTANCE;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> replyer) {
    return args -> {
        ProducerRecord<String, String> pr = new ProducerRecord<>("topic1", "foo", "foo");
        RequestReplyFuture<String, String, String> future = replyer.sendAndReceive(pr);
        System.out.println(future.get(10, TimeUnit.SECONDS).value());
        pr = new ProducerRecord<>("topic1", "foo", "bar");
        future = replyer.sendAndReceive(pr);
        System.out.println(future.get(10, TimeUnit.SECONDS).value());
    };
}
foo
FOO
bar
null
英文:

Why would you expect anything different? If you return null there is no reply to send so we get a timeout on the client side.

To return a null value, return KafkaNull.INSTANCE and the ConsumerRecord on the client side will contain null.

@KafkaListener(id = &quot;so63583664&quot;, topics = &quot;topic1&quot;)
@SendTo
public Object listen(String in) {
	System.out.println(in);
	return in.equals(&quot;foo&quot;) ? in.toUpperCase() : KafkaNull.INSTANCE;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate&lt;String, String, String&gt; replyer) {
	return args -&gt; {
		ProducerRecord&lt;String, String&gt; pr = new ProducerRecord&lt;&gt;(&quot;topic1&quot;, &quot;foo&quot;, &quot;foo&quot;);
		RequestReplyFuture&lt;String, String, String&gt; future = replyer.sendAndReceive(pr);
		System.out.println(future.get(10, TimeUnit.SECONDS).value());
		pr = new ProducerRecord&lt;&gt;(&quot;topic1&quot;, &quot;foo&quot;, &quot;bar&quot;);
		future = replyer.sendAndReceive(pr);
		System.out.println(future.get(10, TimeUnit.SECONDS).value());
	};
}
foo
FOO
bar
null

huangapple
  • 本文由 发表于 2020年8月26日 00:54:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/63583664.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定