Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>
Question
How can I create a unit test for a stateful process function? I have something like this:

    private static SingleOutputStreamOperator<Tuple> methodName(KeyedStream<Event, String> stream) {
        return stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
                .process(new ProcessFunction());
    }

and

    ProcessFunction extends ProcessWindowFunction<IN, OUT, KEY, W>

All the test harness examples I've found on the Flink documentation page test a KeyedProcessFunction, and that is not my case.
Thanks.
Kind regards!
Answer 1
Score: 4
In general, these test harnesses expect to test an operator rather than a user function. So for a ProcessWindowFunction, you first need to create a suitable window operator to pass to the test harness.

You can test a ProcessWindowFunction with a OneInputStreamOperatorTestHarness that you instantiate with a WindowOperator wrapped around your ProcessWindowFunction. I'm afraid this isn't particularly straightforward, but I can refer you to https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L437 as an example.
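The same idea covers the event-time window in the question. The harness drives the WindowOperator, and the operator calls your ProcessWindowFunction when a window fires. In a real harness test you would build the operator with TumblingEventTimeWindows and EventTimeTrigger.create(), then advance event time with harness.processWatermark(new Watermark(...)).

The stdlib sketch below is not Flink code. It is a hypothetical, heavily simplified model of that operator/function split, and the names MiniEventTimeWindowOperator and WindowFn are made up. Elements are buffered per window and key. Each window is handed to the user function as an Iterable once the watermark reaches the window's max timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a ProcessWindowFunction (this is NOT Flink's interface).
interface WindowFn {
    String process(String key, long windowEnd, Iterable<String> elements);
}

// Hypothetical, simplified model of a WindowOperator with tumbling event-time windows.
class MiniEventTimeWindowOperator {
    private final long size;
    private final WindowFn fn;
    // window start -> key -> buffered elements (plays the role of the window's ListState)
    private final TreeMap<Long, TreeMap<String, List<String>>> buffers = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    MiniEventTimeWindowOperator(long size, WindowFn fn) {
        this.size = size;
        this.fn = fn;
    }

    void processElement(String key, String value, long timestamp) {
        // Tumbling window with zero offset (assumes non-negative timestamps)
        long start = timestamp - timestamp % size;
        buffers.computeIfAbsent(start, s -> new TreeMap<>())
                .computeIfAbsent(key, k -> new ArrayList<>())
                .add(value);
    }

    void processWatermark(long watermark) {
        // Fire every window whose max timestamp (end - 1) is covered by the watermark
        while (!buffers.isEmpty() && buffers.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, TreeMap<String, List<String>>> window = buffers.pollFirstEntry();
            long end = window.getKey() + size;
            for (Map.Entry<String, List<String>> perKey : window.getValue().entrySet()) {
                // The user function sees the whole window content as an Iterable
                output.add(fn.process(perKey.getKey(), end, perKey.getValue()));
            }
        }
    }
}
```

Assigning, buffering and firing are all the operator's job. That is why the harness needs a WindowOperator rather than the bare function.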
Answer 2
Score: 2
I found a solution, inspired by this method: https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L1340

In my case, I have to test a tumbling processing-time window (TumblingProcessingTimeWindows) where the process() operator uses a ProcessWindowFunction to count words while keeping the previous window's count (i.e., not resetting the count each time the window is triggered).
WordCountPojo is a simple POJO with two fields, word and count (you can use Tuple2 if you prefer).
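The answer does not show WordCountPojo. Below is a minimal sketch, assuming a String word and an int count; the field types are a guess.

The detail that matters for this test is equals()/hashCode(). TestHarnessUtil compares the expected and actual StreamRecords with equals(), and StreamRecord.equals() compares the wrapped values with equals() (plus the timestamps). Without the override, records with identical content would never match.

```java
import java.util.Objects;

// Hypothetical sketch of WordCountPojo (field types are assumptions).
// In a real project, make it a public class in its own WordCountPojo.java so Flink's type
// extraction treats it as a POJO: public class, public no-arg constructor,
// and fields that are public or have getters/setters.
class WordCountPojo {
    private String word;
    private int count;

    public WordCountPojo() {} // no-arg constructor required by Flink's POJO rules

    public WordCountPojo(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // The harness assertions rely on value equality of the emitted records.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WordCountPojo)) return false;
        WordCountPojo other = (WordCountPojo) o;
        return count == other.count && Objects.equals(word, other.word);
    }

    @Override
    public int hashCode() {
        return Objects.hash(word, count);
    }

    // Readable failure messages when an assertion fails
    @Override
    public String toString() {
        return "WordCountPojo{word=" + word + ", count=" + count + "}";
    }
}
```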
This is the test I wrote:
    import java.util.Comparator;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
    import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    // Test utilities from the flink-streaming-java test-jar
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.TestHarnessUtil;
    import org.junit.jupiter.api.Test;

    class CounterTest {

        @Test
        @SuppressWarnings("unchecked")
        void testCounter() throws Exception {
            // Create a WindowOperator<Key, Input, Accumulator, Output, Window>
            WindowOperator<String, WordCountPojo, Iterable<WordCountPojo>, WordCountPojo, TimeWindow> operator =
                    new WindowOperator<>(
                            TumblingProcessingTimeWindows.of(Time.seconds(3)), // window assigner
                            new TimeWindow.Serializer(), // window serializer
                            WordCountPojo::getWord, // key selector
                            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), // key serializer
                            new ListStateDescriptor<>( // window state descriptor (accumulates the events of each window)
                                    "window-content",
                                    TypeInformation.of(WordCountPojo.class).createSerializer(new ExecutionConfig())), // input serializer
                            new InternalIterableProcessWindowFunction<>(new Counter()), // my custom ProcessWindowFunction to invoke
                            ProcessingTimeTrigger.create(), // window trigger
                            0, // allowed lateness
                            null); // no side output for late data

            // Flink test harness
            OneInputStreamOperatorTestHarness<WordCountPojo, WordCountPojo> harness =
                    new KeyedOneInputStreamOperatorTestHarness<>(operator, WordCountPojo::getWord, BasicTypeInfo.STRING_TYPE_INFO);

            // Sort by word, then by timestamp, so the assertions ignore emission order
            Comparator<Object> byWordThenTimestamp =
                    Comparator.comparing((Object r) -> ((StreamRecord<WordCountPojo>) r).getValue().getWord())
                            .thenComparingLong(r -> ((StreamRecord<WordCountPojo>) r).getTimestamp());

            ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

            harness.open();
            harness.setProcessingTime(10);

            // Push data into the first window
            harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
            harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
            harness.processElement(new StreamRecord<>(new WordCountPojo("or", 1)));
            harness.processElement(new StreamRecord<>(new WordCountPojo("not", 1)));
            harness.setProcessingTime(3500); // advance processing time to trigger the window

            // Expected result
            expected.add(new StreamRecord<>(new WordCountPojo("to", 1), 2999));
            expected.add(new StreamRecord<>(new WordCountPojo("be", 1), 2999));
            expected.add(new StreamRecord<>(new WordCountPojo("or", 1), 2999));
            expected.add(new StreamRecord<>(new WordCountPojo("not", 1), 2999));
            TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(), byWordThenTimestamp);

            // Push more WordCountPojos to test global counting
            harness.processElement(new StreamRecord<>(new WordCountPojo("to", 1)));
            harness.processElement(new StreamRecord<>(new WordCountPojo("be", 1)));
            harness.setProcessingTime(7000); // trigger the window again

            // Expected result (the output queue keeps the records of the first window as well)
            expected.add(new StreamRecord<>(new WordCountPojo("to", 2), 5999));
            expected.add(new StreamRecord<>(new WordCountPojo("be", 2), 5999));
            TestHarnessUtil.assertOutputEqualsSorted("Output not equal to expected", expected, harness.getOutput(), byWordThenTimestamp);

            harness.close();
        }
    }
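The expected timestamps 2999 and 5999 follow from how tumbling windows are laid out. With a 3-second window and zero offset, an element seen at processing time t belongs to [start, start + 3000). Here start is computed with the same formula as Flink's TimeWindow.getWindowStartWithOffset.

The WindowOperator stamps each emitted record with the window's maxTimestamp(), i.e. end - 1. ProcessingTimeTrigger fires once the processing-time clock reaches that value.

TumblingWindowMath is a hypothetical stdlib helper. It reproduces only this arithmetic and does not call Flink.

```java
// Hypothetical helper that mirrors Flink's tumbling-window arithmetic (not a Flink class).
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    // Same formula as TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Timestamp carried by records emitted for the window [start, start + size)
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // A processing-time trigger fires once the clock reaches the window's max timestamp
    static boolean fires(long currentProcessingTime, long start, long size) {
        return currentProcessingTime >= maxTimestamp(start, size);
    }
}
```

The test follows this arithmetic in two steps:
- With the clock at 10, the first four records land in [0, 3000). They fire at 3500 with timestamp 2999.
- The two records pushed at 3500 land in [3000, 6000). They fire at 7000 with timestamp 5999.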
Points to note:
- The accumulator type of the WindowOperator is Iterable<WordCountPojo>, not simply WordCountPojo. This is because my Counter's process() method receives an Iterable rather than a single WordCountPojo (remember that Counter extends ProcessWindowFunction).
- The WindowOperator's state descriptor parameter is a ListStateDescriptor, so the window simply collects the incoming WordCountPojos into a list. The WindowOperatorTest example uses a ReducingStateDescriptor that reduces by sum. I don't need that here, because the counting is done by the Counter function, which is the function I want to test.
- The WindowOperator's internal window function parameter is of type InternalIterableProcessWindowFunction. This function wraps my Counter function and is invoked when the window fires. The window accumulates an Iterable<WordCountPojo> through the ListStateDescriptor, and that Iterable is passed as the input parameter of Counter's process() method.
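The answer does not show Counter itself, so how it keeps the running total is not known. One common approach inside a real ProcessWindowFunction is per-key global state: a ValueState obtained through context.globalState(), which survives across windows. The per-window elements arrive as the Iterable described above.

RunningWordCounter is a hypothetical stdlib model of only that logic. A HashMap stands in for the keyed global state, and plain Integer counts stand in for WordCountPojo.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a "keep the previous window's count" counter (not Flink code).
// In Flink, `totals` would typically be a ValueState<Integer> from context.globalState(),
// which is already scoped to the current key, so no map would be needed there.
class RunningWordCounter {
    private final Map<String, Integer> totals = new HashMap<>();

    // Called once per (word, fired window) with all counts the window's ListState collected.
    int process(String word, Iterable<Integer> countsInWindow) {
        int sum = 0;
        for (int c : countsInWindow) {
            sum += c;
        }
        // Add to the total from previous windows instead of restarting from zero.
        int total = totals.getOrDefault(word, 0) + sum;
        totals.put(word, total);
        return total;
    }
}
```

Calling process("to", ...) with a single count of 1 in two successive windows returns 1 and then 2. This matches the ("to", 1) and ("to", 2) records the harness test expects.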