Word count number is always changing when using Flink

Question

I am trying to create a word count example with Flink. Here is the [link][1] to the word data (this is the example from Flink's GitHub account).

When I count the words with a simple Java program:

```java
public static void main(String[] args) throws Exception {
    int count = 0;
    for (String eachSentence : WordCountData.WORDS){
        String[] splittedSentence = eachSentence.toLowerCase().split("\\W+");
        for (String eachWord: splittedSentence){
            count++;
        }
    }
    System.out.println(count);
    // result is 287
}
```
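
The loop above counts every element that `split("\\W+")` returns. One detail worth keeping in mind when comparing totals: if a line starts with a non-word character, `split` returns an empty string as the first token, and that empty string is counted too (trailing empty tokens, by contrast, are dropped). A minimal standalone sketch, using two made-up lines rather than the real `WordCountData.WORDS`; the class and method names are hypothetical:

```java
import java.util.Arrays;

public class TokenCountSketch {
    // Same tokenization as the loop above: lower-case, then split on runs of non-word characters
    static String[] tokenize(String sentence) {
        return sentence.toLowerCase().split("\\W+");
    }

    // Counts every token split() returns, exactly like count++ in the loop above
    static int countTokens(String[] lines) {
        int count = 0;
        for (String line : lines) {
            count += tokenize(line).length;
        }
        return count;
    }

    public static void main(String[] args) {
        // Made-up sample lines, not the real WordCountData.WORDS
        String[] lines = {
            "To be, or not to be,--that is the question:--",
            "'Tis nobler in the mind to suffer"
        };
        for (String line : lines) {
            System.out.println(Arrays.toString(tokenize(line)));
        }
        System.out.println(countTokens(lines));
    }
}
```

It prints the two token arrays and then 18; the second array starts with an empty string. Because `TempTransformation` uses the same `split`, the Flink job sees the same tokens, so this alone does not explain the varying output.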

Now, when I do this with Flink, I first split the sentences into words:

```java
DataStream<Tuple2<String, Integer>> readWordByWordStream = splitSentenceWordByWord(wordCountDataSource);

//...
public DataStream<Tuple2<String, Integer>> splitSentenceWordByWord(DataStream<String> wordDataSourceStream)
{
    DataStream<Tuple2<String, Integer>> wordByWordStream = wordDataSourceStream.flatMap(new TempTransformation());
    return wordByWordStream;
}
```
  • Here is my TempTransformation class:
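```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TempTransformation extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception
    {
        // Lower-case the sentence, split it on runs of non-word characters, emit (word, 1) per token
        String[] splittedSentence = input.toLowerCase().split("\\W+");
        for (String eachWord : splittedSentence)
        {
            collector.collect(new Tuple2<String, Integer>(eachWord, 1));
        }
    }
}
```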
  • Now I count the words by converting the stream into a KeyedStream (keyed by word):
```java
public SingleOutputStreamOperator<String> keyedStreamExample(DataStream<Tuple2<String, Integer>> wordByWordStream)
{
    // keyBy(0) keys the stream by the word (tuple field 0); each key gets 1 ms tumbling windows
    return wordByWordStream.keyBy(0).timeWindow(Time.milliseconds(1)).apply(new TempWindowFunction());
}
```
  • TempWindowFunction:
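```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TempWindowFunction extends RichWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
    private Logger logger = LoggerFactory.getLogger(TempWindowFunction.class);
    // Plain instance field: shared by all keys and windows processed by this function instance
    private int count = 0;
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception
    {
        // `count` is logged before this window's elements are added to it
        logger.info("Key is:' {} ' and collected element for that key and count: {}", (Object) tuple.getField(0), count);
        StringBuilder builder = new StringBuilder();
        for (Tuple2<String, Integer> each : input)
        {
            String key = each.f0;
            Integer value = each.f1;
            String tupleStr = "[ " + key + " , " + value + "]";
            builder.append(tupleStr);
            count ++;
        }
        logger.info("All tuples {}", builder.toString());
        logger.info("Exit method");
        logger.info("----");
    }
}
```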
  • After running this job in Flink's local environment, the output is different every time. Here are a few samples:
```
18:09:40,086 INFO  TempWindowFunction     - Key is:' rub ' and collected element for that key and count: 86
18:09:40,086 INFO  TempWindowFunction     - All tuples [ rub , 1]
18:09:40,086 INFO  TempWindowFunction     - Exit method
18:09:40,086 INFO  TempWindowFunction     - ----
18:09:40,086 INFO  TempWindowFunction     - Key is:' for ' and collected element for that key and count: 87
18:09:40,086 INFO  TempWindowFunction     - All tuples [ for , 1]
18:09:40,086 INFO  TempWindowFunction     - Exit method
18:09:40,086 INFO  TempWindowFunction     - ----

// output from another run:

18:36:21,660 INFO  TempWindowFunction     - Key is:' for ' and collected element for that key and count: 103
18:36:21,660 INFO  TempWindowFunction     - All tuples [ for , 1]
18:36:21,660 INFO  TempWindowFunction     - Exit method
18:36:21,660 INFO  TempWindowFunction     - ----
18:36:21,662 INFO  TempWindowFunction     - Key is:' coil ' and collected element for that key and count: 104
18:36:21,662 INFO  TempWindowFunction     - All tuples [ coil , 1]
18:36:21,662 INFO  TempWindowFunction     - Exit method
18:36:21,662 INFO  TempWindowFunction     - ----
```
  • Finally, here is the execution setup:
```java
//...
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
//...
```
  • Why does Flink produce different output on each execution?

Answer 1

Score: 1

One source of non-determinism in your application is the processing-time windows (which are 1 ms long). Whenever you use processing time for windowing, the windows end up containing whatever events happen to arrive and get processed during that time interval. (Event-time windows do behave deterministically, since they are based on timestamps carried in the events.) Making the windows this short exaggerates the effect.
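
To make the point concrete, here is a plain-Java simulation (no Flink dependency) of 1 ms tumbling windows. It is a sketch under stated assumptions: it uses the window-start arithmetic `timestamp - timestamp % size` (the offset-0 case of aligning tumbling windows), and the class name, the sample event timestamps, and `simulateWork` are hypothetical. With event time, the timestamps are part of the data, so the grouping comes out identical on every run; with processing time, each element is stamped from the clock at the moment it happens to be processed, so the number and size of the windows can change between runs, much like the varying `count` values in the logs above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowDeterminismSketch {
    // Same length as Time.milliseconds(1) in the question
    static final long WINDOW_SIZE_MS = 1;

    // Only used to keep the busy loop in simulateWork from being optimized away
    static volatile long sink;

    // Start of the tumbling window containing `timestamp` (offset 0, non-negative timestamps;
    // with a 1 ms size every millisecond is its own window)
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Number of elements per window, in the order the windows are first seen
    static Map<Long, Integer> elementsPerWindow(long[] timestamps) {
        Map<Long, Integer> perWindow = new LinkedHashMap<>();
        for (long ts : timestamps) {
            perWindow.merge(windowStart(ts, WINDOW_SIZE_MS), 1, Integer::sum);
        }
        return perWindow;
    }

    // Event time: hypothetical timestamps stored in the records themselves, identical on every run
    static long[] eventTimestamps() {
        return new long[] {0, 0, 0, 1, 1, 2, 5, 5};
    }

    // Processing time: each record is stamped with the clock when it is processed
    static long[] processingTimestamps(int n) {
        long[] ts = new long[n];
        for (int i = 0; i < n; i++) {
            simulateWork();
            ts[i] = System.nanoTime() / 1_000_000; // monotonic clock, in milliseconds
        }
        return ts;
    }

    // Hypothetical stand-in for per-record work whose duration varies from run to run
    static void simulateWork() {
        long sum = 0;
        for (int j = 0; j < 300_000; j++) {
            sum += j;
        }
        sink += sum;
    }

    public static void main(String[] args) {
        for (int run = 1; run <= 3; run++) {
            System.out.println("run " + run
                    + " | event time: " + elementsPerWindow(eventTimestamps()).values()
                    + " | processing time: " + elementsPerWindow(processingTimestamps(8)).values());
        }
    }
}
```

The event-time column prints `[3, 2, 1, 2]` on every run, while the processing-time column depends on how fast each run happens to be and may differ from run to run.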

huangapple
  • Published on March 15, 2020 23:42:55
  • When reposting, please keep the link to this article: https://go.coder-hub.com/60694613.html