How to publish to a Spring Kafka DLQ in version 2.5.4

Question

I need your help and guidance on this.

I was using spring-kafka 2.2.x in my current project.

The error handling that I created looks like this:

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}

Then I upgraded all my project dependencies, such as spring-boot and spring-kafka, to the latest version: 2.5.4.RELEASE

I found that some of the methods had been deprecated or changed.

SeekToCurrentErrorHandler

SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
    // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));

My question is: how do I publish to the DLQ with this configuration?

EDITED

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}

public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}

public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }

                return new TopicPartition("topic-fail", -1);
            });
}

public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}

This configuration works, thanks to Gary!

Thanks in advance.
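
The routing decision made by the destination resolver in the edited configuration can be looked at in isolation. The following is only a plain-Java sketch of that decision, not Spring Kafka code: the resolver is modeled as a function from (record, exception) to a topic name, whereas the real one returns a TopicPartition. BusinessException and TechnicalException are hypothetical stand-ins for the asker's own classes, which are not shown in the post, and the class name DlqRoutingSketch is made up for illustration. It assumes, as the configuration does, that the exception handed to the resolver is a wrapper whose cause is the exception thrown by the listener.

```java
import java.util.function.BiFunction;

// Hypothetical stand-ins for the asker's exception types (not shown in the post).
class BusinessException extends RuntimeException {
    BusinessException(String message) { super(message); }
}

class TechnicalException extends RuntimeException {
    TechnicalException(String message) { super(message); }
}

class DlqRoutingSketch {

    // Model of the destination resolver: (record, exception) -> topic name.
    // Only the topic choice is modeled; the partition argument is left out.
    static final BiFunction<String, Exception, String> RESOLVER = (record, ex) -> {
        Throwable cause = ex.getCause(); // may be null if nothing was wrapped
        if (cause instanceof BusinessException || cause instanceof TechnicalException) {
            return "topic-undelivered";
        }
        return "topic-fail";
    };

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException("listener failed",
                new BusinessException("invalid payload"));
        Exception noCause = new IllegalStateException("no cause set");

        System.out.println(RESOLVER.apply("record-1", wrapped)); // prints topic-undelivered
        System.out.println(RESOLVER.apply("record-2", noCause)); // prints topic-fail
    }
}
```

Because instanceof is false for null, an exception without a cause falls through to topic-fail instead of throwing, so every failed record ends up in one of the two topics.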

Answer 1

Score: 0

It's not clear what you mean by

> The problem is, in the documentation, it's still using the old method, which is deprecated for 2.5.X version

KafkaOperations is an interface that KafkaTemplate implements, so you can keep passing your KafkaTemplate to the DeadLetterPublishingRecoverer. The only change you need to make is to replace the maxAttempts argument with a BackOff:

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // FixedBackOff(0L, 2L): no delay between attempts, 2 retries after the first delivery
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0L, 2L)));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    // A negative partition leaves the choice of partition to the producer
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
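
The step from the old integer argument (3) to FixedBackOff(0L, 2L) is an off-by-one that is easy to get wrong, so here is a small plain-Java model of it. This is only a sketch, not Spring Kafka code: it assumes the old argument counts total delivery attempts while the second FixedBackOff argument counts retries after the first attempt, which matches the "recover after 3 failures" comment quoted in the question. The names BackOffAttemptsSketch, retriesFor, and deliverUntilRecovered are made up for illustration.

```java
class BackOffAttemptsSketch {

    // Retries to configure so that the total number of deliveries
    // equals the old maxAttempts value.
    static long retriesFor(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return maxAttempts - 1L;
    }

    // Simulates delivery of one record: an initial delivery plus up to
    // `retries` redeliveries, then a call to the recoverer if all of them fail.
    // Returns the number of deliveries that were made.
    static int deliverUntilRecovered(long retries, Runnable listener, Runnable recoverer) {
        int deliveries = 0;
        for (long attempt = 0; attempt <= retries; attempt++) {
            deliveries++;
            try {
                listener.run();
                return deliveries; // success: nothing to recover
            } catch (RuntimeException e) {
                // failure: fall through to the next delivery, if any remain
            }
        }
        recoverer.run(); // retries exhausted: hand the record to the recoverer
        return deliveries;
    }

    public static void main(String[] args) {
        long retries = retriesFor(3);
        int deliveries = deliverUntilRecovered(
                retries,
                () -> { throw new IllegalStateException("always fails"); },
                () -> System.out.println("published to topic-undelivered"));
        // prints "published to topic-undelivered", then "retries=2, deliveries=3"
        System.out.println("retries=" + retries + ", deliveries=" + deliveries);
    }
}
```

Under that assumption, keeping the 2.2.x behaviour of three deliveries means passing 2, not 3, as the second FixedBackOff argument.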

huangapple
  • Posted on August 6, 2020, 13:40:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63277494.html