Dataflow - add JSON file to BigQuery

# Question

I'm doing a POC with GCP Dataflow, adding some JSON objects to BigQuery.

```python
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'
freq = (
    p1
    | beam.Create(["{\"vendor_id\":33,\"trip_id\": 1000474,\"trip_distance\": 2.3999996185302734,\"fare_amount\": 42.13,\"store_and_fwd_flag\": \"Y\"}"])
    | beam.Map(print)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                            custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                            schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```

Now when I run this code, I get the following error:

```
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_957_6db0f0222c18bf6ef55dfb301cf9b7b2_2e2519daf47e48888bd08fc7661da2e6 failed. Error Result: <ErrorProto
location: 'gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
reason: 'invalid'> [while running 'Write Record to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']
```

The file in the staging bucket referred to in the error contains null.

Please help.
# Answer 1

**Score**: 2
If you want to insert a JSON file into a BigQuery table with `Beam` and `Dataflow`, you have to read the JSON file in `NEWLINE_DELIMITED_JSON` format. Example:

Your code with a mock as input:
```python
import json
from typing import Dict, List

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query


def map_to_dict(input: str) -> Dict:
    # Parse one JSON line into the dict that WriteToBigQuery expects.
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

inputs: List[str] = [
    '{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}',
    '{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}'
]

freq = (
    p1
    | beam.Create(inputs)
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                            custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                            schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```
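Note that the original pipeline chained `beam.Map(print)` in the middle of the graph. Since `print` returns `None`, every element downstream becomes `None`, which is why the staged load file contained nulls. If you want to keep a debugging print step, a pass-through helper such as this hypothetical `debug_print` works instead:

```python
def debug_print(element):
    # Print the element, then return it unchanged so downstream
    # transforms still receive the actual data.
    print(element)
    return element
```

Chaining `| beam.Map(debug_print)` between `Create` and `map_to_dict` then prints each record without corrupting the stream.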

Example with a JSON file from GCS:

input.json:

```
{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
```
```python
import json
from typing import Dict

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query
from apache_beam.io import ReadFromText


def map_to_dict(input: str) -> Dict:
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | ReadFromText("gs://your-bucket/object/input.json")  # one element per line of the file
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                            custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                            schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```
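Both snippets above run on the direct runner by default. To actually submit the pipeline to Dataflow, you would construct it with pipeline options along these lines; a minimal sketch, where the project, region, and bucket values are placeholders and not from the original post:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# All values below are placeholders for illustration only.
options = PipelineOptions(
    runner='DataflowRunner',
    project='your-project-id',
    region='us-central1',
    temp_location='gs://your-bucket/temp',
)
p1 = beam.Pipeline(options=options)
```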

The following conditions need to be respected:

- You need to transform each string into a Dict before saving the PCollection to BigQuery.
- The JSON objects should exactly match the schema of the BigQuery table. You can ignore extra fields with the `ignore_unknown_columns` parameter of the `WriteToBigQuery` output connector (see the sketch after this list).
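A minimal sketch of that second point, reusing the same placeholder names as above; note that, depending on your Beam version and insert method, this flag may only take effect for streaming inserts rather than file loads:

```python
b_query.WriteToBigQuery(
    table='trips2', dataset='my_poc', project='xxxxxxxx-xxx-2',
    schema=trips_schema,
    custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    ignore_unknown_columns=True,  # drop JSON fields that are not in trips_schema
)
```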
