Is it possible to catch a missing dataset java.lang.RuntimeException in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?


Question


I am trying to handle errors that occur when my Dataflow job attempts to write dynamically to BigQuery table destinations.

I would like to catch the following exception:

> java.lang.RuntimeException: unable to obtain dataset for dataset
> example_dataset in project example_project

in order to create the dataset and then retry writing to BigQuery.

Is it possible to catch exceptions in this manner, and if so, do you know where I would need to add the try/catch logic in my code?

Answer 1

Score: 2


You can't handle this scenario with a try-catch block, as it is an internal BigQuery API error. Instead, I would suggest configuring a transient-error retry policy and capturing the failed inserts. That way the BigQuery write errors are stored in a PCollection, and you can then handle those records however you like. Please refer to the snippet below to achieve this.

// Streaming inserts with a transient-error retry policy; rows that fail to insert
// are returned in the WriteResult instead of failing the pipeline.
WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of())
        .apply("BQ StreamingInserts",
                BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .to("audit.db_audit")
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withoutValidation()
                        .withExtendedErrorInfo());

Using the snippet above, if an insert fails because of DDL issues (for example, a missing table or dataset), the failed rows are stored in the WriteResult.

// Convert the failed inserts (with error details) into strings for logging or reprocessing.
PCollection<String> failedInserts = result.getFailedInsertsWithErr()
        .apply("BQErrorToTableRow", ParDo.of(new BQErrorToString()));

You can get the failed records using the snippet above. Let me know if that helps.
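
For reference, the BQErrorToString DoFn used above is not part of BigQueryIO; a minimal sketch of such a DoFn might look like the following (the exact formatting of the output string is an assumption):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that converts a failed insert into a readable string.
static class BQErrorToString extends DoFn<BigQueryInsertError, String> {
    @ProcessElement
    public void processElement(@Element BigQueryInsertError error, OutputReceiver<String> out) {
        // error.getRow() is the TableRow that failed; error.getError() carries the reason.
        out.output(error.getRow().toString() + " | " + error.getError().toString());
    }
}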

Answer 2

Score: 1


Writes to nonexistent BigQuery datasets and/or tables will be retried indefinitely and may cause a stuck pipeline. BigQueryIO doesn't have a configurable option to automatically create nonexistent BigQuery datasets; it only has an option to create nonexistent BigQuery tables, so the specified dataset must exist, or be created, before the write-to-table code is called.
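
One way to satisfy this is to create the dataset up front, before launching the pipeline. A minimal sketch, assuming the google-cloud-bigquery Java client library and the example_project/example_dataset names from the question:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;

// Create the dataset before the pipeline runs, so BigQueryIO's table creation can succeed.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
DatasetId datasetId = DatasetId.of("example_project", "example_dataset");
if (bigquery.getDataset(datasetId) == null) {
    bigquery.create(DatasetInfo.newBuilder(datasetId).build());
}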

I also found that the Beam documentation states that

> the dataset being written to must already exist

Please refer to the official documentation to see how Java exceptions are handled in Cloud Dataflow, along with the examples there.
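
A common pattern shown in that context is to catch exceptions inside your own DoFn and route failing elements to a dead-letter output rather than letting the runner retry them. A rough sketch, where messages, the tag names, and the parseToTableRow helper are illustrative assumptions rather than anything from the original question:

// Illustrative dead-letter pattern for exceptions thrown in user code.
final TupleTag<TableRow> mainTag = new TupleTag<TableRow>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple outputs = messages.apply("ParseMessages",
        ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                try {
                    c.output(parseToTableRow(c.element()));   // hypothetical parsing helper
                } catch (Exception e) {
                    // Emit the bad element to the dead-letter output instead of failing the bundle.
                    c.output(deadLetterTag, c.element() + " : " + e.getMessage());
                }
            }
        }).withOutputTags(mainTag, TupleTagList.of(deadLetterTag)));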

The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming mode, it may stall indefinitely.

I hope it helps.
