How to add a new column when writing to a Delta table?

Question

I am using delta-rs to write to a Delta table in Delta Lake. Here is my code:

import time
import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake.writer import write_deltalake

num_rows = 10
timestamp = np.array([time.time() + i * 0.01 for i in range(num_rows)])
current = np.random.rand(num_rows) * 10
voltage = np.random.rand(num_rows) * 100
temperature = np.random.rand(num_rows) * 50
data = {
    "timestamp": timestamp,
    "current": current,
    "voltage": voltage,
    "temperature": temperature,
}
df = pd.DataFrame(data)
storage_options = {
    "AWS_DEFAULT_REGION": "us-west-2",
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
schema = pa.schema(
    [
        ("timestamp", pa.float64()),
        ("current", pa.float64()),
        ("voltage", pa.float64()),
        ("temperature", pa.float64()),
    ]
)
write_deltalake(
    "s3a://my-bucket/delta-tables/motor",
    df,
    mode="append",
    schema=schema,
    storage_options=storage_options,
)

The code above successfully wrote the data, with its 4 columns, to a Delta table. I can confirm this with Spark SQL:

spark-sql> describe table delta.`s3a://my-bucket/delta-tables/motor`;
23/05/22 06:38:51 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
timestamp             double
current               double
voltage               double
temperature           double

# Partitioning
Not partitioned
Time taken: 0.39 seconds, Fetched 7 row(s)

spark-sql> select * from delta . `s3a://my-bucket/delta-tables/motor` limit 10;
23/05/22 07:01:50 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
1.683746477029865E9 7.604250297497938 9.421758439102415 72.1927369069416
1.683746477039865E9 0.09092487512480374 17.989035574705202  35.350210012093214
1.683746477049866E9 7.493128659573002 9.390891728445448 48.541259705334625
1.683746477059866E9 2.717780962917138 0.9268887657049119  59.10566692023579
1.683746477069866E9 2.57300442470119  17.486083607683693  47.23521355609355
1.683746477079866E9 2.09432242350117  14.945888123248054  47.125030870747715
1.683746477089866E9 4.136491853926207 16.52334128991138 27.544656909406505
1.6837464770998669E9  1.1299759566741152  5.539831633892187 52.50892511866684
1.6837464771098669E9  0.9626607062002979  8.400536671329352 72.49131313291358
1.6837464771198668E9  7.6866231204656446  4.033915109232906 48.900631068812075
Time taken: 5.925 seconds, Fetched 10 row(s)

Now I am trying to write to the same Delta table with a new column, pressure:

import time
import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake.writer import write_deltalake

num_rows = 10
timestamp = np.array([time.time() + i * 0.01 for i in range(num_rows)])
current = np.random.rand(num_rows) * 10
voltage = np.random.rand(num_rows) * 100
temperature = np.random.rand(num_rows) * 50
pressure = np.random.rand(num_rows) * 1000
data = {
    "timestamp": timestamp,
    "current": current,
    "voltage": voltage,
    "temperature": temperature,
    "pressure": pressure,
}
df = pd.DataFrame(data)
storage_options = {
    "AWS_DEFAULT_REGION": "us-west-2",
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
schema = pa.schema(
    [
        ("timestamp", pa.float64()),
        ("current", pa.float64()),
        ("voltage", pa.float64()),
        ("temperature", pa.float64()),
        ("pressure", pa.float64()), # <- I added this line
    ]
)
write_deltalake(
    "s3a://my-bucket/delta-tables/motor",
    df,
    mode="append",
    schema=schema,
    storage_options=storage_options,
    overwrite_schema=True,  # <- Same error with or without this
)

Note that the result is the same whether or not I pass overwrite_schema=True to write_deltalake.

It throws this error:

...

Traceback (most recent call last):
  File "python3.11/site-packages/deltalake/writer.py", line 180, in write_deltalake
    raise ValueError(
ValueError: Schema of data does not match table schema

Table schema:
timestamp: double
current: double
voltage: double
temperature: double
pressure: double

Data Schema:
timestamp: double
current: double
voltage: double
temperature: double

This error confuses me. My existing Delta table should have 4 columns, and the new data I want to write has 5. But according to the error message it is the opposite: the table schema is shown with 5 columns and the data schema with 4.

How can I add a new column in a Delta table? Thanks!

Answer 1

Score: 0

It looks like you need mode='overwrite' to use overwrite_schema=True. (See the source code)

This doesn't seem to be well documented. If you want to add a column while appending, you would first need to overwrite the existing data with the new column added, and then run the append.
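The overwrite-then-append workaround described above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a tested recipe for every delta-rs release: add_missing_columns and overwrite_then_append are hypothetical helper names, the new columns are assumed to be floating point (so NaN is a valid filler), the new data is assumed to contain every existing column, and overwrite_schema is the parameter name from the deltalake version used in the question (newer releases may handle schema changes differently).

```python
import numpy as np
import pandas as pd


def add_missing_columns(existing: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `existing` with all columns of `new_df`, in new_df's order.

    Columns that `existing` lacks are filled with NaN (assumes float columns).
    Assumes new_df's columns are a superset of existing's columns.
    """
    padded = existing.copy()
    for name in new_df.columns:
        if name not in padded.columns:
            padded[name] = np.nan
    return padded[list(new_df.columns)]


def overwrite_then_append(table_uri, new_df, storage_options=None):
    """Hypothetical helper: widen the table's schema, then append new_df."""
    # Imported lazily so the pandas-only helper above works without deltalake.
    from deltalake import DeltaTable, write_deltalake

    # 1. Read the existing rows and pad them with the new column(s).
    existing = DeltaTable(table_uri, storage_options=storage_options).to_pandas()
    padded = add_missing_columns(existing, new_df)

    # 2. Rewrite the table with the wider schema.
    write_deltalake(
        table_uri,
        padded,
        mode="overwrite",
        overwrite_schema=True,
        storage_options=storage_options,
    )

    # 3. The schemas now match, so a plain append should go through.
    write_deltalake(table_uri, new_df, mode="append", storage_options=storage_options)


# Small pandas-only demonstration of the padding step.
old_rows = pd.DataFrame({"timestamp": [1.0, 2.0], "temperature": [20.0, 21.0]})
new_rows = pd.DataFrame({"timestamp": [3.0], "temperature": [22.0], "pressure": [990.0]})
padded_rows = add_missing_columns(old_rows, new_rows)
print(list(padded_rows.columns))
```

With the question's setup, the call would look like overwrite_then_append("s3a://my-bucket/delta-tables/motor", df, storage_options). Note that this reads the whole table into memory and rewrites it, so it is only practical for small tables.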

Answer 2

Score: 0

As of today, this feature is not supported.

Here is the feature request ticket: https://github.com/delta-io/delta-rs/issues/1386

Posted by huangapple on May 22, 2023, 15:04:26
When reposting, please keep the link to this article: https://go.coder-hub.com/76303713.html