pandas DataFrame: how to update specific rows in a Hive table


Question

I want to update a single column in a Hive table. Here is how I am selecting the data:

from datetime import timedelta, date
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext
from pyspark.sql import SparkSession
import os
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("job1").enableHiveSupport().getOrCreate()

test1=spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and 
      part2='avgewg' and
      col2='filter_condition'
""")
import pandas as pd
import json
import urllib.parse
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = test1.toPandas()
smvr={}

for index, row in pandasDF.iterrows():
    newpr = []
    pr = pandasDF.iloc[index]['col1']
    if pr:
        
        for p in pr:
            newp = p.asDict()
            newp['attribute1']=p.attribute1+'_suffix'
            newpr.append(newp)
        print(newpr[0])
        pandasDF.iloc[index]['attribute1'][0]=newpr[0]
        print(pandasDF.iloc[index]['attribute1'][0])
        
        break
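
A quick aside on the pandas write-back: pandasDF.iloc[index]['attribute1'][0] = newpr[0] only takes effect because that cell holds a mutable array of Row objects shared with the DataFrame; pandas itself never sees the assignment. A more explicit sketch of the same write-back, assuming col1 is the column the list was read from, uses DataFrame.at:

# More explicit write-back of the rebuilt list into the current row.
# Using 'col1' here is an assumption based on where the list was read from above.
pandasDF.at[index, 'col1'] = newpr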

So with the above snippet, I am able to see that

print(pandasDF.iloc[index]['attribute1'][0])

has the updated value. Now, how can I update the underlying Hive table as well? I was going through some resources online, and the following suggestion came up:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

This could work, but note that in my select query I have

col2='filter_condition'

So basically I don't want to overwrite the entire partition, but only update the rows that I have selected.

Is that possible with the above approach?


Answer 1

Score: 1

I think I understand this right, but I am unable to test it, so let me know how this works.

You need to modify your code to update the pd.DataFrame with the new values, and then convert the updated pd.DataFrame back to a Spark DataFrame using the createDataFrame() method of the SparkSession:

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema of your DataFrame
schema = StructType([
    StructField("col1", StringType()),
    StructField("attribute1", StringType()),
    StructField("products", StringType()),
    StructField("part1", StringType()),
    StructField("part2", StringType())
])

# Create a new DataFrame with the updated values
updated_df = spark.createDataFrame(
    pandasDF,
    schema=schema
).withColumn("products", col("products").cast("array<struct<attribute1:string,attribute2:string>>"))

# Enable dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Overwrite only the selected rows in your Hive table
updated_df.write.mode("overwrite").partitionBy("part1", "part2").format("hive").saveAsTable("myPartitionedTable")
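
One caveat worth keeping in mind (a follow-up sketch, not part of the original answer): a plain, non-ACID Hive table has no row-level UPDATE through Spark, so any write ends up rewriting at least a whole partition, and mode("overwrite") together with saveAsTable can replace the entire table. The usual workaround under that constraint is to read back the full affected partition, change only the rows matching the filter, and let dynamic partition overwrite replace just that partition. A minimal sketch, assuming the column and struct-field names from the question (col1, col2, attribute1, attribute2) and Spark 2.4+ for the transform() function:

from pyspark.sql import functions as F

# Read the whole affected partition, not only the filtered rows.
partition_df = spark.sql("""
    select * from myPartitionedTable
    where part1='adqwf' and part2='avgewg'
""")

# Rewrite col1 only where col2 matches the filter; all other rows pass through unchanged.
updated_partition = partition_df.withColumn(
    "col1",
    F.when(
        F.col("col2") == "filter_condition",
        F.expr("transform(col1, p -> named_struct("
               "'attribute1', concat(p.attribute1, '_suffix'), "
               "'attribute2', p.attribute2))")
    ).otherwise(F.col("col1"))
)

# With dynamic overwrite, insertInto replaces only the partitions present in the
# DataFrame (here part1='adqwf' / part2='avgewg') and leaves all others intact.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
updated_partition.write.mode("overwrite").insertInto("myPartitionedTable")

If genuine row-level updates are a hard requirement, a transactional (ACID) Hive table or a table format with UPDATE/MERGE support (for example Delta Lake or Iceberg) avoids rewriting partitions altogether.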
