How to iterate over a BigQuery TableResult correctly?

Question

I have a complex join query in BigQuery that I need to run from a Spark job. This is the current code:

val bigquery = BigQueryOptions.newBuilder().setProjectId(bigQueryConfig.bigQueryProjectId)
      .setCredentials(credentials)
      .build().getService

val query =
      //some complex query

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()
     
val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>

//create case class from the row
}

It keeps failing with this error:

> Exceeded rate limits: Your project: XXX exceeded quota for tabledata.list bytes per second per project.

Is there a better way to iterate through the results? I have tried setting setPriority(QueryJobConfiguration.Priority.BATCH) on the query job configuration, but it does not help. I also tried reducing the number of Spark executors to 1, to no avail.

Answer 1

Score: 2

Instead of reading the query results directly, you can write them to a destination table and use the spark-bigquery-connector to read that table into a DataFrame:

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .setDestinationTable(TableId.of(destinationDataset, destinationTable))
        .build()
val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

// read the destination table into a DataFrame
val data = spark.read.format("bigquery")
  .option("dataset", destinationDataset)
  .option("table", destinationTable)
  .load()
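
The question wants a case class per row. Once the data is in a DataFrame, one way to get there is a typed Dataset. This is only a sketch: `Record`, its fields, and `toRecords` are hypothetical names, and it assumes the destination table's column names and types line up with the case class fields.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical row type: replace the fields with the columns of the
// destination table (names and types must match for the conversion to work).
case class Record(id: Long, name: String)

// Converts the DataFrame loaded from BigQuery into a typed Dataset.
def toRecords(spark: SparkSession, data: DataFrame): Dataset[Record] = {
  import spark.implicits._ // brings the case class Encoder into scope
  data.as[Record]
}
```

With this approach the rows stay distributed across the executors instead of being collected into a Seq on the driver.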

Answer 2

Score: 0

We resolved this by setting a custom page size on the TableResult.
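
The answer does not show its code, so the following is only a sketch of how a page size can be passed with the google-cloud-bigquery Java client. `readRows` is a hypothetical helper, `queryJob` is assumed to be the completed Job from the question, and the page size value is left to the caller because the answer does not say which value worked.

```scala
import com.google.cloud.bigquery.{BigQuery, FieldValueList, Job, TableResult}
import scala.collection.JavaConverters._

// Hypothetical helper: fetches the results of a finished query job,
// asking the API for at most `pageSize` rows per page.
def readRows(queryJob: Job, pageSize: Long): Iterator[FieldValueList] = {
  val result: TableResult =
    queryJob.getQueryResults(BigQuery.QueryResultsOption.pageSize(pageSize))
  // iterateAll() requests further pages lazily as the iterator advances.
  result.iterateAll().iterator().asScala
}
```

A call might look like `readRows(queryJob, 1000L).map(row => /* build the case class */ row)`, where 1000 is an arbitrary starting point to tune, not a recommended value.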

Posted by huangapple on January 3, 2020
Source: https://go.coder-hub.com/59581485.html