pandas dataframe : how to update specific rows in hive table
Question
I want to update a single column in a Hive table. Here is how I am selecting the data:
from datetime import timedelta, date
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext
from pyspark.sql import SparkSession
import os
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("job1").enableHiveSupport().getOrCreate()
test1=spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and
part2='avgewg' and
col2='filter_condition'
""")
import pandas as pd
import json
import urllib.parse
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = test1.toPandas()
smvr={}
for index, row in pandasDF.iterrows():
    newpr = []
    pr = pandasDF.iloc[index]['col1']
    if pr:
        for p in pr:
            newp = p.asDict()
            newp['attribute1']=p.attribute1+'_suffix'
            newpr.append(newp)
        print(newpr[0])
        pandasDF.iloc[index]['attribute1'][0]=newpr[0]
        print(pandasDF.iloc[index]['attribute1'][0])
    break
So with the above snippet, I am able to see that
print(pandasDF.iloc[index]['attribute1'][0])
has the updated value. Now, how can I update the underlying Hive table as well? I was going through some resources online and the following suggestion came up:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
This could work, but note that in my select query I have
col2='filter_condition'
so basically I don't want to overwrite the entire partition, but only the rows that I have selected.
Is that possible with the above approach?
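For reference, the suggested snippet applied end-to-end would look roughly like this (a sketch, untested, reusing the table and filter values from the query above); it narrows the overwrite to the partition level, not to individual rows:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# hypothetical DataFrame holding the rows to write back, with the same
# columns and column order as myPartitionedTable
updated = spark.sql("""
    select * from myPartitionedTable
    where part1='adqwf' and part2='avgewg' and col2='filter_condition'
""")
# ... apply the column changes to `updated` here ...

# Only the partitions that appear in `updated` are rewritten, but each such
# partition is rewritten completely, so rows filtered out by col2 would be
# dropped unless they are included in `updated` as well.
updated.write.mode("overwrite").insertInto("myPartitionedTable")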
Answer 1
Score: 1
I think I understand this right, but I am unable to test it. Let me know how this works.
You need to modify your code to update the pd.DataFrame with the new values, and then convert the updated pd.DataFrame back to a Spark DataFrame using the createDataFrame() method of the SparkSession.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema of your DataFrame
schema = StructType([
    StructField("col1", StringType()),
    StructField("attribute1", StringType()),
    StructField("products", StringType()),
    StructField("part1", StringType()),
    StructField("part2", StringType())
])

# Create a new DataFrame with the updated values
updated_df = spark.createDataFrame(
    pandasDF,
    schema=schema
).withColumn("products", col("products").cast("array<struct<attribute1:string,attribute2:string>>"))

# Enable dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Overwrite only the selected rows in your Hive table
updated_df.write.mode("overwrite").partitionBy("part1", "part2").format("hive").saveAsTable("myPartitionedTable")
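If the rows of the same partition that do not match col2='filter_condition' must be preserved, one possible variant (a sketch, untested, assuming updated_df ends up with the same schema and column order as the table) is to read those rows back, union them with the updated ones, and overwrite just that partition:
# rows of the partition that were NOT selected and should stay unchanged
# (rows where col2 is NULL would need to be included separately)
untouched = spark.sql("""
    select * from myPartitionedTable
    where part1='adqwf' and part2='avgewg' and col2 <> 'filter_condition'
""")

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# insertInto matches columns by position, so the union must keep the table's column order
updated_df.unionByName(untouched).write.mode("overwrite").insertInto("myPartitionedTable")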