英文:
Spring-kafka Record offset commit time for Async manual commits
问题
Here is the translated content you requested:
我有一个基于Spring Kafka的监听器,它接收一个批次,进行处理,然后像下面的代码一样手动确认:
@KafkaListener(topics = my_topic)
public void consume(@Payload List<String> messages,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
for (String message: messages) {
// 处理每条消息
}
// 理想情况下,计时器将在此记录开始时间
acknowledgment.acknowledge();
}
我的配置类包含提交回调逻辑。我想记录在调用确认方法和接收回调之间的时间。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
log.info("创建Kafka监听器工厂");
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setSyncCommits(false);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 更多配置
factory.getContainerProperties().setCommitCallback((map, ex) -> {
// 理想情况下,结束时间将在此记录,然后发布计时器度量值。
if (ex == null) {
log.info("成功提交 {} ", map);
} else {
log.error("提交失败 {},异常:{}", map, ex.getMessage());
}
});
}
如何在Spring Kafka中实现这个功能?确认方法不允许我传递任何值以供回调引用。
英文:
I have a spring kafka based listener that receives a batch, processes and then manually acknowledges it as in the code below:
@KafkaListener(topics = my_topic)
public void consume(@Payload List<String> messages,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
for (String message: messages) {
// process each message
}
// Ideally the timer will record start time here
acknowledgement.acknowledge();
My config class has the commit callback logic. I would like to record a timer for the time between when the acknowledge is called and when the callback is received.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
log.info("Creating kafka listener factory");
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setSyncCommits(false);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// More config
factory.getContainerProperties().setCommitCallback((map, ex) -> {
// Ideally the end time will be recorded here and timer metric published.
if (ex == null) {
log.info("Successful commit for {}", map);
} else {
log.error("Commit failed for {} with exception: {}", map, ex.getMessage());
}
});
}
How can I do it with spring kafka? The acknowledge method doesn't let me pass any values to refer in the callback.
答案1
得分: 0
你可以通过配置OffsetAndMetadataProvider
向OffsetAndMetadata
添加元数据。
它会被调用以为每个将要提交的偏移量提供元数据。
/**
* {@link OffsetAndMetadata}的提供者。该提供者可用于在创建{@link OffsetAndMetadata}时具有更细粒度的控制。
* 该提供者用于偏移量的同步和异步提交。
*
* 作者:Francois Rosiere
* 自2.8.5版本起提供
* 参见org.apache.kafka.clients.consumer.OffsetCommitCallback
*/
public interface OffsetAndMetadataProvider {
/**
* 为给定的监听器元数据和偏移量提供偏移量和元数据对象。
*
* @param listenerMetadata 与监听器相关联的元数据。
* @param offset 偏移量。
* @return 偏移量和元数据。
*/
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
英文:
You can add metadata to the OffsetAndMetadata
by configuring an OffsetAndMetadataProvider
.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#offsetAndMetadataProvider
It is called for each offset that will be committed.
/**
* Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
* {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
*
* @author Francois Rosiere
* @since 2.8.5
* @see org.apache.kafka.clients.consumer.OffsetCommitCallback
*/
public interface OffsetAndMetadataProvider {
/**
* Provide an offset and metadata object for the given listener metadata and offset.
*
* @param listenerMetadata metadata associated to a listener.
* @param offset an offset.
* @return an offset and metadata.
*/
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论