How do I receive the last windowed Kafka message from a windowedBy + aggregate when the producer stops sending messages in Java/Spring?

Question
Like I say in the title, I want to receive the last windowedBy messages when the producer stops sending messages. At the moment I am doing it manually, but first of all, a short description.
I have a Kafka producer that reads lines from a file (every line is a different JSON); each line it reads is sent to Kafka 500 ms after the previous one. I have only 120 lines (or JSONs).
I have a consumer that consumes all the JSONs sent by the producer. The code:
final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));
// Topology
transactions
    .groupBy(this::groupedByTimeStampAndProtocolName)
    .windowedBy(TimeWindows
        .of(Duration.ofSeconds(10))
        .grace(Duration.ofMillis(0)))
    .aggregate(
        tool::emptyAggregate,
        this::processNewRecord, // new TransactionAggregator(),
        Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
            .withKeySerde(Serdes.String())
            .withValueSerde(aggregateSerde)
    )
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach(sendAggregatesToCassandra);
I get the expected functionality, I mean, it receives all the records, but to receive the last windowed messages I have to send records manually.
Two questions about this:
- Is there any way to process the last window automatically? Once the producer sends the last record (the 120th JSON) it won't send any more records. It doesn't matter if I have to wait some time or whatever.
- I have seen that I must send 3 more records to get the last window processed. It isn't clear to me why it has to be 3 records (if I send fewer than 3, the last window isn't consumed completely). Is there any way to send only one record? Change the buffer? Change some property?
I am using Kafka Streams (with Spring) on JDK 11, and I am working with Dockerized Kafka:
- confluentinc/cp-kafka:5.5.1
- zookeeper:3.4.14
- Kafka:
<version.kafka>2.5.0</version.kafka>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
The properties used on the Kafka Streams (consumer) side are:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId() + Constants.APP_ID);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
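For completeness, here is a minimal sketch of how these properties and the topology above can be wired together and started when the KafkaStreams instance is created by hand; the original Spring-managed wiring is not shown in the post, so this is an assumption (KafkaStreams and StreamsBuilder come from org.apache.kafka.streams):

StreamsBuilder builder = new StreamsBuilder();
// ... define the topology shown above on this builder ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Close the Streams application cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));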
And on the producer side:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "all");
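For reference, a minimal sketch of the producer loop described above; the file name, topic name, and null record key are assumptions (they are not in the original code):

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Read the file line by line (each line is a JSON document) and send
// one record every 500 ms, as described in the question.
public static void produceFromFile(Properties properties) throws Exception {
    try (Producer<String, String> producer = new KafkaProducer<>(properties);
         BufferedReader reader = Files.newBufferedReader(Path.of("transactions.json"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            producer.send(new ProducerRecord<>("transactions", null, line));
            Thread.sleep(500); // 500 ms between records
        }
        producer.flush();
    }
}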
Please, could you help me?
Answer 1
Score: 2
As you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result if "stream-time" advances. "Stream-time" is computed as a function over the record timestamps and thus, if no records are processed, "stream-time" does not advance and suppress() will never emit anything. Thus, sending more records is the only way "stream-time" can be advanced.
> Note: for a streaming use case, the assumption is that data never stops and thus it's not an issue for an actual deployment -- reading from a file as you do is not a real stream processing use case: I assume you read from a file for a test, and for this case, your input file should contain a few more records to advance stream-time accordingly.
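For a test setup like this, the most direct workaround is therefore to keep producing a few filler records after the real data ends, so that stream-time (wall-clock based here, because of WallclockTimestampExtractor) moves past the end of the last 10-second window and suppress() can emit the final result. A rough sketch, assuming a hypothetical topic name and a placeholder payload that the aggregation logic can tolerate or filter out:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// After the last real record, send a few placeholder records spaced more than
// one window apart so that stream-time advances past the final window end
// (window size 10 s, grace 0 ms). Topic name and payload are placeholders;
// the aggregation would need to ignore or tolerate these records.
public static void advanceStreamTime(Producer<String, String> producer) throws InterruptedException {
    for (int i = 0; i < 3; i++) {
        Thread.sleep(11_000); // wait longer than the 10 s window before each filler record
        producer.send(new ProducerRecord<>("transactions", null, "{\"filler\":true}"));
    }
    producer.flush();
}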
For more details, check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
I also did a Kafka Summit talk about this topic: https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/