Unable to consume old messages in Kafka

Question

My Kafka producer keeps producing messages, but the consumer went down after consuming the 5th message. After some time the consumer comes back up, and I want it to continue with the 6th message instead of jumping to the latest message.

How can I do this?

Producer.java

import org.apache.camel.builder.RouteBuilder;

public class kafkaProducerRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // the timer fires repeatedly because no repeatCount is set
        from("timer:time")
                .setBody().constant("Hello, World!") // set the message body to "Hello, World!"
                .to("kafka:hello1?brokers=localhost:9092") // send the message to the Kafka topic "hello1"
                .log("Message is produced");
    }
}

Consumer.java

from("kafka:c1?brokers=localhost:9092")
        .log("Received message with offset: ${header." + KafkaConstants.OFFSET + "} - ${body}");

Answer 1

Score: 2

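When the consumer comes back up, does it use the same group ID as before? If it does, it should resume from where it left off, that is, from the last offset committed for that group. If it does not, the behavior you see is expected: a group with no committed offsets falls back to auto.offset.reset, whose default value is latest, so the consumer starts from the latest offset of the topic.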

For details, see: https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset
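
To make the rule above concrete, here is a minimal plain-Java sketch of how the starting offset is chosen. It is only a model, not Kafka code: the class OffsetResumeSketch, its methods, and the group names group-a and group-b are hypothetical, the in-memory map stands in for the offsets the broker stores per group, and the "none" reset policy (which makes a real consumer fail) is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a restarting consumer picks its starting offset.
public class OffsetResumeSketch {
    // Committed offsets per consumer group (stand-in for what the broker stores).
    private final Map<String, Long> committed = new HashMap<>();

    /** Records the next offset the group should read, as an offset commit would. */
    void commit(String groupId, long nextOffset) {
        committed.put(groupId, nextOffset);
    }

    /** Returns the offset at which a (re)starting consumer of groupId begins. */
    long startOffset(String groupId, String autoOffsetReset, long logStart, long logEnd) {
        Long saved = committed.get(groupId);
        if (saved != null) {
            return saved; // known group: resume where it left off
        }
        // unknown group: fall back to the auto.offset.reset policy
        return "earliest".equals(autoOffsetReset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        OffsetResumeSketch broker = new OffsetResumeSketch();
        // Five messages (offsets 0..4) were consumed, so the next offset is 5.
        broker.commit("group-a", 5L);

        // Same group ID after restart: continues with the 6th message (offset 5).
        System.out.println(broker.startOffset("group-a", "latest", 0L, 20L));
        // Different group ID with the default policy: jumps to the end of the log.
        System.out.println(broker.startOffset("group-b", "latest", 0L, 20L));
        // Different group ID with "earliest": starts from the beginning.
        System.out.println(broker.startOffset("group-b", "earliest", 0L, 20L));
    }
}
```

With a log holding offsets 0 to 19, this prints 5, 20, and 0.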

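
Applied to the Camel consumer in the question, this suggests giving the endpoint a fixed group ID so that every restart joins the same group. The sketch below only builds the endpoint URI as a string: groupId and autoOffsetReset are options of the Camel Kafka component, while the class KafkaConsumerUri, the helper consumerUri, and the group name my-consumer-group are placeholders made up for illustration.

```java
// Builds a Camel Kafka consumer endpoint URI with a fixed consumer group.
public class KafkaConsumerUri {

    /** Assembles the endpoint URI; values are assumed to need no URL encoding. */
    static String consumerUri(String topic, String brokers, String groupId, String autoOffsetReset) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&groupId=" + groupId                   // same ID on every restart
                + "&autoOffsetReset=" + autoOffsetReset;  // used only when the group has no committed offset
    }

    public static void main(String[] args) {
        String uri = consumerUri("c1", "localhost:9092", "my-consumer-group", "earliest");
        System.out.println(uri);
        // Inside the consumer's RouteBuilder.configure(), the route would then start with:
        //   from(uri).log("Received message with offset: ...");
    }
}
```

Here autoOffsetReset=earliest is an optional choice: it only matters the first time the group starts (or when its committed offsets are gone), and it makes the consumer begin at the oldest available message rather than the newest.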

Posted by huangapple on February 27, 2023 at 14:32:41
Original link: https://go.coder-hub.com/75577367.html