Flink Parallelism with event time window
Question
I'm reading from a Kafka topic with a single partition (partition = 1), using event-time windows without a key. With this setup, the code did not close (emit) the results of the window. I struggled with this for a long time before figuring it out. In the end I added env.setParallelism(1); and suddenly everything worked.
I want to understand why this parameter is needed in my example. Why didn't the windows close in my example without this parameter?
- Also, in the documentation I found that non-keyed windows always run with a parallelism of 1.
- I also want to add that with TumblingProcessingTimeWindows, everything works perfectly regardless of the env.setParallelism(1); setting.
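For context on that documentation point: since non-keyed windowAll always runs with parallelism 1, keying the stream is the usual way to get parallel windows. Below is a minimal, self-contained sketch (not my actual job — plain Long events stand in for the Kafka data, with the value doubling as the event timestamp, and the class and variable names are illustrative):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class KeyedWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // keyed windows may run in parallel, unlike windowAll

        List<Long> out = new ArrayList<>();
        env.fromElements(1_000L, 2_000L, 11_000L, 12_000L)
           .assignTimestampsAndWatermarks(
               WatermarkStrategy.<Long>forMonotonousTimestamps()
                   .withTimestampAssigner((v, ts) -> v)) // value doubles as timestamp
           .keyBy(v -> v % 2)  // two keys -> window state is partitioned
           .window(TumblingEventTimeWindows.of(Time.seconds(10)))
           .reduce(Long::sum)
           .executeAndCollect()
           .forEachRemaining(out::add);

        // four window results in total: one per key per 10-second window
        System.out.println(out);
    }
}
```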
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

KafkaSource<UserModel> source = KafkaSource.<UserModel>builder()
        .setBootstrapServers(kafka)
        .setTopics("f1")
        .setGroupId("flink_group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonConverter())
        .build();

DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

WatermarkStrategy<UserModel> strategy = WatermarkStrategy
        .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((i, timestamp) -> i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

SingleOutputStreamOperator<UserModelEx> reduce = ds
        .assignTimestampsAndWatermarks(strategy)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((acc, i) -> {
            acc.count += i.count;
            acc.dt = i.dt;
            System.out.println(acc.dt + " reduce:" + acc.count);
            return acc;
        }, new Rich());

reduce.print();
env.execute(); // required to actually run the job
Answer 1
Score: 1
I can't think of a reason why changing the parallelism would cause your window trigger to start working. I'm guessing it's some other issue with your code.
But looking at your code, this line isn't right:

DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Instead of passing in the noWatermarks() strategy, you should be providing the strategy you define just after that line.
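A sketch of that fix, reusing the env, source, and UserModel from your own code (so this is a fragment, not a standalone program):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.time.Duration;
import java.time.ZoneOffset;

// Define the event-time strategy before creating the stream...
WatermarkStrategy<UserModel> strategy = WatermarkStrategy
        .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((i, ts) -> i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

// ...and pass it to fromSource instead of WatermarkStrategy.noWatermarks().
// The separate assignTimestampsAndWatermarks(...) call is then no longer needed.
DataStream<UserModel> ds = env.fromSource(source, strategy, "Kafka Source");
```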
Also, the acc.dt = i.dt line is only valid if you have a single input partition to your reduce, and you know that events in the Kafka topic are ordered by time:

.reduce((acc, i) -> {
    acc.count += i.count;
    acc.dt = i.dt;
    System.out.println(acc.dt + " reduce:" + acc.count);
    return acc;
})

Otherwise you're setting the accumulator time based on some arbitrary element in your window.
But if that assumption does hold, then you can and should use the built-in forMonotonousTimestamps watermark strategy (instead of forBoundedOutOfOrderness), which will give you the most accurate watermark with no latency.
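To illustrate that suggestion, here is a minimal self-contained sketch (in-order Long events whose value doubles as the event timestamp; the names are illustrative, not from your job): with in-order input, forMonotonousTimestamps tracks the maximum timestamp exactly, so event-time windows close with no added latency.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class MonotonousWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // In-order timestamps (millis), standing in for an ordered Kafka partition.
        WatermarkStrategy<Long> strategy = WatermarkStrategy
                .<Long>forMonotonousTimestamps()
                .withTimestampAssigner((value, ts) -> value);

        List<Long> out = new ArrayList<>();
        env.fromElements(1_000L, 5_000L, 12_000L, 21_000L)
           .assignTimestampsAndWatermarks(strategy)
           .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
           .reduce(Long::sum)
           .executeAndCollect()
           .forEachRemaining(out::add);

        // windows [0,10s) -> 6000, [10s,20s) -> 12000, [20s,30s) -> 21000
        System.out.println(out);
    }
}
```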