使用INSERT INTO合并Spark数据框

huangapple go评论57阅读模式
英文:

Merging a Spark DF using INSERT INTO

问题

我有一个现有的表格,我想要添加两列。我创建一个Spark数据框:

spark_df = spark.createDataFrame(df)

然后我想要使用MERGE INTO,如下所示:

spark.sql(f"""MERGE INTO x.y AS m
USING {spark_df} AS s
ON m.id = s.id
WHEN MATCHED THEN
 UPDATE SET m.id = s.id
WHEN NOT MATCHED 
 THEN INSERT (id, colnew_1) VALUES (id, spark_df[["myCol"]])""")

在尝试解析spark_df时,我遇到了语法错误。这个功能是否可行?我了解首先需要创建一个Delta表,以支持MERGE操作。然而,我对事件的顺序有点困惑。例如,我可以这样创建一个Delta表:

CREATE TABLE x.y_delta (id bigint, colnew_1 bigint) USING delta

但是这个表是空的。我想一个中间步骤是完全复制原始表到这个新的Delta表,然后相应地使用这个Delta表。但我不确定这是否也是正确的。

英文:

I have an existing table, which I'd like to append two columns to. I create a Spark dataframe:

spark_df = spark.createDataFrame(df)

Then I'd like to use MERGE INTO as so:

spark.sql(f"""MERGE INTO x.y AS m
USING {spark_df} AS s
ON m.id = s.id
WHEN MATCHED THEN
 UPDATE SET m.id = s.id
WHEN NOT MATCHED 
 THEN INSERT (id, colnew_1) VALUES (id, spark_df[["myCol"]])""")

I retrieve a syntax error when trying to parse the spark_df. Is this functionality possible? I understand that a Delta table is to be created first, so that the MERGE operation is supported. However I'm a bit confused on the sequence of events. For example, I can create a delta table like so:

CREATE TABLE x.y_delta (id bigint, colnew_1 bigint) USING delta

However this table is empty. I suppose an intermediate step is to completely copy the original table, to this new delta table. Then use this delta table accordingly. Though I'm not convinced that this is also right.

答案1

得分: 1

# 建议来自 @blackbishop,为数据帧创建临时视图。
df12.createOrReplaceTempView("temp_table1")

# 我遵循了相同的建议,它运行正常。按照以下步骤操作:

**代码**

>  示例数据帧 **df12**

from pyspark.sql import types as f
df12 = spark.createDataFrame(
    [
        (1, "vam", 400),  
        (2, "gov", 456)
    ],
    f.StructType(  
        [
            f.StructField("id", f.IntegerType(), True),
            f.StructField("col1", f.StringType(), True),
            f.StructField("myCol", f.IntegerType(), True)
        ]
    ),
)

> 创建 Delta 表

spark.sql("CREATE TABLE x.y_delta2 (id int, col1 string, myCol int) USING delta")
spark.sql("insert into x.y_delta2 values (1, 'govind', 123), (3, 'deep', 456)")

![在此输入图片描述](https://i.stack.imgur.com/DGnGD.png)

> **创建临时视图**

df12.createOrReplaceTempView("temp_table1")

> **合并操作**

spark.sql(f"""MERGE INTO x.y_delta2 AS m
USING temp_table1 AS s
ON m.id = s.id
WHEN MATCHED THEN
 UPDATE SET m.id = s.id
WHEN NOT MATCHED 
 THEN INSERT (m.id, m.col1, m.myCol) VALUES (s.id, s.col1, s.myCol)""")
![在此输入图片描述](https://i.stack.imgur.com/07wZs.png)
英文:

As suggested by @blackbishop, Create temp view for the data frame.

df12.createOrReplaceTempView("temp_table1")

I followed the same suggestion, its working fine .Follow below steps:

Code:

> Sample data frame df12:

from pyspark.sql import types as f
df12 = spark.createDataFrame(
    [
        (1,"vam",400),  
        (2,"gov",456)
    ],
    f.StructType(  
        [
            f.StructField("id", f.IntegerType(), True),
            f.StructField("col1", f.StringType(), True),
            f.StructField("myCol", f.IntegerType(), True)
        ]
    ),
)

> Create Delta table :

spark.sql("CREATE TABLE x.y_delta2 (id int, col1 string, myCol int) USING delta")
spark.sql("insert into x.y_delta2 values (1,'govind',123),(3,'deep',456)")

使用INSERT INTO合并Spark数据框

> Create Temp View

df12.createOrReplaceTempView("temp_table1")

> Merge operation:

spark.sql(f"""MERGE INTO x.y_delta2 AS m
USING temp_table1 AS s
ON m.id = s.id
WHEN MATCHED THEN
 UPDATE SET m.id = s.id
WHEN NOT MATCHED 
 THEN INSERT (m.id,m.col1,m.myCol) VALUES (s.id,s.col1,s.myCol)""")

使用INSERT INTO合并Spark数据框

huangapple
  • 本文由 发表于 2023年2月23日 22:55:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/75546501.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定