How to append data into a partitioned table in bulk, overriding ingestion time partitioning

Question

The target table has ingestion time partitioning (PARTITION BY DATE(_PARTITIONTIME)). Logging information is flowing into this table.

I'd like to load historical logging data that has the same schema and have it partitioned based on the date portion of a specific TIMESTAMP column. That is, I'd like to override the ingestion-time partitioning mechanism while retaining the data type of the partition ID.

When I try to load the data with bq load, bq complains that the partitioning is incompatible:

bq --project_id=myProj \
load \
        --time_partitioning_field=span.endTime \
        --time_partitioning_type=DAY \
        --source_format=PARQUET \
        myDataset.test_junk \
        gs://mybucket/test/export-*.parquet

The result:

E0419 17:06:29.304003 140704289732224 bq_utils.py:244] BigQuery error in load operation: Error processing job 'myProj:bqjob_r4bd44de633ecd32f_000001879b5714b9_1': Incompatible table partitioning specification. Expects partitioning specification interval(type:day), but input partitioning specification is interval(type:day,field:span.endTime)

Is there any easier way to accomplish my goal than writing custom code per this Stack Overflow post?

Answer 1

Score: 1

As mentioned in this document:

--time_partitioning_type: Enables time-based partitioning on a table and sets the partition type. Possible values are HOUR, DAY, MONTH, and YEAR. This flag is optional when you create a table partitioned on a DATE, DATETIME, or TIMESTAMP column. The default partition type for time-based partitioning is DAY. You cannot change the partitioning specification on an existing table.

--time_partitioning_field: The DATE or TIMESTAMP column used to create a partitioned table. If time-based partitioning is enabled without this value, an ingestion-time partitioned table is created.

For example:

The following command loads data from gs://mybucket/mydata.parquet into a new ingestion-time partitioned table named mytable in mydataset:

    bq load \
    --source_format=PARQUET \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

The following command loads data from gs://mybucket/mydata.parquet into a partitioned table named mytable in mydataset, partitioned on the mytimestamp column:

    bq load \
    --source_format=PARQUET \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

For more information, you can refer to this document.

You can also refer to this document about the limitations of the different types of partitioned tables.
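
As a side note (not from the question or the documentation excerpts above), an existing table's partitioning specification, which cannot be changed after creation, can be inspected with bq show. The dataset and table names below are placeholders:

    # Prints the table metadata as JSON; the timePartitioning section shows the
    # partition type and, for column-based partitioning, the partitioning field.
    bq show --format=prettyjson mydataset.mytable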

Answer 2

Score: 0

TL/DR: I had to use the {table}${partition} syntax to dump and load partitions individually.

I came to the conclusion that there was no way to do a single dump and a single load that would work given the constraints:

  • Source and destination tables are both partitioned using ingestion time.
  • Records loaded in the destination table must have the correct ingestion time, that is, the original ingestion time in the source table.
  • Data must be appended to the destination table.

Additional constraints:

  • Source and destination tables are in different regions.
  • The schema uses a nested data structure that could not be loaded correctly in any format other than JSON.

I could have used a programmatic API, but I wound up writing a program that generates shell scripts which use bq, gcloud, and gsutil to dump and load all the partitions individually.

Commands wound up looking like:

# Set project for source
gcloud config set project {source_project}

# Extract partition 20220325
bq extract --location {source_region} \
--destination_format NEWLINE_DELIMITED_JSON \
--quiet=true \
--format=pretty \
--compression GZIP \
'{source_project}:{dataset}.{table}$20220325' \
gs://{source_bucket}/dump.20220325.json.gz

# ... and similar for the rest of the partitions ...

# Copy all the dumps from source to destination region
gsutil cp -r gs://{source_bucket} gs://{destination_bucket}

# Set project for destination
gcloud config set project {destination_project}

# Load partition (20220325) into destination table
bq load --location={destination_region} \
--source_format=NEWLINE_DELIMITED_JSON \
--quiet=true \
--format=pretty \
--autodetect=false \
'{destination_project}:{dataset}.{table}$20220325' \
gs://{destination_bucket}/dump.20220325.json.gz

# ... and similar for the rest of the partition dumps ...
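
The generator program itself isn't shown here. Purely as an illustration, a minimal bash sketch that loops over a hard-coded list of partition dates and runs the same extract/copy/load steps per partition might look like the following (all project, dataset, bucket, and region names are placeholder assumptions):

    #!/usr/bin/env bash
    # Rough sketch only: migrate a fixed list of partitions one at a time.
    # Every name below is a placeholder; adjust to your environment.
    set -euo pipefail

    SOURCE_PROJECT="source_project"
    DEST_PROJECT="destination_project"
    DATASET="myDataset"
    TABLE="myTable"
    SOURCE_BUCKET="source_bucket"
    DEST_BUCKET="destination_bucket"
    SOURCE_REGION="source_region"
    DEST_REGION="destination_region"

    # Partition dates to migrate (YYYYMMDD).
    PARTITIONS=(20220325 20220326 20220327)

    for p in "${PARTITIONS[@]}"; do
      # Dump one partition of the source table to GCS as gzipped JSON.
      bq extract --location="${SOURCE_REGION}" \
        --destination_format=NEWLINE_DELIMITED_JSON \
        --compression=GZIP \
        "${SOURCE_PROJECT}:${DATASET}.${TABLE}\$${p}" \
        "gs://${SOURCE_BUCKET}/dump.${p}.json.gz"

      # Copy the dump into a bucket in the destination region.
      gsutil cp "gs://${SOURCE_BUCKET}/dump.${p}.json.gz" \
        "gs://${DEST_BUCKET}/dump.${p}.json.gz"

      # Append the dump into the matching partition of the destination table
      # (bq load appends by default when the table already exists).
      bq load --location="${DEST_REGION}" \
        --source_format=NEWLINE_DELIMITED_JSON \
        --autodetect=false \
        "${DEST_PROJECT}:${DATASET}.${TABLE}\$${p}" \
        "gs://${DEST_BUCKET}/dump.${p}.json.gz"
    done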

I was surprised to run into problems using Parquet: the loads would fail with "Provided Schema does not match Table". It may have been possible to use some flag to get a perfect schema match, but since JSON worked as shown above, I wasn't motivated to dig too deeply.
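
As a final sanity check (not part of the original workflow), the destination table can be grouped by _PARTITIONTIME to confirm that each load landed in the intended partition; the table reference below uses the same placeholder convention as the commands above:

    # Row counts per ingestion-time partition in the destination table.
    bq query --use_legacy_sql=false \
    'SELECT DATE(_PARTITIONTIME) AS partition_date, COUNT(*) AS row_count
     FROM `{destination_project}.{dataset}.{table}`
     GROUP BY partition_date
     ORDER BY partition_date'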
