Flink parallelism with event-time windows

Question

I'm reading from a Kafka topic that has a single partition, using event-time windows without a key. With this setup, the window never closed (it never emitted its results). I struggled with this for a long time. In the end I added env.setParallelism(1); and suddenly everything worked.

I want to understand why this setting is needed in my example. Why didn't the windows close without it?

  1. Also, I found in the documentation that windows without keys always have a parallelism of 1.

  2. I also want to add that with TumblingProcessingTimeWindows, everything works perfectly with or without env.setParallelism(1);


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    KafkaSource<UserModel> source = KafkaSource.<UserModel>builder()
            .setBootstrapServers(kafka)
            .setTopics("f1")
            .setGroupId("flink_group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new JsonConverter())
            .build();

    DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    WatermarkStrategy<UserModel> strategy = WatermarkStrategy
            .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((i, timestamp) -> i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

    SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((acc, i) -> {
                acc.count += i.count;
                acc.dt = i.dt;
                System.out.println(acc.dt + " reduce:" + acc.count);
                return acc;
            }, new Rich());

    reduce.print();

Answer 1

Score: 1

I can't think of a reason why changing the parallelism would cause your window trigger to start working. I'm guessing it's some other issue with your code.

Looking at your code, though, this line is a problem:

DataStream<UserModel> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Instead of passing noWatermarks() to fromSource(), you should pass the strategy you define just after that line.

Also, the acc.dt = i.dt line is only valid if you have a single input partition to your reduce and you know that the events in the Kafka topic are ordered by time:

    .reduce((acc, i) -> {
        acc.count += i.count;
        acc.dt = i.dt;
        System.out.println(acc.dt + " reduce:" + acc.count);
        return acc;
    }, new Rich());

Otherwise you're setting the accumulator time based on some random element in your window.
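If ordering cannot be guaranteed, one order-independent alternative is to keep the later of the two timestamps rather than whichever element arrived last. The sketch below is plain Java without Flink; the Event class is a hypothetical stand-in for UserModel, and it assumes dt is a LocalDateTime (the question's toInstant(ZoneOffset.UTC) call suggests so).

```java
import java.time.LocalDateTime;
import java.util.List;

public class LatestTimestampReduce {
    // Hypothetical stand-in for the question's UserModel: a count plus an event time.
    static final class Event {
        final long count;
        final LocalDateTime dt;

        Event(long count, LocalDateTime dt) {
            this.count = count;
            this.dt = dt;
        }
    }

    // Order-independent reduce step: sum the counts and keep the later timestamp.
    static Event combine(Event acc, Event next) {
        LocalDateTime latest = next.dt.isAfter(acc.dt) ? next.dt : acc.dt;
        return new Event(acc.count + next.count, latest);
    }

    public static void main(String[] args) {
        // Events deliberately arrive out of order; the last one is the oldest.
        List<Event> outOfOrder = List.of(
                new Event(1, LocalDateTime.of(2023, 4, 20, 5, 0, 7)),
                new Event(2, LocalDateTime.of(2023, 4, 20, 5, 0, 9)),
                new Event(3, LocalDateTime.of(2023, 4, 20, 5, 0, 3)));

        Event result = outOfOrder.stream()
                .reduce(LatestTimestampReduce::combine)
                .orElseThrow();

        System.out.println(result.dt + " count=" + result.count);
    }
}
```

Unlike the original lambda, this version returns a new object instead of mutating acc; the same max-of-timestamps logic could be written in the mutating style inside the reduce call.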

If the events really are ordered by time, then you can and should use the built-in forMonotonousTimestamps watermark strategy (instead of forBoundedOutOfOrderness), which gives you the most accurate watermark with no added latency.
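To make the latency difference concrete, here is a rough plain-Java sketch (no Flink dependency) of how the two strategies decide when a 10-second tumbling window may fire. It assumes the watermark is computed as the highest timestamp seen minus the allowed out-of-orderness minus 1 ms, with forMonotonousTimestamps behaving like a bound of zero, and that a window [start, end) fires once the watermark reaches end - 1. The class and method names are made up for illustration, and Flink's periodic watermark emission is ignored.

```java
import java.time.Duration;

public class WatermarkSketch {
    // Assumed watermark formula: max timestamp seen, minus the allowed
    // out-of-orderness, minus 1 ms. A zero bound models monotonous timestamps.
    static long watermark(long maxTimestampMillis, Duration bound) {
        return maxTimestampMillis - bound.toMillis() - 1;
    }

    // An event-time window [start, end) may fire once the watermark reaches end - 1.
    static boolean windowFires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // first 10-second tumbling window covers [0, 10000)
        Duration bound = Duration.ofSeconds(20);
        long[] eventTimestamps = {10_000L, 30_000L};

        for (long ts : eventTimestamps) {
            boolean monotonous = windowFires(windowEnd, watermark(ts, Duration.ZERO));
            boolean bounded = windowFires(windowEnd, watermark(ts, bound));
            System.out.println("event at " + ts + " ms: monotonous fires=" + monotonous
                    + ", bounded(20s) fires=" + bounded);
        }
    }
}
```

Under these assumptions, an event stamped 10 000 ms is enough to close the first window with monotonous timestamps, while the 20-second bound holds the same window open until an event stamped 30 000 ms or later arrives.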

huangapple
  • Posted on April 20, 2023, 05:05:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76058798.html