英文:
How to publish Spring Kafka DLQ in 2.5.4 version
问题
需要您在此方面提供帮助和指导。
我在当前项目中使用了2.2.X版本的spring-kafka。
我创建的错误处理如下所示:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
然后,我升级了所有项目依赖的版本,如spring-boot和spring-kafka到最新版本:2.5.4 RELEASE
我发现一些方法已经被弃用和更改。
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
    // 在3次失败后恢复,没有退避 - 例如发送到死信主题
}, new FixedBackOff(0L, 2L));
我的问题是,如何使用这些配置生成DLQ:
已编辑
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}
public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }
                return new TopicPartition("topic-fail", -1);
            });
}
public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
这些配置可行,感谢Gary的帮助!
提前感谢您的帮助。
英文:
need your help and guidance on this.
I was using 2.2.X version spring-kafka in my current project.
The error handling that I created looks like this:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
And then I upgraded all my project dependency version, such as spring-boot and the spring-kafka into the latest one : 2.5.4 RELEASE
I found that some of the methods were deprecated and changed.
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
    // recover after 3 failures, woth no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
My question is,
how to produce the DLQ with these configurations:
EDITED
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}
public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }
                return new TopicPartition("topic-fail", -1);
            });
}
public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
This configurations work, thanks to Gary!
Thanks in advance
答案1
得分: 0
以下是翻译好的部分:
"在文档中,你所指的问题不太清楚,文档仍然在使用已经弃用的旧方法,这个问题出现在2.5.X版本中。"
"KafkaOperations 是一个接口,KafkaTemplate 实现了它;你需要做的唯一更改是将 maxAttempts 更改为 BackOff..."
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L)));
    return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
英文:
It's not clear what you mean by
>The problem is, in the documentation, it's still using the old method, which is deprecated for 2.5.X version
The KafkaOperations is an interface that the KafkaTemplate implements; the only change you need to make is to change the maxAttempts to a BackOff...
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L));
    return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论