Flink Parallelism with event time window
I'm reading from a Kafka topic that has a single partition, and I'm using event-time windows without a key. With the code below, the windows never closed (no results were emitted). I struggled with this for a long time. Eventually I added env.setParallelism(1); and suddenly everything worked.
I want to understand why this parameter is needed in my example. Why didn't the windows close without it?
- Also, I found in the documentation that non-keyed windows always have a parallelism of 1.
- I also want to add that with
, everything works perfectly regardless of the parameter env.setParallelism(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<UserModel> source = KafkaSource.<UserModel>builder()
        .setBootstrapServers(kafka)
        .setTopics("f1")
        .setGroupId("flink_group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonConverter())
        .build();

DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

WatermarkStrategy<UserModel> strategy = WatermarkStrategy
        .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((i, timestamp) -> i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
        // The non-keyed window definition (the windowAll call) is not shown in the post.
        .reduce((acc, i) -> {
            acc.count += i.count;
            acc.dt = i.dt;
            System.out.println(acc.dt + " reduce:" + acc.count);
            return acc;
        }, new Rich());
Score: 1
I can't think of a reason why changing the parallelism would cause your window trigger to start working. I'm guessing it's some other issue with your code.
But looking at your code, this line is a problem:
DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Instead of passing in the noWatermarks() strategy, you should pass the strategy you define just after that line (which means moving its definition above this call).
Also, the acc.dt = i.dt line is only valid if you have a single input partition to your reduce, and you know that events in the Kafka topic are ordered by time:
.reduce((acc, i) -> {
    acc.count += i.count;
    acc.dt = i.dt;
    System.out.println(acc.dt + " reduce:" + acc.count);
    return acc;
}, new Rich());
Otherwise you're setting the accumulator time based on some random element in your window.
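If you can't guarantee ordering, one option is to make the reduce order-independent by keeping the latest timestamp seen so far. Here is a minimal plain-Java sketch of that idea (no Flink involved). The Event class is a hypothetical stand-in for UserModel with only the two fields the reduce touches, and I'm assuming dt is a LocalDateTime, as the toInstant(ZoneOffset.UTC) call in the question suggests.

```java
import java.time.LocalDateTime;
import java.util.List;

public class OrderIndependentReduce {
    // Hypothetical stand-in for the question's UserModel (only count and dt).
    static final class Event {
        long count;
        LocalDateTime dt;

        Event(long count, LocalDateTime dt) {
            this.count = count;
            this.dt = dt;
        }
    }

    // Same shape as the reduce lambda in the question, but it keeps the
    // latest timestamp seen instead of whichever element arrived last.
    static Event reduce(Event acc, Event i) {
        acc.count += i.count;
        if (i.dt.isAfter(acc.dt)) {
            acc.dt = i.dt;
        }
        return acc;
    }

    public static void main(String[] args) {
        LocalDateTime t0 = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        // Events arrive out of order: the newest one is in the middle.
        List<Event> arrivals = List.of(
                new Event(1, t0.plusSeconds(5)),
                new Event(2, t0.plusSeconds(30)),
                new Event(3, t0.plusSeconds(10)));

        Event acc = arrivals.get(0);
        for (Event e : arrivals.subList(1, arrivals.size())) {
            acc = reduce(acc, e);
        }
        System.out.println(acc.dt + " reduce:" + acc.count);
    }
}
```

With this version the accumulator ends up with the timestamp of the t0 + 30 s event no matter which order the three events arrive in.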
But if the events really are ordered by time, then you can and should use the built-in forMonotonousTimestamps watermark strategy (instead of forBoundedOutOfOrderness), which will give you the most accurate watermark with no added latency.
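To put a number on that latency, here is a small plain-Java sketch of the arithmetic. It is not Flink code. It assumes, as I understand the built-in generators, that the bounded-out-of-orderness watermark is the highest timestamp seen minus the bound minus 1 ms, that the monotonous strategy behaves like a bound of zero, and that an event-time window [start, end) can fire once the watermark reaches end - 1. The one-minute window and the event timestamps are made up for illustration.

```java
import java.time.Duration;

public class WatermarkLagSketch {
    // Watermark reported after seeing maxTimestampMillis, given an out-of-orderness bound
    // (assumed formula: max timestamp - bound - 1 ms).
    static long boundedWatermark(long maxTimestampMillis, Duration bound) {
        return maxTimestampMillis - bound.toMillis() - 1;
    }

    // Monotonous timestamps are treated here as a bound of zero.
    static long monotonousWatermark(long maxTimestampMillis) {
        return boundedWatermark(maxTimestampMillis, Duration.ZERO);
    }

    // A window [start, end) can fire once the watermark reaches end - 1.
    static boolean windowCanFire(long watermark, long windowEndMillis) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;    // hypothetical one-minute window [0, 60000)
        long latestEvent = 60_000L;  // first event that falls into the next window
        Duration bound = Duration.ofSeconds(20);

        System.out.println("monotonous, event at 60000: "
                + windowCanFire(monotonousWatermark(latestEvent), windowEnd));
        System.out.println("bounded 20s, event at 60000: "
                + windowCanFire(boundedWatermark(latestEvent, bound), windowEnd));
        System.out.println("bounded 20s, event at 80000: "
                + windowCanFire(boundedWatermark(80_000L, bound), windowEnd));
    }
}
```

Under these assumptions, the 20-second bound means the window only closes once an event stamped at least 20 seconds past the window end shows up, whereas with monotonous timestamps the first event of the next window is enough.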