Spring-kafka记录异步手动提交的偏移量提交时间。

huangapple go评论86阅读模式
英文:

Spring-kafka Record offset commit time for Async manual commits

问题

Here is the translated content you requested:

我有一个基于Spring Kafka的监听器,它接收一个批次,进行处理,然后像下面的代码一样手动确认:

@KafkaListener(topics = my_topic)
public void consume(@Payload List<String> messages,
                    @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    for (String message: messages) {
         // 处理每条消息
    }
    // 理想情况下,计时器将在此记录开始时间
    acknowledgment.acknowledge();
}

我的配置类包含提交回调逻辑。我想记录在调用确认方法和接收回调之间的时间。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    log.info("创建Kafka监听器工厂");
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setSyncCommits(false);     
    factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // 更多配置
    factory.getContainerProperties().setCommitCallback((map, ex) -> {
    // 理想情况下,结束时间将在此记录,然后发布计时器度量值。
        if (ex == null) {
            log.info("成功提交 {} ", map);
        } else {
            log.error("提交失败 {},异常:{}", map, ex.getMessage());
        }
    });
}

如何在Spring Kafka中实现这个功能?确认方法不允许我传递任何值以供回调引用。

英文:

I have a spring kafka based listener that receives a batch, processes and then manually acknowledges it as in the code below:

    @KafkaListener(topics = my_topic)
    public void consume(@Payload List&lt;String&gt; messages,
                        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    for (String message: messages) {
         // process each message
    }
    // Ideally the timer will record start time here
    acknowledgement.acknowledge();

My config class has the commit callback logic. I would like to record a timer for the time between when the acknowledge is called and when the callback is received.

    @Bean
    public ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; kafkaListenerContainerFactory() {

        log.info(&quot;Creating kafka listener factory&quot;);
        ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory
                = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setSyncCommits(false);     
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // More config
        factory.getContainerProperties().setCommitCallback((map, ex) -&gt; {
        // Ideally the end time will be recorded here and timer metric published.
            if (ex == null) {
                log.info(&quot;Successful commit for {}&quot;, map);
            } else {
                log.error(&quot;Commit failed for {} with exception: {}&quot;, map, ex.getMessage());
            }
        });
    }

How can I do it with spring kafka? The acknowledge method doesn't let me pass any values to refer in the callback.

答案1

得分: 0

你可以通过配置OffsetAndMetadataProviderOffsetAndMetadata添加元数据。

它会被调用以为每个将要提交的偏移量提供元数据。

/**
 * {@link OffsetAndMetadata}的提供者。该提供者可用于在创建{@link OffsetAndMetadata}时具有更细粒度的控制。
 * 该提供者用于偏移量的同步和异步提交。
 *
 * 作者:Francois Rosiere
 * 自2.8.5版本起提供
 * 参见org.apache.kafka.clients.consumer.OffsetCommitCallback
 */
public interface OffsetAndMetadataProvider {

	/**
	 * 为给定的监听器元数据和偏移量提供偏移量和元数据对象。
	 *
	 * @param listenerMetadata 与监听器相关联的元数据。
	 * @param offset 偏移量。
	 * @return 偏移量和元数据。
	 */
	OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}
英文:

You can add metadata to the OffsetAndMetadata by configuring an OffsetAndMetadataProvider.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#offsetAndMetadataProvider

It is called for each offset that will be committed.

/**
 * Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
 * {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
 *
 * @author Francois Rosiere
 * @since 2.8.5
 * @see org.apache.kafka.clients.consumer.OffsetCommitCallback
 */
public interface OffsetAndMetadataProvider {

	/**
	 * Provide an offset and metadata object for the given listener metadata and offset.
	 *
	 * @param listenerMetadata metadata associated to a listener.
	 * @param offset an offset.
	 * @return an offset and metadata.
	 */
	OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
}

huangapple
  • 本文由 发表于 2023年5月17日 08:24:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76267842.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定