英文:
Spring-kafka Record offset commit time for Async manual commits
问题
Here is the translated content you requested:
我有一个基于Spring Kafka的监听器,它接收一个批次,进行处理,然后像下面的代码一样手动确认:
@KafkaListener(topics = my_topic)
public void consume(@Payload List<String> messages,
                    @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    for (String message: messages) {
         // 处理每条消息
    }
    // 理想情况下,计时器将在此记录开始时间
    acknowledgment.acknowledge();
}
我的配置类包含提交回调逻辑。我想记录在调用确认方法和接收回调之间的时间。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    log.info("创建Kafka监听器工厂");
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setSyncCommits(false);     
    factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // 更多配置
    factory.getContainerProperties().setCommitCallback((map, ex) -> {
    // 理想情况下,结束时间将在此记录,然后发布计时器度量值。
        if (ex == null) {
            log.info("成功提交 {} ", map);
        } else {
            log.error("提交失败 {},异常:{}", map, ex.getMessage());
        }
    });
}
如何在Spring Kafka中实现这个功能?确认方法不允许我传递任何值以供回调引用。
英文:
I have a spring kafka based listener that receives a batch, processes and then manually acknowledges it as in the code below:
    @KafkaListener(topics = my_topic)
    public void consume(@Payload List<String> messages,
                        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    for (String message: messages) {
         // process each message
    }
    // Ideally the timer will record start time here
    acknowledgement.acknowledge();
My config class has the commit callback logic. I would like to record a timer for the time between when the acknowledge is called and when the callback is received.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        log.info("Creating kafka listener factory");
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setSyncCommits(false);     
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // More config
        factory.getContainerProperties().setCommitCallback((map, ex) -> {
        // Ideally the end time will be recorded here and timer metric published.
            if (ex == null) {
                log.info("Successful commit for {}", map);
            } else {
                log.error("Commit failed for {} with exception: {}", map, ex.getMessage());
            }
        });
    }
How can I do it with spring kafka? The acknowledge method doesn't let me pass any values to refer in the callback.
答案1
得分: 0
你可以通过配置OffsetAndMetadataProvider向OffsetAndMetadata添加元数据。
它会被调用以为每个将要提交的偏移量提供元数据。
/**
 * {@link OffsetAndMetadata}的提供者。该提供者可用于在创建{@link OffsetAndMetadata}时具有更细粒度的控制。
 * 该提供者用于偏移量的同步和异步提交。
 *
 * 作者:Francois Rosiere
 * 自2.8.5版本起提供
 * 参见org.apache.kafka.clients.consumer.OffsetCommitCallback
 */
public interface OffsetAndMetadataProvider {
	/**
	 * 为给定的监听器元数据和偏移量提供偏移量和元数据对象。
	 *
	 * @param listenerMetadata 与监听器相关联的元数据。
	 * @param offset 偏移量。
	 * @return 偏移量和元数据。
	 */
	OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
英文:
You can add metadata to the OffsetAndMetadata by configuring an OffsetAndMetadataProvider.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#offsetAndMetadataProvider
It is called for each offset that will be committed.
/**
 * Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
 * {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
 *
 * @author Francois Rosiere
 * @since 2.8.5
 * @see org.apache.kafka.clients.consumer.OffsetCommitCallback
 */
public interface OffsetAndMetadataProvider {
	/**
	 * Provide an offset and metadata object for the given listener metadata and offset.
	 *
	 * @param listenerMetadata metadata associated to a listener.
	 * @param offset an offset.
	 * @return an offset and metadata.
	 */
	OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论