Dataflow - add JSON file to BigQuery

# Question

I'm doing a POC with GCP Dataflow, adding some JSON objects to BigQuery.

```python
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
        p1
        | beam.Create(["{\"vendor_id\":33,\"trip_id\": 1000474,\"trip_distance\": 2.3999996185302734,\"fare_amount\": 42.13,\"store_and_fwd_flag\": \"Y\"}"])
        | beam.Map(print)
        | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                                custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                                schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```

Now when I run this code, I get the following error:

```
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_957_6db0f0222c18bf6ef55dfb301cf9b7b2_2e2519daf47e48888bd08fc7661da2e6 failed. Error Result: <ErrorProto
 location: 'gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
 reason: 'invalid'> [while running 'Write Record to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']
```

The file in the staging bucket referenced by the error contains only null.

Please help.

# Answer 1
**Score**: 2

If you want to insert a JSON file into a BigQuery table with `Beam` and `Dataflow`, you have to read the JSON file in `NEWLINE_DELIMITED_JSON` format. Example:

Your code with a mock as input:

```python
import json
from typing import Dict, List

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

def map_to_dict(input: str) -> Dict:
    # Parse each JSON string into a dict, as expected by WriteToBigQuery.
    return json.loads(input)

def log_element(element: str) -> str:
    # print returns None, so a bare beam.Map(print) would replace every
    # element with None; log the element and return it instead.
    print(element)
    return element

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

inputs: List[str] = [
    '{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}',
    '{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}'
]

freq = (
    p1
    | beam.Create(inputs)
    | beam.Map(log_element)
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                            custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                            schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```

Example with a JSON file from GCS:

input.json:

```
{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
```
```python
import json
from typing import Dict

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query
from apache_beam.io import ReadFromText

def map_to_dict(input: str) -> Dict:
    # Parse each JSON line into a dict, as expected by WriteToBigQuery.
    return json.loads(input)

def log_element(element: str) -> str:
    # print returns None; log the element and return it so it is kept.
    print(element)
    return element

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | ReadFromText("gs://your-bucket/object/input.json")
    | beam.Map(log_element)
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                            custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                                                            schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
p1.run()
```
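
Both examples above run with the default (local) runner. To actually execute on Dataflow, as in the question's setting, the pipeline would be constructed with options along these lines; a minimal sketch, where the project, region, and bucket values are placeholders to substitute with your own:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values (assumptions, not from the answer): use your own
# project ID, region, and staging bucket.
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxxxxxxx-xxx-2',
    region='us-central1',
    temp_location='gs://XXXX-stage-xxxx/temp',
)
p1 = beam.Pipeline(options=options)
```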

The following conditions need to be respected:

- You need to transform each string into a `Dict` before saving the `PCollection` to BigQuery.
- The JSON objects should match the schema of the BigQuery table exactly. You can ignore extra fields with the `ignore_unknown_columns` parameter of the `WriteToBigQuery` output connector, as sketched after this list.
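
A minimal sketch of that parameter (the call below is illustrative, not from the original answer); note that, per the Beam documentation as I understand it, `ignore_unknown_columns` only takes effect with the `STREAMING_INSERTS` write method:

```python
b_query.WriteToBigQuery(
    table='trips2', dataset='my_poc', project='xxxxxxxx-xxx-2',
    schema=trips_schema,
    # Assumption: the flag is documented for streaming inserts, so the
    # write method is set explicitly here.
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
    ignore_unknown_columns=True,  # drop fields not present in trips_schema
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```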
