Can you run code after waitUntilFinish() is called in a Dataflow template?

Question


I have a batch Apache Beam job that takes a file from GCS as input. My goal is to move the file to one of two GCS buckets depending on the pipeline's state after execution. If the pipeline executed successfully, move the file to bucket A; otherwise, if the pipeline had any kind of unhandled exception during execution, move the file to bucket B.

I'm using Apache Beam version 2.24.0 for Java, and I need to create a Dataflow template in order to run it multiple times with different input files.

My current approach is to run the pipeline with pipeline.run().waitUntilFinish(), wrap the call in a try-catch, and use the resulting PipelineResult.State object (null when an exception was thrown) to decide which bucket to move the file to.
With the DirectRunner this works fine, but when I create a Dataflow template and execute it with the DataflowRunner, any code outside of the pipeline's execution graph is completely ignored.
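The decision logic described above can be sketched without Beam on the classpath. This is a minimal, hypothetical sketch: `State` is a local stand-in for a few values of Beam's `PipelineResult.State` (not the Beam enum itself), the `Callable` stands in for `() -> pipeline.run().waitUntilFinish()`, and the bucket names are placeholders.

```java
import java.util.concurrent.Callable;

public class BucketChooser {
    // Hypothetical local stand-in for a subset of Beam's PipelineResult.State.
    enum State { RUNNING, DONE, FAILED, CANCELLED }

    // Placeholder bucket names, for illustration only.
    static final String BUCKET_A = "gs://bucket-a";
    static final String BUCKET_B = "gs://bucket-b";

    // Runs the job via runAndWait and picks a destination:
    // bucket A only when the final state is DONE, bucket B otherwise.
    static String chooseBucket(Callable<State> runAndWait) {
        State state;
        try {
            state = runAndWait.call();
        } catch (Exception e) {
            state = null; // as in the question: null means an exception was thrown
        }
        return state == State.DONE ? BUCKET_A : BUCKET_B;
    }
}
```

This only works when `run()` really executes the pipeline in the launching JVM, as with the DirectRunner.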

Is there any way to execute code defined outside of the Dataflow template's execution graph?
I would like to achieve this directly in the same Java code, without the need to execute another program after the job finishes (e.g. a Cloud Function).

Answer 1

Score: 2

> Is there any way to execute code defined outside of the Dataflow template's execution graph?

Unfortunately, there is no way to do this at the moment. You can execute some code outside of the graph, but it will run when you create (stage) the template, not when you run a job from that template.

Basically, a "template" is simply the serialized graph. When you run the Beam program with DataflowRunner and the --templateLocation parameter, pipeline.run() only serializes the graph to a file (for later use) instead of actually launching a new job. That's why waitUntilFinish() doesn't work: when this code runs, there is no Dataflow job to wait for.
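The timing problem can be illustrated with a toy model. This is not Beam's API: `stageTemplate`, `stagingProgram` and `launch` are hypothetical names, and a copied list of step names plays the role of the serialized graph file. It shows that code written after `run()` executes once, while staging, and that launching jobs from the template replays only the graph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model (hypothetical, not Beam's API) of staging a template vs. launching it.
public class TemplateToy {
    // Counts how often the code written after run() actually executes.
    static int postRunCalls = 0;

    // Stand-in for pipeline.run() with --templateLocation: the graph is only
    // recorded; a copy of its step names plays the role of the serialized file.
    static List<String> stageTemplate(List<String> graphSteps) {
        return new ArrayList<>(graphSteps);
    }

    // Stand-in for the program's main(): "run" the graph, then execute the
    // post-run code. In staging mode that code runs here, exactly once.
    static List<String> stagingProgram(List<String> graphSteps) {
        List<String> template = stageTemplate(graphSteps);
        postRunCalls++; // e.g. waitUntilFinish() and moving the file
        return template;
    }

    // Launching a job from the template executes only the recorded graph steps;
    // nothing that ran after stageTemplate() is part of the template.
    static void launch(List<String> template, Consumer<String> executeStep) {
        for (String step : template) {
            executeStep.accept(step);
        }
    }
}
```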

One possible workaround is to use Wait.on to execute some additional logic once the pipeline has finished processing the main data. However, that code still needs to be "inside" the graph, and it will run within the same Dataflow job.
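`Wait.on` holds back a downstream step until a signal collection has been fully computed. As an analogy only (plain `CompletableFuture`, not Beam), the gating looks like the sketch below; `runGated`, `mainWork` and `followUp` are hypothetical names. In this analogy the follow-up step (e.g. moving the file to bucket A) is skipped entirely when the main work fails, so the failure case (bucket B) would need separate handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class GatedStep {
    // Runs mainWork, then runs followUp only if mainWork completed normally.
    // Returns true if both completed normally, false if either one threw.
    static boolean runGated(Runnable mainWork, Runnable followUp) {
        CompletableFuture<Void> done = CompletableFuture.runAsync(mainWork);
        // thenRun is skipped when `done` completes exceptionally.
        CompletableFuture<Void> gated = done.thenRun(followUp);
        try {
            gated.join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }
}
```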

Answer 2

Score: 0

This may happen because the Java exception is thrown on a Dataflow worker and cannot be caught on the host that submits the job.

If you want to move the file to one of the GCS buckets depending on the pipeline's state after execution, I would recommend using the Dataflow API to query the final state of the job:

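import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.PipelineResult;

PipelineResult res = p.run();
String jobId = ((DataflowPipelineJob) res).getJobId();
DataflowClient client = DataflowClient.create(options); // options: DataflowPipelineOptions
/* use client to poll the final state, e.g. client.getJob(jobId).getCurrentState() */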
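The polling step can be sketched as a small loop that is independent of the client library. This is a minimal sketch, assuming the state is read as a Dataflow API job state string (the terminal values below are the documented `JOB_STATE_*` terminal states); `pollUntilTerminal` and its parameters are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class JobStatePoller {
    // Terminal values of the Dataflow API job state.
    static final Set<String> TERMINAL = new HashSet<>(Arrays.asList(
        "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
        "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"));

    // Calls fetchState every intervalMillis until it returns a terminal state;
    // gives up with an IllegalStateException after maxPolls attempts.
    static String pollUntilTerminal(Supplier<String> fetchState, long intervalMillis, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            String state = fetchState.get();
            if (TERMINAL.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
        throw new IllegalStateException("No terminal state after " + maxPolls + " polls");
    }
}
```

With the client from the answer, the supplier might wrap `client.getJob(jobId).getCurrentState()` and rethrow its `IOException` as an `UncheckedIOException`; check that call against the Beam version you use.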

Then you can move the file to bucket A only when the job succeeds, and to bucket B when the job fails.
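Choosing the destination from the final state can be a small pure function. A minimal sketch, assuming the final state is a Dataflow API state string such as `JOB_STATE_DONE`; the bucket names and `destinationFor` are hypothetical. The design choice here is to treat every state other than `JOB_STATE_DONE` (failed, cancelled, drained, ...) as bucket B. The move itself is not shown; with a GCS client it is typically done as a copy followed by a delete.

```java
public class DestinationChooser {
    // Hypothetical placeholder bucket names.
    static final String BUCKET_A = "bucket-a";
    static final String BUCKET_B = "bucket-b";

    // Success goes to bucket A; any other final state goes to bucket B.
    // Returns the destination as a gs:// URI for the given object name.
    static String destinationFor(String finalState, String objectName) {
        String bucket = "JOB_STATE_DONE".equals(finalState) ? BUCKET_A : BUCKET_B;
        return "gs://" + bucket + "/" + objectName;
    }
}
```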

Additionally, I want to share some useful links with you: an example with waitUntilFinish, and an SO thread.

huangapple
  • This article was published on September 24, 2020, 20:05:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64046071.html