Dataflow - add JSON file to BigQuery
Question

I'm doing a POC with GCP Dataflow and adding some JSON objects to BigQuery.
```python
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | beam.Create(["{\"vendor_id\":33,\"trip_id\": 1000474,\"trip_distance\": 2.3999996185302734,\"fare_amount\": 42.13,\"store_and_fwd_flag\": \"Y\"}"])
    | beam.Map(print)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2', dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema, project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

p1.run()
```
Now when I run this code, I get the following error:
```
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_957_6db0f0222c18bf6ef55dfb301cf9b7b2_2e2519daf47e48888bd08fc7661da2e6 failed. Error Result: <ErrorProto
location: 'gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
reason: 'invalid'> [while running 'Write Record to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']
```
The file in the staging bucket that the error refers to contains null.

Please help.
# Answer 1
**Score**: 2
If you want to insert a JSON file into a `BigQuery` table with `Beam` and `Dataflow`, you have to read the JSON file in `NEWLINE_DELIMITED_JSON` format (one JSON object per line). Example:

Your code with a mock as input:
```python
import json
from typing import Dict, List

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query


def log_element(element: str) -> str:
    # Print the element for debugging and pass it through unchanged
    # (beam.Map(print) alone would replace every element with None).
    print(element)
    return element


def map_to_dict(input: str) -> Dict:
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

inputs: List[str] = [
    '{"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}',
    '{"vendor_id": 34, "trip_id": 1000475, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}'
]

freq = (
    p1
    | beam.Create(inputs)
    | beam.Map(log_element)
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2', dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema, project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

p1.run()
```
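As a side note, `WriteToBigQuery` consumes dictionaries, which is why the `json.loads` step is needed when the input is a JSON string. If the records are already Python dicts, a minimal sketch (same placeholder table, dataset and bucket as above) could skip that step:

```python
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

# Records are already dicts, so no json.loads step is required.
records = [
    {"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.3999996185302734,
     "fare_amount": 42.13, "store_and_fwd_flag": "Y"},
]

freq = (
    p1
    | beam.Create(records)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2', dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema, project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

p1.run()
```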
Example with a JSON file from `GCS`:

`input.json`:
{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
```python
import json
from typing import Dict

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query
from apache_beam.io import ReadFromText


def log_element(element: str) -> str:
    # Debug print that passes the element through (print alone returns None).
    print(element)
    return element


def map_to_dict(input: str) -> Dict:
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | ReadFromText("gs://your-bucket/object/input.json")  # one JSON object per line
    | beam.Map(log_element)
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2', dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema, project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

p1.run()
```
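Both snippets run on the default `DirectRunner` when launched locally. A minimal sketch of the pipeline options one might pass to run the same pipeline on Dataflow instead (the project, region, bucket and job name below are placeholders, not values from the original post):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; replace with your own project, region and staging bucket.
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxxxxxxx-xxx-2',
    region='us-central1',
    temp_location='gs://XXXX-stage-xxxx/temp',
    job_name='write-trips-to-bigquery',
)

# Build the pipeline exactly as above, but with the Dataflow options attached.
p1 = beam.Pipeline(options=options)
```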
The following conditions need to be respected:

- You need to transform the strings into `Dict`s before writing the `PCollection` to `BigQuery`.
- The JSON object should match the schema of the `BigQuery` table exactly. You can ignore extra fields with the `ignore_unknown_columns` parameter of the `WriteToBigQuery` output connector, as shown in the sketch below.
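For completeness, a minimal sketch of passing `ignore_unknown_columns` to `WriteToBigQuery` (same placeholders as above; depending on the Beam version this flag may only take effect for the streaming-inserts write method, so check the documentation of the version you use):

```python
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

write_to_bq = b_query.WriteToBigQuery(
    table='trips2', dataset='my_poc',
    custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
    schema=trips_schema, project='xxxxxxxx-xxx-2',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    # Silently drop input keys that are not columns of the target table
    # instead of failing the write.
    ignore_unknown_columns=True)
```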