英文:
Unable to Consume old messages in the kafka
问题
以下是您要翻译的内容:
"The Producer in Kafka is producing messages, but the consumer is down after consuming the 5th message. After some time the consumer is up, and I want to consume the 6th message, instead of the latest message.
How can I do this?
Producer.java
public class kafkaProducerRoute extends RouteBuilder{
@Override
public void configure() throws Exception {
from("timer:time?")
// loop over the route 10 times
.setBody().constant("Hello, World!") // set the message body to "Hello, World!"
.to("kafka:hello1?brokers=localhost:9092") // send the message to a Kafka topic using the Kafka producer endpoint
.log("Message is produced");
}
}
Consumer.java
from("kafka:c1?brokers=localhost:9092")
.log("Received message with offset: ${header." + KafkaConstants.OFFSET + "} - ${body}");
希望这对您有所帮助。如果您有任何其他问题,请随时提出。
英文:
The Producer in Kafka is producing messages, but the consumer is down after consuming the 5th message. After some time the consumer is up, and I want to consume the 6th message, instead of the latest message.
How can I do this?
Producer.java
public class kafkaProducerRoute extends RouteBuilder{
@Override
public void configure() throws Exception {
from("timer:time?")
// loop over the route 10 times
.setBody().constant("Hello, World!") // set the message body to "Hello, World!"
.to("kafka:hello1?brokers=localhost:9092") // send the message to a Kafka topic using the Kafka producer endpoint
.log("Message is produced");
}
}
Consumer.java
from("kafka:c1?brokers=localhost:9092")
.log("Received message with offset: ${header." + KafkaConstants.OFFSET + "} - ${body}");
答案1
得分: 2
当消费者组再次启动时,如果它使用相同的旧group_id,那么它应该从上次停止的位置继续。如果不是同一个group_id,那么根据auto.offset.reset = latest
的默认设置,它将始终从主题的最新偏移量开始。
详细信息请参考:https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset
英文:
when the consumer group comes up again, does it come up with the same old group_id, if yes, then it should continue from where it left, if not, then this is normal as per the default setting for auto.offset.reset = latest
will always start from the latest offset of the topic
https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论