Can Flink OutputTag be reused?

Question

In Flink, when two or more operators emit side outputs carrying records of the same data type, can we reuse a single OutputTag for that data type?

Example:

OutputTag<A> sideOutputTag = new OutputTag<A>("side-output") {};
ProcessFunction1 processFunction1 = new ProcessFunction1(sideOutputTag);
ProcessFunction2 processFunction2 = new ProcessFunction2(sideOutputTag);

// getSideOutput returns a DataStream<A>, not a SingleOutputStreamOperator<A>
DataStream<A> output1 = input.process(processFunction1).getSideOutput(sideOutputTag);

DataStream<A> output2 = input.process(processFunction2).getSideOutput(sideOutputTag);

With this approach, will output1 also contain the side output produced by processFunction2?
Or will output1 and output2 contain only the records produced by processFunction1 and processFunction2, respectively?

Thanks!

Answer 1

Score: 4

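You can reuse the same tag, and the resulting streams will be distinct: each one contains only the side output of the operator on which getSideOutput was called. For example: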

final OutputTag<String> errors = new OutputTag<String>("errors"){};

SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;

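// Each call returns a separate stream holding only that operator's side output
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);

// Merge them explicitly if a single stream of all errors is wanted
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);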

exceptions.addSink(new FlinkKafkaProducer(...));
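
The per-operator scoping behind this answer can be illustrated with a toy model in plain Java, without Flink on the classpath. It assumes only what holds in Flink: a tag identifies a side output but stores no records, and records emitted through ctx.output(tag, value) in one operator reach only the streams obtained by calling getSideOutput(tag) on that same operator. The classes SideOutputModel, Tag and Operator are hypothetical stand-ins for illustration, not Flink's API or its implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of per-operator side outputs. Hypothetical classes, not Flink's API.
public class SideOutputModel {

    // Stand-in for OutputTag: only an id, it stores no records itself.
    static final class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    // Stand-in for one operator; each instance owns its own side-output buffers.
    static final class Operator<T> {
        private final Map<String, List<Object>> sideOutputs = new HashMap<>();
        private final BiConsumer<T, Operator<T>> fn;

        Operator(BiConsumer<T, Operator<T>> fn) {
            this.fn = fn;
        }

        // Mimics ctx.output(tag, value): writes only into this operator's buffer.
        <X> void output(Tag<X> tag, X value) {
            sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        // Runs the user function on every input element.
        void processAll(List<T> input) {
            for (T value : input) {
                fn.accept(value, this);
            }
        }

        // Mimics getSideOutput(tag): reads only from this operator's buffer.
        @SuppressWarnings("unchecked")
        <X> List<X> getSideOutput(Tag<X> tag) {
            List<Object> buffer = sideOutputs.getOrDefault(tag.id, new ArrayList<>());
            return (List<X>) (List<?>) buffer;
        }
    }

    public static void main(String[] args) {
        Tag<String> errors = new Tag<>("errors"); // one tag shared by both operators
        List<Integer> input = List.of(1, 2, 3, 4);

        Operator<Integer> op1 = new Operator<Integer>((Integer v, Operator<Integer> op) -> {
            if (v % 2 == 0) {
                op.output(errors, "op1:" + v);
            }
        });
        Operator<Integer> op2 = new Operator<Integer>((Integer v, Operator<Integer> op) -> {
            if (v > 2) {
                op.output(errors, "op2:" + v);
            }
        });

        op1.processAll(input);
        op2.processAll(input);

        System.out.println(op1.getSideOutput(errors)); // prints [op1:2, op1:4]
        System.out.println(op2.getSideOutput(errors)); // prints [op2:3, op2:4]
    }
}
```

Because each buffer lives inside its operator, sharing one tag object across operators is safe; the tag only selects which side output of a given operator you read, so output1 in the question never sees records from processFunction2.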

huangapple
  • Posted on September 10, 2020 at 07:25:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63820806.html