How to log offset in KStreams Bean using spring-kafka and kafka-streams

Question

I have gone through almost all of the questions here about logging the offset on a KStream via the Processor API's transform() or process() method, such as this one:

https://stackoverflow.com/questions/40807346/how-can-i-get-the-offset-value-in-kstream?noredirect=1&lq=1

But I was not able to get a working solution out of those answers, so I am asking this question.

I want to log the partition, consumer group id, and offset every time the stream consumes a message, but I do not understand how to integrate the process() or transform() method with the ProcessorContext API. And if I implement the Processor interface in my CustomParser class, I would have to implement all of its methods, and I am not sure that would work as described in the Confluent docs on record metadata: https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api

I have set up KStreams in a spring-boot application like below (variable names have been changed for reference):

    @Bean
    public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final KStream<String, JsonNode> pStream = profileBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));

        Properties props = streamsConfig.kStreamsConfigs().asProperties();

        pStream
                .map((key, value) -> {
                    try {
                        return CustomParser.parse(key, value);
                    } catch (Exception e) {
                        LOGGER.error("Error occurred - " + e.getMessage());
                    }
                    return new KeyValue<>(null, null);
                })
                .filter((key, value) -> {
                    try {
                        return MessageFilter.filterNonNull(key, value);
                    } catch (Exception e) {
                        LOGGER.error("Error occurred - " + e.getMessage());
                    }
                    return false;
                })
                .through(
                        outputTopic,
                        Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));

        return Sets.newHashSet(
                new KafkaStreams(profileBuilder.build(), props)
        );
    }

Answer 1

Score: 2

Implement Transformer; save off the ProcessorContext in init(); you can then access the record metadata in transform() and simply return the original key/value.

Here is an example of a Transformer. It is provided by Spring for Apache Kafka to invoke a Spring Integration flow to transform the key/value.
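Below is a minimal sketch of that approach (the class name OffsetLoggingTransformer and the SLF4J logger are illustrative, not from the original answer). It saves the ProcessorContext in init() and reads the record metadata in transform(); since a Kafka Streams application's consumer group id is its application.id, context.applicationId() stands in for the group id:

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative pass-through transformer: logs record metadata, forwards the record unchanged.
    public class OffsetLoggingTransformer
            implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {

        private static final Logger LOGGER = LoggerFactory.getLogger(OffsetLoggingTransformer.class);

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            // Save the context; it is the handle to the current record's metadata.
            this.context = context;
        }

        @Override
        public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
            // In Kafka Streams the application.id doubles as the consumer group id.
            LOGGER.info("group={} topic={} partition={} offset={}",
                    context.applicationId(), context.topic(), context.partition(), context.offset());
            // Return the original key/value so downstream processing is unaffected.
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {
            // No resources to release.
        }
    }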

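To wire it into the topology from the question, add a transform() step at the head of the chain so every consumed record is logged; transform() takes a TransformerSupplier, so a constructor reference is enough:

    pStream
            .transform(OffsetLoggingTransformer::new) // logs group/topic/partition/offset
            // ... followed by the existing map/filter/through chain as before

Note that Kafka Streams calls the supplier once per stream task, which is why a supplier (rather than a single shared Transformer instance) is passed in.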
