KafkaMessageListenerContainer 如何对特定错误进行消息拒绝 (nack)

huangapple go评论52阅读模式
英文:

KafkaMessageListenerContainer how to do nack for specific error

问题

我正在使用KafkaMessageListenerContainer与(KafkaAdapter)。

在特定错误情况下,我该如何“nack”偏移,以便下一个poll()会再次获取它们?

properties.setAckMode(ContainerProperties.AckMode.BATCH);

final KafkaMessageListenerContainer<String, String> kafkaContainer = new KafkaMessageListenerContainer<>(consumerFactory, properties);

kafkaContainer.setCommonErrorHandler(new CommonErrorHandler() {
    
  @Override
  public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
         CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
   }

});

handleBatch中,我检测到了异常,对于这个特定的异常,我想要执行"nack"操作。尝试从那里抛出RuntimeException。

使用Spring Boot 2.7版本。

英文:

I am using KafkaMessageListenerContainer with (KafkaAdapter).

How can I "nack" offsets in case of specific error, so the next poll() will take them again?

properties.setAckMode(ContainerProperties.AckMode.BATCH);

final KafkaMessageListenerContainer&lt;String, String&gt; kafkaContainer = new KafkaMessageListenerContainer&lt;&gt;(consumerFactory , properties);



kafkaContainer.setCommonErrorHandler(new CommonErrorHandler() {
     

  @Override
  public void handleBatch(Exception thrownException, ConsumerRecords&lt;?, ?&gt; data, Consumer&lt;?, ?&gt; consumer, MessageListenerContainer container, Runnable invokeListener) {
         CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
   }

});

Inside handleBatch I am detecting the exception, for that specific exception I would like to do nack.
Tried to throw from there RuntimeException.

using springboot 2.7

答案1

得分: 0

使用DefaultErrorHandler - 它正是这样做的(整个批次根据退避策略进行重试)。您可以分类哪些异常是可重试的,哪些不可重试。

如果抛出BatchListenerFailedException,您可以指定批次中哪个记录失败,并仅重试该记录(以及后续记录)。

编辑

如果抛出任何其他类型的异常,DefaultErrorHandler会退回到使用FallbackBatchErrorHandler,该处理程序调用ErrorHandlingUtils.retryBatch(),暂停消费者并重新传递整个批次而不寻找和重新轮询(循环内的轮询不返回记录,因为消费者已暂停)。

请参阅文档。https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

这是必需的,因为在执行seek操作后,无法保证批次将按相同顺序获取。

这是因为我们需要了解批次的状态(重试了多少次)。如果批次不断变化,我们无法做到这一点,因此我上面描述的算法。

要无限重试,您可以使用maxAttempts属性中的Long.MAX_VALUEFixedBackOff,或者使用没有终止的ExponentialBackOff

只需确保最大的退避时间(以及批处理处理时间)明显小于max.poll.interval.ms,以避免重新平衡。

英文:

Use the DefaultErrorHandler - it does exactly that (the whole batch is retried according to the back off). You can classify which exceptions are retryable or not.

If you throw a BatchListenerFailedException you can specify exactly which record in the batch had the failure and only retry it (and the following records).

EDIT

If any other type of exception is thrown, the DefaultErrorHandler falls back to using a FallbackBatchErrorHandler which calls ErrorHandlingUtils.retryBatch() which, pauses the consumer and redelivers the whole batch without seeking and re-polling (the polls within the loop return no records because the consumer is paused).

See the documentation. https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

This is required, because there is no guarantee that the batch will be fetched in the same order after a seek.

This is because we need to know the state of the batch (how many times we have retried). We can't do that if the batch keeps changing; hence the algorithm I described above.

To retry indefinitely you can, for example, use a FixedBackOff with Long.MAX_VALUE in the maxAttempts property. Or use an ExponentialBackOff with no termination.

Just be sure that the largest back off (and time to process a batch) is significantly less than max.poll.interval.ms to avoid a rebalance.

huangapple
  • 本文由 发表于 2023年2月6日 02:49:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/75354701.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定