英文:
Is there a way to read messages using Kafka stream(not via KafkaConsumer) from beginning everytime in java?
问题
我们正在创建一个POC来读取数据库CDC并将其推送到外部系统。
- 每个源表的CDC以Avro格式发送到相应的主题(使用Kafka Schema Registry和Kafka Server)。
- 我们正在编写Java代码来消耗Avro模式的消息,使用AvroSerde进行反序列化,然后将它们连接并发送到不同的主题,以便外部系统可以消费。
不过,我们有一个限制,即无法将消息发送到源表主题以发送/接收新内容/更改。因此,唯一的编写连接代码的方法是在运行应用程序时每次从每个源主题读取消息(直到我们有信心代码正常运行并可以重新开始接收实时数据)。
在KafkaConsumer对象中,我们有一个选项可以使用seekToBeginning方法来强制从开头读取Java代码,这是有效的。但是,当我们尝试使用KStream对象流式传输主题并强制从开头读取时,没有选项。在这里有什么替代方法吗?
我们尝试使用kafka-consumer-groups reset-topic和--to-earliest来重置偏移量,但这只会将偏移量设置到最接近的偏移量。当我们尝试手动重置偏移量为"0"并使用--to-offset参数时,会收到以下警告,但偏移量不会设置为"0"。我的理解是,将其设置为0应该从头开始读取消息。如果我理解错误,请纠正我。
示例代码如下:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
StreamsBuilder builder = new StreamsBuilder();
// 在某些已经设置了偏移量的情况下,此处不返回任何内容
KStream myStream = builder.stream("my-topic-in-avro-schema", ConsumedWith(myKeySerde, myValueSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
英文:
We are creating a POC to read database CDC and push it to external systems.
- each source table CDC are sent to respective topics in Avro format(with Kafka Schema Registry and Kafka Server)
- We are writing java code to consume the messages in avro schema,de-serialize it using AvroSerde and join them and then send to different topics so that it can be consumed by external systems.
We have a limitation though that we cannot produce messages to source table topics to send/receive new contents/changes. So only way to write join code is to read messages from beginning everytime from every source topic when we run the application.(until we have confident that code is working and can start receiving live data again)
In KafkaConsumer object we have an option to use seekToBeginning method to force reading from beginning in jave code, which works. However there are no option when we try to stream topic using KStream object and force to read it from beginning. What are the alternatives here?
We tried to reset the offset using kafka-consumer-groups reset-topic with --to-earliest but that sets the offset only to the nearest . When we try to reset offset manually with "0" with --to-offset parameter we get below warning but does not set to "0". my understanding is, setting to 0 should read messages from beginning. correct me if I am wrong.
"WARN New offset (0) is lower than earliest offset for topic partition"
Sample code below
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
StreamsBuilder builder = new StreamsBuilder();
//nothing returned here, when some offset has already been set
KStream myStream = builder.stream("my-topic-in-avro-schema",ConsumedWith(myKeySerde,myValueSerde));
KafkaStreams streams = new KafkaStreams(builder.build(),properties);
streams.start();
答案1
得分: 1
生成随机的ConsumerGroup的一种方法是在每次启动流应用程序时生成一个随机的ConsumerGroup,类似于:
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + currentTimestamp);
这样,流将从“earliest”开始读取,因为您已经在auto.offset.reset
中设置了它。
顺便说一下,在您的代码中您设置了两次group.id
的属性...
英文:
One way to do this would be to generate a random ConsumerGroup every time you start the stream application. Something like:
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + currentTimestamp);
That way, the stream will start reading from "earliest" as you have set it already in auto.offset.reset
.
By the way, you are setting the properties for group.id
twice in your code...
答案2
得分: 0
用UUID.randomId.toString()替换配置属性中的应用程序ID和组ID,这将帮助面临相同问题的人。应该从开始获取消息。
英文:
It will help someone who is also facing same issue. Replace Application Id and Group Id with some unique identifier using UUID.randomId.toString() in the configuration property. It should fetch the messages from beginning
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论