
Commit offset to Kafka in reactor-kafka after a few attempts to process have failed

Question

There is a Kafka topic to which messages arrive. I need to read a message, process it, and proceed to the next one. Message processing can fail, and if that happens, the processing has to be retried a few times (say, 10 times) before I can move on to the next message. If the processing fails all 10 times, the message needs to be dropped and we should continue with the next one.

We use reactor-kafka, and all the processing needs to be reactive.

Here is how I tried to solve this:

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();

(here receiver is a KafkaReceiver<String, String>).
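For context, a minimal sketch of how such a receiver might be created; the bootstrap servers, group id and topic name below are placeholders, not taken from the question:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

// Placeholder consumer configuration; adjust to your environment.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
        .subscription(Collections.singleton("my-topic"));

KafkaReceiver<String, String> receiver = KafkaReceiver.create(options);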

This works in the case without any exceptions, and if there is an exception, processRecord() is retried 10 times. The problem is that if it still fails after the 10 allowed attempts, the offset is (of course) not committed, so the same offset is read from Kafka the next time, and the processing effectively gets stuck forever on the 'faulty' offset.

I tried to implement the following obvious idea: if an exception 'passes further' than the retryBackoff() operator, commit the current offset. To commit an offset, we need a ReceiverRecord, so I wrap the exception in an ExceptionWithRecord together with the current record:

// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))

and

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .onErrorResume(this::extractRecordAndMaybeCommit)
        .subscribe();

extractRecordAndMaybeCommit() extracts the ReceiverRecord from the given exception and commits it:

return record.receiverOffset().commit();
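A sketch of what that method might look like, assuming ExceptionWithRecord exposes the wrapped record through a hypothetical getRecord() accessor:

private Mono<Void> extractRecordAndMaybeCommit(Throwable ex) {
    if (ex instanceof ExceptionWithRecord) {
        // commit the offset of the record whose retries were exhausted
        ReceiverRecord<String, String> record = ((ExceptionWithRecord) ex).getRecord();
        return record.receiverOffset().commit();
    }
    // not our wrapper: let the error propagate
    return Mono.error(ex);
}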

This approach of passing the record along and committing it once the retries are exhausted works in the sense that the .commit() method is indeed called, but the call has no effect.

It turns out that as soon as any exception enters the reactive pipeline above, DefaultKafkaReceiver.dispose() is called, so any subsequent commit attempt is ignored. In other words, it is simply not possible to commit an offset once an exception has been 'seen' by the publishers.

How can the 'commit after N errors' behavior be implemented while still using reactor-kafka?

Answer 1

Score: 2

I was not able to find a 'proper' and easy way to solve the task, so I had to resort to the brute force of state and side effects: count the retries manually and stop retrying when the attempt count exceeds the limit.

Here is the counter:

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.kafka.common.TopicPartition;

public class RetryCounter {
    private final Map<TopicPartition, OffsetAttempts> partitionOffsets = new ConcurrentHashMap<>();

    public void onRecord(PartitionOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        offsetAttempts.increaseAttemptNumber(partitionOffset.offset());
        offsetAttempts.pruneTooAncientFor(partitionOffset.offset());
    }

    public long currentAttemptFor(PartitionOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        long result = offsetAttempts.currentAttemptFor(partitionOffset.offset());

        return result;
    }

    private OffsetAttempts offsetAttemptsFor(PartitionOffset partitionOffset) {
        return partitionOffsets.computeIfAbsent(partitionOffset.topicPartition(), key -> new OffsetAttempts());
    }

    private static class OffsetAttempts {
        private final NavigableMap<Long, Long> offsetAttempts = new ConcurrentSkipListMap<>();

        // this must exceed your Kafka batch size
        private static final int ANTIQUITY_SPREAD_THRESHOLD = 10000;

        public void increaseAttemptNumber(long offset) {
            offsetAttempts.merge(offset, 0L, (oldValue, value) -> oldValue + 1);
        }

        public long currentAttemptFor(long offset) {
            return offsetAttempts.getOrDefault(offset, 0L);
        }

        @Override
        public String toString() {
            return offsetAttempts.toString();
        }

        public void pruneTooAncientFor(long offset) {
            long antiquityThreshold = offset - ANTIQUITY_SPREAD_THRESHOLD;

            offsetAttempts.headMap(antiquityThreshold).clear();
        }
    }
}
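The PartitionOffset type is not shown above; a minimal sketch, assuming it is just a (TopicPartition, offset) pair with the two accessors the counter uses, could be:

// Hypothetical helper: only the two accessors used by RetryCounter above.
public class PartitionOffset {
    private final TopicPartition topicPartition;
    private final long offset;

    public PartitionOffset(TopicPartition topicPartition, long offset) {
        this.topicPartition = topicPartition;
        this.offset = offset;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public long offset() {
        return offset;
    }
}

The pipeline below passes the ReceiverRecord itself to counter.onRecord() and counter.currentAttemptFor(), so presumably the real code converts a record into a PartitionOffset, for example from record.receiverOffset().topicPartition() and record.receiverOffset().offset().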

Then we count retries of each offset (for each partition independently) and stop processing when the number of retries is exceeded:

RetryCounter counter = new RetryCounter();
Flux.defer(receiver::receive)
        .concatMap(record -> {
            counter.onRecord(record);
            if (counter.currentAttemptFor(record) >= 10) {
                // we tried 10 times, this is the 11th, so log the error and return
                // without calling processRecord(), so that there is no error
                // in the reactive pipeline and we are able to commit
                logFinalError(record);
                return Mono.just(record).flatMap(this::commitRecord);
            } else {
                return processRecord(record).thenReturn(record).flatMap(this::commitRecord);
            }
        })
        .retryBackoff(Long.MAX_VALUE, ofMillis(500))
        .subscribe();
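The commitRecord() and logFinalError() helpers are also not shown above; plausible sketches, assuming an SLF4J logger named log, could be:

// Hypothetical helpers assumed by the pipeline above.
private Mono<Void> commitRecord(ReceiverRecord<String, String> record) {
    // acknowledge the offset so the consumer can move past this record
    return record.receiverOffset().commit();
}

private void logFinalError(ReceiverRecord<String, String> record) {
    // the retry budget is exhausted; record the failure and drop the message
    log.error("Giving up on record {}-{} at offset {} after 10 attempts",
            record.topic(), record.partition(), record.offset());
}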
