Can Flink OutputTag be reused?
Question
In Flink, when two or more operators emit side outputs of the same record type, can they share a single OutputTag for that type?
Example:
OutputTag<A> sideOutputTag = new OutputTag<A>("side-output") {};
ProcessFunction1 processFunction1 = new ProcessFunction1(sideOutputTag);
ProcessFunction2 processFunction2 = new ProcessFunction2(sideOutputTag);
// getSideOutput returns a DataStream, not a SingleOutputStreamOperator
DataStream<A> output1 = input.process(processFunction1).getSideOutput(sideOutputTag);
DataStream<A> output2 = input.process(processFunction2).getSideOutput(sideOutputTag);
With this approach, will output1 also contain the records emitted by processFunction2?
Or will output1 and output2 contain only the records emitted by processFunction1 and processFunction2, respectively?
Thanks!
Answer 1
Score: 4
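You can reuse the same tag, and the resulting streams will be distinct. getSideOutput is called on a specific operator, so it returns only the records that operator emitted under the tag; sharing the tag object does not merge the streams. If you do want them combined, union them explicitly. For example: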
final OutputTag<String> errors = new OutputTag<String>("errors"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));