KafkaMessageListenerContainer how to do nack for specific error
Question
I am using KafkaMessageListenerContainer with (KafkaAdapter). How can I "nack" offsets in case of a specific error, so the next poll() will take them again?
properties.setAckMode(ContainerProperties.AckMode.BATCH);
final KafkaMessageListenerContainer<String, String> kafkaContainer =
        new KafkaMessageListenerContainer<>(consumerFactory, properties);
kafkaContainer.setCommonErrorHandler(new CommonErrorHandler() {
    @Override
    public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
            Consumer<?, ?> consumer, MessageListenerContainer container,
            Runnable invokeListener) {
        // the specific exception is detected here; the goal is to "nack" the batch
        CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
    }
});
Inside handleBatch I detect the exception; for that specific exception I would like to nack. I tried throwing a RuntimeException from there.
Using Spring Boot 2.7.
Answer 1
Score: 0
Use the DefaultErrorHandler - it does exactly that (the whole batch is retried according to the back off). You can classify which exceptions are retryable or not.
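For example, a minimal sketch of wiring this onto the kafkaContainer from the question (the back off values and the not-retryable exception type are assumptions, not part of the original answer):

// Assumed example: retry the batch 3 times, 1 second apart
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
// Assumed example: treat this exception type as fatal instead of retrying
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
kafkaContainer.setCommonErrorHandler(errorHandler);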
If you throw a BatchListenerFailedException, you can specify exactly which record in the batch had the failure and only retry it (and the following records).
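A sketch of what that could look like in a batch listener (the process() call and the caught exception are hypothetical stand-ins for the actual per-record work):

kafkaContainer.setupMessageListener((BatchMessageListener<String, String>) records -> {
    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record); // hypothetical per-record processing
        } catch (Exception e) {
            // tells DefaultErrorHandler which record failed; earlier records are
            // committed and only this record and the following ones are retried
            throw new BatchListenerFailedException("record processing failed", e, record);
        }
    }
});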
EDIT
If any other type of exception is thrown, the DefaultErrorHandler falls back to using a FallbackBatchErrorHandler, which calls ErrorHandlingUtils.retryBatch(); this pauses the consumer and redelivers the whole batch without seeking and re-polling (the polls within the loop return no records because the consumer is paused).
See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
This is required because there is no guarantee that the batch will be fetched in the same order after a seek.
This is because we need to know the state of the batch (how many times we have retried). We can't do that if the batch keeps changing; hence the algorithm I described above.
To retry indefinitely you can, for example, use a FixedBackOff with Long.MAX_VALUE in the maxAttempts property, or use an ExponentialBackOff with no termination.
Just be sure that the largest back off (and the time to process a batch) is significantly less than max.poll.interval.ms to avoid a rebalance.
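For instance, a sketch of an "infinite retry" configuration (the 2-second interval is an assumption; choose something comfortably below max.poll.interval.ms):

// retry forever, 2 seconds between attempts (interval chosen as an example)
DefaultErrorHandler retryForever = new DefaultErrorHandler(new FixedBackOff(2000L, Long.MAX_VALUE));
kafkaContainer.setCommonErrorHandler(retryForever);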