“Value not return to calling method when using ConsumerRecord<>” in the listener.

huangapple go评论71阅读模式
英文:

Value not return to calling method when using ConsumerRecord<> in the listner

问题

# KafkaListener

@KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT, topics = ProductTopicConstants.UPDATE_PRODUCT,
    containerFactory = "addUpdateProductContainerFactory")
@SendTo
public Object UpdateProduct(ConsumerRecord<String, ProductViewModel> productViewModel) {
    String id = productViewModel.key();
    Product product = productRepository.findByid(id);
    if (product != null) {
        product.setName(productViewModel.name());
        product.setPrice(productViewModel.price());
        product.setDescription(productViewModel.description());
        return productRepository.save(product);
    }
    return KafkaNull.INSTANCE;
}

# Producer

public GenericResponse<ProductViewModel> Update(ProductViewModel product, String id) throws InterruptedException, ExecutionException, TimeoutException {
    RequestReplyFuture<String, Object, Object> future =
            this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.UPDATE_PRODUCT, 0, id, product));
    LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).getRecordMetadata().toString());
    Object productDb = future.get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).value();
    if (productDb == null)
        return null;
    if (productDb == HttpStatus.CONFLICT)
        return new GenericResponse<ProductViewModel>(null, HttpStatus.CONFLICT);
    Product mappedProducts = mapper.convertValue(productDb, new TypeReference<Product>() {});
    return new GenericResponse<ProductViewModel>(new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription(), mappedProducts.getVersion()), null);

}

# Container configuration

@Bean
public ConsumerFactory<String, ProductViewModel> consumerFactoryAddUpdateProduct() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(ProductViewModel.class));
}

@Bean
public KafkaListenerContainerFactory<?> addUpdateProductContainerFactory(ProducerFactory<String, Object> pf) {
    ConcurrentKafkaListenerContainerFactory<String, ProductViewModel> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryAddUpdateProduct());
    factory.setReplyTemplate(kafkaTemplate(pf));
    return factory;
}

# Error

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
	at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
英文:

KafkaListener

  @KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT, topics = ProductTopicConstants.UPDATE_PRODUCT,
containerFactory = &quot;addUpdateProductContainerFactory&quot;)
@SendTo
public Object UpdateProduct(ConsumerRecord&lt;String, ProductViewModel&gt; productViewModel) {
String id = productViewModel.key();
Product product = productRepository.findByid(id);
if (product != null) {
product.setName(productViewModel.name());
product.setPrice(productViewModel.price());
product.setDescription(productViewModel.description());
return productRepository.save(product);
}
return KafkaNull.INSTANCE;
}

Producer

   public GenericResponse&lt;ProductViewModel&gt; Update(ProductViewModel product, String id) throws InterruptedException, ExecutionException, TimeoutException {
RequestReplyFuture&lt;String, Object, Object&gt; future =
this._replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(ProductTopicConstants.UPDATE_PRODUCT,0, id,product));
LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).getRecordMetadata().toString());
Object productDb = future.get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).value();
if (productDb == null)
return null;
if (productDb == HttpStatus.CONFLICT)
return new GenericResponse&lt;ProductViewModel&gt;(null, HttpStatus.CONFLICT);
Product mappedProducts = mapper.convertValue(productDb, new TypeReference&lt;Product&gt;() {});
return new GenericResponse&lt;ProductViewModel&gt;(new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(), mappedProducts.getDescription(), mappedProducts.getVersion()), null);
}

Container configuration

   @Bean
public ConsumerFactory&lt;String, ProductViewModel&gt; consumerFactoryAddUpdateProduct() {
return new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer&lt;&gt;(ProductViewModel.class));
}
@Bean
public KafkaListenerContainerFactory&lt;?&gt; addUpdateProductContainerFactory(ProducerFactory&lt;String, Object&gt; pf) {
ConcurrentKafkaListenerContainerFactory&lt;String, ProductViewModel&gt; factory =
new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
factory.setConsumerFactory(consumerFactoryAddUpdateProduct());
factory.setReplyTemplate(kafkaTemplate(pf));
return factory;
}

Error

org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$3(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

答案1

得分: 1

这是一个已知的问题;我正在解决方案;请求/回复处理基于spring-messaging;当您消耗原始的ConsumerRecord时,回复处理不起作用,因为消息逻辑被绕过。

解决方法是将方法更改为public Object UpdateProduct(Message&lt;ProductViewModel&gt; message)

头部包含在message.getHeaders()中,键位于头部KafkaHeaders.MESSAGE_KEY中。

ProductViewModel就是message.getPayload()

英文:

This is a known problem; I am working on solution; the request/reply processing is based on spring-messaging; when you consume the raw ConsumerRecord, the reply handing doesn't work because the messaging logic is bypassed.

The work around is to change the method to public Object UpdateProduct(Message&lt;ProductViewModel&gt; message) instead.

The headers are contained in message.getHeaders(), the key is in header KafkaHeaders.MESSAGE_KEY.

The ProductViewModel is message.getPayload().

huangapple
  • 本文由 发表于 2020年8月27日 20:21:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/63615894.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定