Is it possible to catch a missing dataset java.lang.RuntimeException in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?
Question
I am trying to handle errors in which my Dataflow job attempts to dynamically write to BigQuery table destinations.
I would like to catch the following exception:
> java.lang.RuntimeException: unable to obtain dataset for dataset
> example_dataset in project example_project
in order to create the dataset and then retry writing to BigQuery.
Is it possible to catch exceptions in this manner, and if so, do you know where I would need to add the try/catch logic in my code?
Answer 1
Score: 2
You can't handle this scenario with a try-catch block, because it is an internal BigQuery API error. Instead, I would suggest configuring a retry policy for transient errors and inspecting the error type. This way you can capture BigQuery write failures in a PCollection and then handle those records however you wish. Please refer to the snippet below to achieve this.
```java
// formattedData and successRows come from an upstream step in the answerer's pipeline.
WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of())
        .apply("BQ StreamingInserts",
                BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .to("audit.db_audit")
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        // Only transient errors are retried; other failures surface in the WriteResult.
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withoutValidation()
                        .withExtendedErrorInfo());
```
With the snippet above, if an insert fails because of a DDL issue, the failed data is captured in the WriteResult.
```java
PCollection<String> failedInserts = result.getFailedInsertsWithErr()
        .apply("BQErrorToTableRow", ParDo.of(new BQErrorToString()));
```
You can retrieve the failed records using the snippet above. Let me know if that helps.
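The BQErrorToString DoFn used in the snippet above is not shown in the answer; here is a minimal sketch of what such a transform might look like, assuming it simply flattens each BigQueryInsertError (the failed TableRow plus the error payload reported by BigQuery) into a string:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch of the (hypothetical) BQErrorToString transform: one output string per failed insert.
// In practice this would typically be nested inside the pipeline class.
class BQErrorToString extends DoFn<BigQueryInsertError, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        BigQueryInsertError insertError = c.element();
        // Combine the row that failed with the error details reported by BigQuery.
        c.output(insertError.getRow().toString() + " | " + insertError.getError().toString());
    }
}
```

The resulting strings in failedInserts can then be logged or written to a dead-letter location of your choosing.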
Answer 2
Score: 1
Nonexistent BigQuery datasets and/or tables will be retried indefinitely and may cause a stuck pipeline. BigQueryIO doesn't have a configurable option to automatically create nonexistent BigQuery datasets; it only has an option to create nonexistent BigQuery tables, and the specified dataset must already exist, or be created, before the write-to-table code is called.
I also found the following statement in the Beam documentation:
> the dataset being written to must already exist
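Since BigQueryIO will not create the dataset for you, one workaround is to make sure the dataset exists before launching the pipeline, for example in the code that builds and submits the job. Below is a minimal sketch using the google-cloud-bigquery Java client; the project and dataset names are taken from the question, and the ensureDatasetExists helper is only an illustration, not part of any existing API:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetInfo;

public class DatasetSetup {

    // Hypothetical helper: create the dataset if it does not already exist,
    // so the Dataflow job never hits the "unable to obtain dataset" error.
    static void ensureDatasetExists(String projectId, String datasetName) {
        BigQuery bigquery = BigQueryOptions.newBuilder()
                .setProjectId(projectId)
                .build()
                .getService();

        // getDataset returns null when the dataset is not found.
        if (bigquery.getDataset(datasetName) == null) {
            bigquery.create(DatasetInfo.newBuilder(datasetName).build());
        }
    }

    public static void main(String[] args) {
        // Names taken from the exception in the question.
        ensureDatasetExists("example_project", "example_dataset");
        // ... then construct and run the Dataflow pipeline as usual.
    }
}
```

Running something like this before pipeline.run() keeps the write path itself unchanged.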
Please refer to the official documentation on how Java exceptions are handled in Cloud Dataflow, and see the examples there.
The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode your job will fail; in streaming mode it may stall indefinitely.
I hope it helps.