Is there a way to read messages from the beginning every time using Kafka Streams (not KafkaConsumer) in Java?
We are creating a POC to read database CDC and push it to external systems.
- Each source table's CDC is sent to its respective topic in Avro format (with Kafka Schema Registry and Kafka Server).
- We are writing Java code to consume the Avro messages, deserialize them using AvroSerde, join them, and then send the results to different topics so that external systems can consume them.
We have a limitation, though: we cannot produce messages to the source table topics to send/receive new content or changes. So the only way to write the join code is to read messages from the beginning of every source topic each time we run the application (until we are confident that the code works and we can start receiving live data again).
With a KafkaConsumer object we can use the seekToBeginning method to force reading from the beginning in Java code, which works. However, there is no such option when we stream a topic using a KStream object. What are the alternatives here?
We tried to reset the offsets using kafka-consumer-groups --reset-offsets with --to-earliest, but that only sets the offset to the nearest offset. When we try to reset the offset manually to "0" with the --to-offset parameter, we get the warning below and the offset is not set to "0". My understanding is that setting the offset to 0 should read messages from the beginning. Correct me if I am wrong.
"WARN New offset (0) is lower than earliest offset for topic partition"
Sample code below
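import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
StreamsBuilder builder = new StreamsBuilder();
// nothing is returned here when some offset has already been committed
KStream myStream = builder.stream("my-topic-in-avro-schema", Consumed.with(myKeySerde, myValueSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), properties);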
Score: 1
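One way to do this would be to use a new consumer group every time you start the Streams application. Note that Kafka Streams uses application.id as the consumer group id and overrides group.id, so the unique suffix has to go on the application id. Something like:
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID + currentTimestamp);
That way, the group has no committed offsets and the stream starts reading from "earliest", as you have already set in auto.offset.reset.
By the way, your code sets both group.id and application.id; Kafka Streams only uses the latter.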
Score: 0
This may help someone who is facing the same issue. Replace the application id and group id in the configuration properties with a unique identifier, for example UUID.randomUUID().toString(). The application should then fetch the messages from the beginning.
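A minimal sketch of the unique-id idea, assuming only the JDK. The class name FreshStreamsConfig and its build method are hypothetical helpers, not part of Kafka, and the config keys are written as plain strings ("application.id", "bootstrap.servers", "auto.offset.reset") so the snippet compiles without the Kafka client on the classpath:

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds a Streams configuration whose
// application.id is different on every call, so no committed offsets exist for it.
final class FreshStreamsConfig {

    private FreshStreamsConfig() {}

    static Properties build(String baseApplicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Config keys as plain strings; they match StreamsConfig.APPLICATION_ID_CONFIG etc.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A random UUID suffix makes the application id (and thus the consumer group) unique per run.
        props.setProperty("application.id", baseApplicationId + "-" + UUID.randomUUID());
        // With no committed offsets for the new group, this setting decides where reading starts.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The returned Properties can be passed to new KafkaStreams(builder.build(), props) after adding the serde and schema registry settings. Since Kafka Streams prefixes its internal topics and state directories with the application id, each run leaves a new group and possibly new internal topics behind, so this is probably best kept to a POC or test setup.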