How to use the ConsumerSeekAware interface to fetch Kafka messages from a particular Kafka offset

Question

I have a scenario where I have to make use of the ConsumerSeekAware interface in order to fetch Kafka messages from a particular Kafka offset.

Currently my Kafka listener is configured like below:

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@EventListener(ApplicationReadyEvent.class)
public void kafkaListenerContainerFactory() {
    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory(), new ContainerProperties("mytopic"));
    container.setConcurrency(3);
    container.setAutoStartup(true);
    container.setupMessageListener(listenKafkaMessage());
    container.start();
}

MessageListener<String, String> listenKafkaMessage() {
    return this::consumeCustomMessage;
}

void consumeCustomMessage(ConsumerRecord<String, String> message) {
    System.out.println("Message Received : " + message.value());
}

I wanted to check an implementation of ConsumerSeekAware and where I can register/use it in the existing code.


Answer 1

Score: 1

You can force a consumer to start reading from a specific offset in a topic's partition. If you want to start reading from a specific offset every time your application starts, you can use the ConsumerSeekAware interface as follows:

@Component
public class MyKafkaListener implements ConsumerSeekAware {

    @KafkaListener(id = "myListener", topics = "mytopic")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Message Received : " + record.value());
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // This method runs when the listener starts and partitions are assigned.
        // You can seek to a specific offset here.
        // For example, to start from offset 123 on partition 0 of topic 'mytopic':
        callback.seek("mytopic", 0, 123);
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }
}

In this example, every time the listener is started and partition 0 of the mytopic topic is assigned to it, the listener will start reading from offset 123. Keep in mind that this will happen every time your application starts or the listener is assigned a new partition.

Make sure the listener does not commit offsets automatically. You can set the ENABLE_AUTO_COMMIT_CONFIG property to false to prevent this:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

You'll also need to ensure that the AckMode of the listener container is set to MANUAL or MANUAL_IMMEDIATE, so that your listener code has to manually acknowledge that it has processed a message:

ContainerProperties containerProperties = new ContainerProperties("mytopic");
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
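With a MANUAL ack mode, the listener method can take an Acknowledgment parameter and call acknowledge() once the record has been processed. A minimal sketch (the class name and listener id are illustrative, not part of the original answer):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckListener {

    // With AckMode.MANUAL, the offset is committed only when acknowledge() is called.
    @KafkaListener(id = "manualAckListener", topics = "mytopic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("Message Received : " + record.value());
        ack.acknowledge(); // commit the offset for this record
    }
}
```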

Finally, note that the example above is quite basic. In a real-world scenario, you may need to store the offsets your consumer has processed so that you can resume from the right position when your application restarts. Adjust the implementation to your needs.
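That restart-resume idea can be sketched roughly as follows. This is an assumption-laden illustration, not part of the original answer: the in-memory map stands in for whatever durable store (database, file, etc.) you would actually use, and the class and listener names are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class ResumingListener implements ConsumerSeekAware {

    // Hypothetical offset store: topic-partition -> last processed offset.
    // In a real application this would be backed by durable storage.
    private final Map<TopicPartition, Long> savedOffsets = new ConcurrentHashMap<>();

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> {
            Long saved = savedOffsets.get(tp);
            if (saved != null) {
                // Resume just after the last record we recorded as processed.
                callback.seek(tp.topic(), tp.partition(), saved + 1);
            }
        });
    }

    @KafkaListener(id = "resumingListener", topics = "mytopic")
    public void listen(ConsumerRecord<String, String> record) {
        // Process the record, then remember how far we got.
        savedOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
    }
}
```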


EDIT based on Gary Russell's suggestions:

  1. Extending AbstractConsumerSeekAware: Instead of directly implementing ConsumerSeekAware, consider extending AbstractConsumerSeekAware. This class provides some convenient default methods, making your implementation cleaner.

Change your class definition to:

@Component
public class MyKafkaListener extends AbstractConsumerSeekAware {
    // ... rest of your methods
}
  2. Spring Bean Declaration: Ensure that your ConsumerFactory and ConcurrentMessageListenerContainer are managed by the Spring container. Declare them as @Beans:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    // ... your existing code
}

@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainerFactory() {
    // ... your existing code
}
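Putting both suggestions together, here is one way the pieces might be wired up. This is a hedged sketch under stated assumptions: the bean names, the fixed offset 123, and the combination of AbstractConsumerSeekAware with an AcknowledgingMessageListener are illustrative choices, not the only (or the original poster's) layout.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual acks, as above
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> listenerContainer(
            ConsumerFactory<String, String> consumerFactory, MySeekingListener listener) {
        ContainerProperties containerProperties = new ContainerProperties("mytopic");
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        containerProperties.setMessageListener(listener);
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setConcurrency(3);
        return container; // started by Spring: autoStartup defaults to true for lifecycle beans
    }
}

@Component
class MySeekingListener extends AbstractConsumerSeekAware
        implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback); // let the base class track callbacks
        callback.seek("mytopic", 0, 123); // illustrative fixed offset
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("Message Received : " + record.value());
        ack.acknowledge();
    }
}
```

Because the container is a Spring-managed bean, there is no need for the manual start() call from the question; the application context handles the lifecycle.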

huangapple
  • Posted on 2023-08-05 06:18:58
  • Please retain this link when reposting: https://go.coder-hub.com/76839377.html