Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>

Question

How can I create a unit test for a stateful process function? I have something like this:

private static SingleOutputStreamOperator<Tuple> methodName(KeyedStream<Event, String> stream) {
    return stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new ProcessFunction());
}

where

ProcessFunction extends ProcessWindowFunction<IN, OUT, KEY, W>

All the test harness examples I've found on the Flink site are for functions that extend KeyedProcessFunction, which is not my case.
Thanks.
Kind regards!

Answer 1

Score: 4

In general, these test harnesses expect to be testing an operator rather than a user function. So in the case of a ProcessWindowFunction, you first need to create a suitable window operator to pass to the test harness.

You can test a ProcessWindowFunction using a OneInputStreamOperatorTestHarness that you instantiate with a WindowOperator wrapped around your ProcessWindowFunction. I'm afraid this isn't particularly straightforward, but I can refer you to https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L437 as an example.

Answer 2

Score: 2

I found a solution, inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340

In my case, I have to test a TumblingProcessingTimeWindow where the process() operator uses a ProcessWindowFunction to count words while keeping the previous window count (i.e., not resetting the count each time the window is triggered).

WordCountPojo is a simple POJO with two fields: word and count (you can use Tuple2 if you prefer).
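
To make the counting requirement concrete, here is a minimal plain-Java sketch of the behaviour the test checks: a per-word total that is carried over from one window firing to the next. It is not the Counter from this answer (which isn't shown) and uses no Flink types. RunningWordCount and onWindowFired are hypothetical names, and the HashMap merely stands in for whatever keyed state the real function would use.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "count words without resetting between windows".
// Hypothetical illustration only; no Flink API is involved.
final class RunningWordCount {

    // Stands in for per-key state that survives window firings.
    private final Map<String, Integer> totals = new HashMap<>();

    // Called once per word when a window fires, with the counts collected in that window.
    // Returns the running total for the word across all windows seen so far.
    int onWindowFired(String word, List<Integer> countsInWindow) {
        int sumInWindow = 0;
        for (int count : countsInWindow) {
            sumInWindow += count;
        }
        // merge() adds the window's sum to the stored total and returns the new total.
        return totals.merge(word, sumInWindow, Integer::sum);
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        // First window: "to" seen once.
        System.out.println(counter.onWindowFired("to", Arrays.asList(1)));
        // Second window: "to" seen once more, so the total is carried over.
        System.out.println(counter.onWindowFired("to", Arrays.asList(1)));
    }
}
```

This mirrors the expectations in the test: a word seen once in the first window and once in the second is reported with a count of 1 and then 2.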

This is the test I wrote:

// Imports (package names as of Flink 1.11; the harness classes live in Flink's test-jar artifacts)
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.jupiter.api.Test;

import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;

@Test
void testCounter() throws Exception {
    // Create a WindowOperator<Key, Input, Accumulator, Output, Window>
    WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
            new WindowOperator<>(
                    TumblingProcessingTimeWindows.of(Time.seconds(3)), // window assigner
                    new TimeWindow.Serializer(), // window serializer
                    WordCountPojo::getWord, // key selector
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), // key serializer
                    new ListStateDescriptor<>( // window state descriptor (to accumulate events inside the window)
                            "window-content",
                            TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), // input serializer
                    new InternalIterableProcessWindowFunction<>(new Counter()), // my custom ProcessWindowFunction to invoke
                    ProcessingTimeTrigger.create(), // window trigger
                    0, // allowed lateness
                    null); // output tag for late data (none)

    // Flink test harness, keyed by word like the operator
    OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    harness.open();
    harness.setProcessingTime(10);

    // Push data into the window
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));

    harness.setProcessingTime(3500); // Advance processing time to trigger the window

    // Expected result of the first window
    expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
    expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    // Push more WordCountPojos to test counting across windows
    harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
    harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));

    harness.setProcessingTime(7000); // Trigger the window again

    // Expected result of the second window (harness.getOutput() is cumulative, so these are appended)
    expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
    expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));

    TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(),
            Comparator.comparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getValue().getWord())
                    .thenComparing(streamRecord -> ((StreamRecord<WordCountPojo>) streamRecord).getTimestamp()));

    harness.close();
}
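
The expected timestamps 2999 and 5999 in the test come from the window boundaries. As far as I can tell, each emitted record carries the maximum timestamp of its window, which is the (exclusive) window end minus one. Below is a small plain-Java sketch of that arithmetic, assuming no window offset and non-negative timestamps. TumblingWindowBounds is a hypothetical helper, not a Flink class.

```java
// Plain-Java model of tumbling-window arithmetic (no offset, non-negative timestamps).
// Hypothetical helper for illustration; not part of Flink.
final class TumblingWindowBounds {

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to that window: the exclusive end minus one.
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 3000; // Time.seconds(3) expressed in milliseconds

        // First batch is pushed at processing time 10, i.e. into window [0, 3000).
        System.out.println(maxTimestamp(10, size));   // prints 2999

        // Second batch is pushed at processing time 3500, i.e. into window [3000, 6000).
        System.out.println(maxTimestamp(3500, size)); // prints 5999
    }
}
```

This is also why setting the processing time to 3500 and then 7000 is enough to fire each window: both values are past the respective window's maximum timestamp.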

Attention points:

  • The accumulator type of the WindowOperator is Iterable<WordCountPojo>, NOT simply WordCountPojo. This is because my Counter's process() method receives an Iterable rather than a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
  • WindowOperator's state descriptor parameter is a ListStateDescriptor, which means that the window simply collects the WordCountPojos it receives. The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by sum, but I don't need that because the counting is done by Counter, which is the function I want to test.
  • WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window is triggered. Because the window accumulates an Iterable<WordCountPojo> through the ListStateDescriptor, that Iterable<WordCountPojo> is passed to Counter's process() method as its input when the window fires.
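
The difference between list-style and reducing-style window state mentioned above can be modelled in a few lines of plain Java. This is only a conceptual sketch: ListWindowState and ReducingWindowState are hypothetical classes that imitate the two behaviours, not Flink's ListState or ReducingState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Conceptual models of the two kinds of window state; hypothetical, not Flink classes.
final class WindowStateModels {

    // List-style state: every element is kept, so the window function sees all of them.
    static final class ListWindowState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T element) {
            elements.add(element);
        }

        Iterable<T> get() {
            return elements;
        }
    }

    // Reducing-style state: each element is folded into a single value as it arrives.
    static final class ReducingWindowState<T> {
        private final BinaryOperator<T> reducer;
        private T current; // null until the first element arrives

        ReducingWindowState(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            current = (current == null) ? element : reducer.apply(current, element);
        }

        T get() {
            return current;
        }
    }

    public static void main(String[] args) {
        ListWindowState<Integer> list = new ListWindowState<>();
        ReducingWindowState<Integer> sum = new ReducingWindowState<>(Integer::sum);
        for (int count : new int[] {1, 1, 1}) {
            list.add(count);
            sum.add(count);
        }
        System.out.println(list.get()); // all three elements, as an Iterable-based function would receive them
        System.out.println(sum.get());  // one pre-aggregated value
    }
}
```

With list-style state the aggregation logic stays entirely inside the function under test, which is the point of choosing it here.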

huangapple
  • Posted on 2020-08-14 02:24:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63401125.html