Merging a Spark DF using INSERT INTO

Question
I have an existing table, which I'd like to append two columns to. I create a Spark dataframe:
```python
spark_df = spark.createDataFrame(df)
```
Then I'd like to use MERGE INTO like so:
spark.sql(f"""MERGE INTO x.y AS m
USING {spark_df} AS s
ON m.id = s.id
WHEN MATCHED THEN
UPDATE SET m.id = s.id
WHEN NOT MATCHED
THEN INSERT (id, colnew_1) VALUES (id, spark_df[["myCol"]])""")
I get a syntax error when Spark tries to parse the spark_df reference. Is this functionality possible? I understand that a Delta table must be created first so that the MERGE operation is supported, but I'm a bit confused about the sequence of events. For example, I can create a Delta table like so:
```sql
CREATE TABLE x.y_delta (id bigint, colnew_1 bigint) USING delta
```
However, this table is empty. I suppose an intermediate step is to copy the original table in full into this new Delta table, and then use the Delta table accordingly, though I'm not convinced that this is right either. A sketch of that copy step is shown below.
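For reference, that copy step can be done in one statement with CREATE TABLE ... AS SELECT. A minimal sketch, assuming the source table is x.y and that colnew_1 and colnew_2 are the two new bigint columns (the question only names colnew_1, so these names are partly placeholders):

```python
# Hypothetical sketch: seed a new Delta table from the existing table x.y,
# adding the new columns as NULL placeholders to be populated later by MERGE.
spark.sql("""
    CREATE TABLE x.y_delta
    USING delta AS
    SELECT t.*,
           CAST(NULL AS BIGINT) AS colnew_1,
           CAST(NULL AS BIGINT) AS colnew_2
    FROM x.y AS t
""")
```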
Answer 1

Score: 1
As suggested by @blackbishop, create a temp view for the data frame. The USING clause of MERGE expects a table or view name, so a DataFrame object cannot be interpolated into the SQL string directly; registering it as a view makes it addressable by name.
```python
df12.createOrReplaceTempView("temp_table1")
```
I followed the same suggestion and it works fine. Follow the steps below:

**Code:**

> Sample data frame **df12**:
```python
from pyspark.sql import types as T

# Sample dataframe whose rows will be merged into the Delta table.
df12 = spark.createDataFrame(
    [
        (1, "vam", 400),
        (2, "gov", 456),
    ],
    T.StructType(
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("col1", T.StringType(), True),
            T.StructField("myCol", T.IntegerType(), True),
        ]
    ),
)
```
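To sanity-check the sample data before merging, you can display the frame; given the two rows above, the output should look like this:

```python
df12.show()
# +---+----+-----+
# | id|col1|myCol|
# +---+----+-----+
# |  1| vam|  400|
# |  2| gov|  456|
# +---+----+-----+
```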
> Create Delta table:

```python
spark.sql("CREATE TABLE x.y_delta2 (id int, col1 string, myCol int) USING delta")
spark.sql("INSERT INTO x.y_delta2 VALUES (1, 'govind', 123), (3, 'deep', 456)")
```

![enter image description here](https://i.stack.imgur.com/DGnGD.png)
> **Create temp view:**

```python
df12.createOrReplaceTempView("temp_table1")
```
> **Merge operation:**

```python
spark.sql("""MERGE INTO x.y_delta2 AS m
USING temp_table1 AS s
ON m.id = s.id
WHEN MATCHED THEN
  UPDATE SET m.id = s.id
WHEN NOT MATCHED
  THEN INSERT (m.id, m.col1, m.myCol) VALUES (s.id, s.col1, s.myCol)""")
```

![enter image description here](https://i.stack.imgur.com/07wZs.png)
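To verify the merge, read the table back. A minimal sketch; the expected state follows from the sample inputs above: id 1 matches, and since the WHEN MATCHED clause only re-assigns m.id, its other columns keep their original values; id 2 is inserted from the temp view; id 3 is untouched.

```python
# Inspect the Delta table after the merge.
spark.table("x.y_delta2").orderBy("id").show()
# Expected rows, given the inserts and merge above:
#   1 | govind | 123   (matched; only id was re-set, other columns unchanged)
#   2 | gov    | 456   (not matched in the target; inserted from temp_table1)
#   3 | deep   | 456   (not matched by the source; left as-is)
```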