Unable to send messages to dead letter topic from consumer in Kafka
Question
I am trying to route a message to a dead letter topic in Kafka whenever processing of that message fails. I have set up the SeekToCurrentErrorHandler and DeadLetterPublishingRecoverer for this.
The consumer throws the following exception while doing this:
2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = kafka_dlt-original-topic, value = [116, 101, 115, 116, 95, 101, 120, 101, 99, 117, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-original-partition, value = [0, 0, 0, 2]), RecordHeader(key = kafka_dlt-original-offset, value = [0, 0, 0, 0, 0, 23, 15, -72]), RecordHeader(key = kafka_dlt-original-timestamp, value = [0, 0, 1, 115, -57, 103, -70, -126]), RecordHeader(key = kafka_dlt-original-timestamp-type, value = [67, 114, 101, 97, 116, 101, 84, 105, 109, 101]), RecordHeader(key = kafka_dlt-exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 97, 109, 101, 119, 111, 114, 107, 46, 107, 97, 102, 107, 97, 46, 108, 105, 115, 116, 101, 110, 101, 114, 46, 76, 105, 115, 116, 101, 110, 101, 114, 69, 120, 101, 99, 117, 116, 105, 111, 110, 70, 97, 105, 108, 101, 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-exception-message, value = [
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:385) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:278) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:214) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.FailedRecordProcessor.getSkipPredicate(FailedRecordProcessor.java:167) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:104) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
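The header values in the log above are printed as raw byte arrays. As a minimal sketch, assuming the recoverer writes text headers as UTF-8 and numeric headers as big-endian `int`/`long` values, they can be decoded with the JDK alone. The class and method names here are illustrative and not part of Spring Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative helper (not a Spring Kafka class) for reading the kafka_dlt-* header bytes from the log.
class DltHeaderDecoder {

    // Text headers such as kafka_dlt-original-topic; UTF-8 is an assumption.
    static String asString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    // 4-byte headers such as kafka_dlt-original-partition; ByteBuffer reads big-endian by default.
    static int asInt(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    // 8-byte headers such as kafka_dlt-original-offset.
    static long asLong(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        // Byte values copied from the log output above.
        byte[] topic = {116, 101, 115, 116, 95, 101, 120, 101, 99, 117, 116, 105, 111, 110};
        byte[] partition = {0, 0, 0, 2};
        byte[] offset = {0, 0, 0, 0, 0, 23, 15, -72};

        System.out.println(asString(topic));   // test_execution
        System.out.println(asInt(partition));  // 2
        System.out.println(asLong(offset));    // 1511352
    }
}
```

Decoded this way, the failed record came from partition 2 of test_execution, which matches the `partition 2` reported in the first log line.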
I have already created the test_execution.DLT topic in the Kafka cluster, and I can produce messages to this topic from the console producer.
The consumer runs inside a Docker container, and the Kafka cluster is a 3-VM setup.
These are the configs used by the Kafka consumer:
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.key.delegate.class", StringDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.value.delegate.class", JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new ErrorHandlingDeserializer<>(new StringDeserializer()),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(AutomationEvent.class, false)));
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaOperations kafkaOperations) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations), new FixedBackOff(10000, 3));
    }
Am I missing something here? Do I need to modify any server configuration for this to be updated?
Answer 1
Score: 3
> test_execution.DLT not present
The framework does not automatically create the dead-letter topic for you; it has to exist already.
You can instruct the framework to create the topic by adding a `NewTopic` `@Bean`.
See [this answer](https://stackoverflow.com/questions/63270367/deadletterpublishingrecoverer-dead-letter-publication-failed-with-invalidtopic/63270883#63270883) for an example.
__EDIT__
By default, the record is sent to the same partition it came from, so the DLT must have at least as many partitions as the original topic, unless you supply a destination resolver.
See the [documentation](https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#dead-letters).
> By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
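As a minimal sketch of the two options in this answer, assuming spring-kafka 2.5.x (the version in the stack trace) and a `KafkaAdmin` bean in the context (Spring Boot auto-configures one), the topic can be declared as a `NewTopic` bean, or the recoverer can be given a destination resolver that returns a negative partition. The class name `DltConfig`, the partition count, and the replica count are illustrative assumptions, not values from the question.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DltConfig { // hypothetical class name

    // Option 1: declare the DLT so that KafkaAdmin creates it at startup if it is missing.
    @Bean
    public NewTopic deadLetterTopic() {
        return TopicBuilder.name("test_execution.DLT")
                .partitions(3) // assumption: must be >= the partition count of test_execution
                .replicas(3)   // assumption: one replica per broker in the 3-VM cluster
                .build();
    }

    // Option 2: a destination resolver that returns a negative partition, so the
    // partition is not set in the ProducerRecord and Kafka selects it.
    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaOperations<Object, Object> kafkaOperations) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaOperations,
                (record, exception) -> new TopicPartition(record.topic() + ".DLT", -1));
        // Same back-off as in the question: 10 s between attempts, 3 retries.
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(10000, 3));
    }
}
```

With the resolver in option 2, the DLT no longer has to match the original topic's partition count, because the record is not pinned to the original partition.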
Answer 2
Score: 1
Make sure you also specify the producer configuration if you want to send messages to topics from the consumer application. The property for this is `spring.kafka.producer.bootstrap-servers`.
Without this property, the producer component tries to connect to localhost by default, so the topic is never found.
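A minimal sketch of giving the dead-letter producer an explicit broker list in Java config, in the same style as the question's consumer beans. The class name is hypothetical, and the serializer choices are copied from the question's config as an assumption about the record types; in a Spring Boot application, setting the property named above is usually enough and these beans are not required.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class DltProducerConfig { // hypothetical class name

    // Reuses the property named in this answer; any property holding the broker list would do.
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Point the producer at the real brokers rather than Spring Boot's localhost:9092 default.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Assumption: serializers taken from the question's config; adjust to the actual key/value types.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // The template that DeadLetterPublishingRecoverer uses to publish failed records.
    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```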
Answer 3
Score: 0
The consumer does not have to send anything to the DLT itself; the framework handles that. The only requirement is that the topic exists beforehand.
Follow this discussion: https://stackoverflow.com/questions/63270367/deadletterpublishingrecoverer-dead-letter-publication-failed-with-invalidtopic/63270883#63270883
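One way to check that the topic exists from the application's point of view is to ask a plain Kafka producer for the topic's metadata from inside the same container. This is a minimal sketch, assuming the broker list is passed as the first program argument; the class name `DltTopicCheck` is hypothetical.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DltTopicCheck { // hypothetical class name

    public static void main(String[] args) {
        Properties props = new Properties();
        // args[0] is assumed to hold the same broker list the application uses.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Give up after 10 s instead of the default 60 s when metadata is unavailable.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Blocks until metadata arrives; throws a TimeoutException if it does not.
            List<PartitionInfo> partitions = producer.partitionsFor("test_execution.DLT");
            System.out.println("test_execution.DLT partitions: " + partitions.size());
        }
    }
}
```

If this times out, the producer cannot see the topic through the given brokers. If it reports fewer partitions than test_execution has, sends pinned to the higher partitions may fail for the reason given in Answer 1.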