How to log offset in KStreams Bean using spring-kafka and kafka-streams
Question
I have referred to almost all the questions about logging offsets on KStreams via the Processor API's transform() or process() methods, like the many questions mentioned here, but I was not able to get a solution from those answers, so I'm asking this question.
I want to log the partition, consumer group id, and offset each time a message is consumed by the stream, but I don't understand how to integrate the process() or transform() method with the ProcessorContext API. And if I implement the Processor interface in my CustomParser class, I would have to implement all of its methods, and I'm not sure that would work, as described in the Confluent docs for record metadata - https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api
I've set up KStreams in a spring-boot application like below (for reference, I have changed the variable names):
@Bean
public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
    Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final KStream<String, JsonNode> pStream = profileBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));
    Properties props = streamsConfig.kStreamsConfigs().asProperties();
    pStream
        // parse each incoming record; on failure, emit a null key/value pair
        .map((key, value) -> {
            try {
                return CustomParser.parse(key, value);
            } catch (Exception e) {
                LOGGER.error("Error occurred - " + e.getMessage());
            }
            return new KeyValue<>(null, null);
        })
        // drop the null records produced by failed parses
        .filter((key, value) -> {
            try {
                return MessageFilter.filterNonNull(key, value);
            } catch (Exception e) {
                LOGGER.error("Error occurred - " + e.getMessage());
            }
            return false;
        })
        // republish the transformed records to the output topic
        .through(
            outputTopic,
            Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));
    return Sets.newHashSet(new KafkaStreams(profileBuilder.build(), props));
}
Answer 1
Score: 2
Implement Transformer; save off the ProcessorContext in init(); you can then access the record metadata in transform() and simply return the original key/value.
Here is an example of a Transformer. It is provided by Spring for Apache Kafka to invoke a Spring Integration flow to transform the key/value.
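As a minimal sketch of that recipe (the class name OffsetLoggingTransformer and the log format are assumptions for illustration, not from the answer): keep the ProcessorContext handed to init() and read the record coordinates from it inside transform(), returning the key/value untouched. Note that in Kafka Streams the consumer group id is the application.id, which the context exposes via applicationId().
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical pass-through transformer that only logs record metadata.
public class OffsetLoggingTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetLoggingTransformer.class);

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Save the context so transform() can query the current record's metadata.
        this.context = context;
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        // In Kafka Streams the application.id doubles as the consumer group id.
        LOGGER.info("topic={} partition={} offset={} applicationId={}",
                context.topic(), context.partition(), context.offset(), context.applicationId());
        // Pass the record through unchanged.
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
It could then be spliced into the question's topology right after the source, e.g. pStream.transform(OffsetLoggingTransformer::new) before the map() step, so each consumed record is logged once with its partition and offset.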
Comments