How to iterate a BigQuery TableResult correctly?
Question
I have a complex join query in BigQuery that needs to run in a Spark job. This is the current code:
    import scala.collection.JavaConverters._ // needed for .asScala below

    val bigquery = BigQueryOptions.newBuilder()
      .setProjectId(bigQueryConfig.bigQueryProjectId)
      .setCredentials(credentials)
      .build()
      .getService

    val query = // some complex query

    val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) // (tried with and without)
        .build()

    val jobId: JobId = JobId.newBuilder().setRandomJob().build()
    val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
    val result = queryJob.getQueryResults()

    val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
      // create a case class from the row
    }
It keeps running into this error:
> Exceeded rate limits: Your project: XXX exceeded quota for tabledata.list bytes per second per project.
Is there a better way to iterate through the results? I have tried setPriority(QueryJobConfiguration.Priority.BATCH) on the query job configuration, but it doesn't improve things. I also tried reducing the number of Spark executors to 1, to no avail.
Answer 1
Score: 2
Instead of reading the query results directly, you can use the spark-bigquery-connector to read them into a DataFrame:
    val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) // (tried with and without)
        .setDestinationTable(TableId.of(destinationDataset, destinationTable))
        .build()

    val jobId: JobId = JobId.newBuilder().setRandomJob().build()
    val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
    val result = queryJob.getQueryResults()

    // read the materialized destination table into a DataFrame
    val data = spark.read.format("bigquery")
      .option("dataset", destinationDataset)
      .option("table", destinationTable)
      .load()
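Note that newer versions of spark-bigquery-connector can also run the query for you, so you don't have to manage a destination table yourself. A minimal sketch, assuming a connector version that supports query-based reads and a hypothetical materializationDataset the job can write to:

    // Let the connector execute the SQL and materialize it into a temporary table.
    // viewsEnabled and materializationDataset are required for query-based reads.
    val data = spark.read.format("bigquery")
      .option("viewsEnabled", "true")
      .option("materializationDataset", materializationDataset) // hypothetical dataset name
      .option("query", query)
      .load()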
Answer 2
Score: 0
We resolved the situation by providing a custom page size on the TableResult.
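The answer doesn't include code, but with the google-cloud-bigquery Java client the page size is typically passed via QueryResultsOption when fetching the results. A minimal sketch, reusing the queryJob from the question; the page size of 1000 is illustrative, not from the original answer:

    import scala.collection.JavaConverters._
    import com.google.cloud.bigquery.BigQuery

    // Fetch results in smaller pages so each underlying tabledata.list call
    // stays under the bytes-per-second quota; iterateAll() still walks every page.
    val result = queryJob.getQueryResults(BigQuery.QueryResultsOption.pageSize(1000L))

    val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
      // create a case class from the row
    }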