英文:
How to publish Spring Kafka DLQ in 2.5.4 version
问题
需要您在此方面提供帮助和指导。
我在当前项目中使用了2.2.X版本的spring-kafka。
我创建的错误处理如下所示:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
(record, ex) -> new TopicPartition("topic-undelivered", -1));
}
然后,我升级了所有项目依赖的版本,如spring-boot和spring-kafka到最新版本:2.5.4 RELEASE
我发现一些方法已经被弃用和更改。
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// 在3次失败后恢复,没有退避 - 例如发送到死信主题
}, new FixedBackOff(0L, 2L));
我的问题是,如何使用这些配置生成DLQ:
已编辑
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(consumerConcurrencyCount);
factory.setErrorHandler(errorHandler());
return factory;
}
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler(
deadLetterPublishingRecoverer(),
new FixedBackOff(0L, 2L)
);
}
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(
getEventKafkaTemplate(),
(record, ex) -> {
if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
return new TopicPartition("topic-undelivered", -1);
}
return new TopicPartition("topic-fail", -1);
});
}
public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
这些配置可行,感谢Gary的帮助!
提前感谢您的帮助。
英文:
need your help and guidance on this.
I was using 2.2.X version spring-kafka in my current project.
The error handling that I created looks like this:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
(record, ex) -> new TopicPartition("topic-undelivered", -1));
}
And then I upgraded all my project dependency version, such as spring-boot and the spring-kafka into the latest one : 2.5.4 RELEASE
I found that some of the methods were deprecated and changed.
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures, woth no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
My question is,
how to produce the DLQ with these configurations:
EDITED
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(consumerConcurrencyCount);
factory.setErrorHandler(errorHandler());
return factory;
}
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler(
deadLetterPublishingRecoverer(),
new FixedBackOff(0L, 2L)
);
}
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(
getEventKafkaTemplate(),
(record, ex) -> {
if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
return new TopicPartition("topic-undelivered", -1);
}
return new TopicPartition("topic-fail", -1);
});
}
public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
This configurations work, thanks to Gary!
Thanks in advance
答案1
得分: 0
以下是翻译好的部分:
"在文档中,你所指的问题不太清楚,文档仍然在使用已经弃用的旧方法,这个问题出现在2.5.X版本中。"
"KafkaOperations
是一个接口,KafkaTemplate
实现了它;你需要做的唯一更改是将 maxAttempts
更改为 BackOff
..."
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L)));
return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
(record, ex) -> new TopicPartition("topic-undelivered", -1));
}
英文:
It's not clear what you mean by
>The problem is, in the documentation, it's still using the old method, which is deprecated for 2.5.X version
The KafkaOperations
is an interface that the KafkaTemplate
implements; the only change you need to make is to change the maxAttempts
to a BackOff
...
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L));
return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
(record, ex) -> new TopicPartition("topic-undelivered", -1));
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论