How do I receive the last windowed Kafka message from a windowedBy + aggregate when the producer stops sending messages, in Java/Spring?

Like I say in the title, I want to receive the last windowed messages when the producer stops sending messages. At the moment I am doing it manually, but first of all, a little description.

I have a Kafka producer that reads lines from a file (every line is a different JSON); each line read is sent to Kafka with a 500 ms delay between sends. I have only 120 lines (or JSONs).

I have a consumer that consumes all the JSONs sent by the producer. The code:

    final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

    // Topology
    transactions
            .groupBy(this::groupedByTimeStampAndProtocolName)
            .windowedBy(TimeWindows
                    .of(Duration.ofSeconds(10))
                    .grace(Duration.ofMillis(0)))
            .aggregate(
                    tool::emptyAggregate,
                    this::processNewRecord, // new TransactionAggregator(),
                    Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
                            .withKeySerde(Serdes.String())
                            .withValueSerde(aggregateSerde)
            )
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach(sendAggregatesToCassandra);
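
The grouping function referenced in .groupBy(...) above is not shown in the question. Purely as an illustration of the expected shape (a KeyValueMapper that derives the grouping key from the record value), a hypothetical version might look like the sketch below; the field accessors are placeholders, not the real code:

    // Hypothetical sketch only: the real groupedByTimeStampAndProtocolName is not shown in the question.
    // A groupBy selector maps (key, value) to a new grouping key; the getters used here are placeholders.
    private String groupedByTimeStampAndProtocolName(String key, Aggregate value) {
        return value.getProtocolName() + "-" + value.getTimestampBucket();
    }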

I get the expected functionality, I mean, it receives all the records, but to receive the last windowed messages I must send records manually.

Two questions about this:

  1. Is there any way to process the last window automatically? When the producer sends the last record (the 120th JSON), it won't send any more records. It doesn't matter if I have to wait some time or whatever.
  2. I have seen that I must send 3 more records to process the last window. It isn't clear to me why it has to be 3 (if I send fewer than 3 records, the last window isn't consumed completely). Is there any way to send only one record? Change the buffer? Change some property?

I am using Kafka Streams (with Spring) on JDK 11 and I am working with dockerized Kafka:

  • confluentinc/cp-kafka:5.5.1
  • zookeeper:3.4.14
  • Kafka:
            <version.kafka>2.5.0</version.kafka>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${version.kafka}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${version.kafka}</version>
            </dependency>

The properties used on the Kafka Streams (consumer) side are:

  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId() + Constants.APP_ID);
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
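
For completeness, these properties are typically wired to the topology roughly like the minimal sketch below, assuming `builder` is the StreamsBuilder used for the topology above and `props` is the Properties object shown here; when using Spring for Apache Kafka, this bootstrapping is normally handled by the framework instead:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;

    // Minimal, framework-free bootstrap: "builder" and "props" are assumed from the snippets above.
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();

    // Close the Streams instance cleanly on JVM shutdown.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));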

And on the producer side:

  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "all");
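
For context, the producer loop described above (read a line, send it, wait 500 ms) might look roughly like this sketch; the file path and topic name are hypothetical placeholders, and `properties` is the object configured above:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch of the producer loop: one JSON line every 500 ms.
    // File path and topic name are placeholders, not the question's real values.
    public static void produceFileLines(Properties properties) throws Exception {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (String json : Files.readAllLines(Paths.get("transactions.json"))) {
                producer.send(new ProducerRecord<>("transactions-topic", json));
                Thread.sleep(500); // 500 ms between records
            }
            producer.flush();
        }
    }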

Please, could you help me?

Answer 1

Score: 2

As you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result if "stream-time" advances. "Stream-time" is computed as a function over the record timestamps, and thus, if no records are processed, "stream-time" does not advance and suppress() will never emit anything. Thus, sending more records is the only way "stream-time" can be advanced.
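
As a test-only illustration of that last point, one way to "send more records" is to wait until wall-clock time has moved past the end of the last 10-second window (grace is 0 ms and WallclockTimestampExtractor is configured) and then send a dummy record whose only job is to advance stream-time; in the sketch below the topic name, key and payload are placeholders, and the aggregation would need to tolerate or ignore such records (the question notes that in practice a few of them were needed):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Test-only sketch: after the last real record, wait past the window boundary and send a dummy
    // record so stream-time advances and suppress() can emit the final window. Placeholders only.
    public static void flushLastWindow(Properties producerProperties) throws Exception {
        Thread.sleep(11_000L); // wait a bit longer than the 10-second window (+ 0 ms grace)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.send(new ProducerRecord<>("transactions-topic", "dummy-key", "{}"));
            producer.flush();
        }
    }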

> Note: for a streaming use case, the assumption is that data never stops and thus it's not an issue for an actual deployment -- reading from a file as you do is not a real stream processing use case: I assume you read from a file for a test, and for this case, your input file should contain a few more records to advance stream-time accordingly.

For more details, check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/

I also did a Kafka Summit talk about this topic: https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
