pandas dataframe: how to update specific rows in a Hive table


Question

I want to update a single column in a Hive table. Here is how I am selecting the data:

from datetime import timedelta, date
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql import SparkSession
import os
import json
import urllib.parse

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("job1").enableHiveSupport().getOrCreate()

test1 = spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and
part2='avgewg' and
col2='filter_condition'
""")

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = test1.toPandas()

smvr = {}
for index, row in pandasDF.iterrows():
    newpr = []
    pr = pandasDF.iloc[index]['col1']
    if pr:
        for p in pr:
            newp = p.asDict()
            newp['attribute1'] = p.attribute1 + '_suffix'
            newpr.append(newp)
        print(newpr[0])
        pandasDF.iloc[index]['attribute1'][0] = newpr[0]
        print(pandasDF.iloc[index]['attribute1'][0])
        break

So with the above snippet, I am able to see that

print(pandasDF.iloc[index]['attribute1'][0])

has the updated value. Now, how can I update the underlying Hive table as well? I was going through some resources online and the following suggestion came up:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

This could work, but note that in my select query I have

col2='filter_condition'

So basically I don't want to overwrite the entire partition, only update the rows that I have selected.

Is that possible with the above approach?

Answer 1

Score: 1


I think I understand this right, but I am unable to test it. Let me know how this works.

You need to modify your code to update the pd.DataFrame with the new values, and then convert the updated pd.DataFrame back to a Spark DataFrame using the createDataFrame() method of the SparkSession.

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema of your DataFrame
schema = StructType([
    StructField("col1", StringType()),
    StructField("attribute1", StringType()),
    StructField("products", StringType()),
    StructField("part1", StringType()),
    StructField("part2", StringType())
])

# Create a new DataFrame with the updated values
updated_df = spark.createDataFrame(
    pandasDF,
    schema=schema
).withColumn("products", col("products").cast("array<struct<attribute1:string,attribute2:string>>"))

# Enable dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Overwrite only the selected rows in your Hive table
updated_df.write.mode("overwrite").partitionBy("part1", "part2").format("hive").saveAsTable("myPartitionedTable")
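
One caveat on the original concern, offered as a hedged note since I cannot test against the real table: dynamic partition overwrite replaces every partition that appears in the DataFrame being written, so writing only the rows that matched col2='filter_condition' would still drop the remaining rows of the part1='adqwf' / part2='avgewg' partition. A minimal sketch of one way to preserve them, assuming updated_df has the same columns in the same order as myPartitionedTable (insertInto matches columns by position, not by name):

# Sketch only, untested. Read the rows of the same partition that were
# NOT selected for the update, union them with the updated rows, and
# overwrite the partition with the combined set.
# Note: rows where col2 is NULL would need separate handling.
untouched = spark.sql("""
    select * from myPartitionedTable
    where part1='adqwf' and part2='avgewg'
    and col2 <> 'filter_condition'
""")

# Updated rows plus everything else in the partition.
combined = updated_df.unionByName(untouched)

# With dynamic overwrite, only the partitions present in `combined`
# are rewritten; all other partitions of the table stay intact.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
combined.write.mode("overwrite").insertInto("myPartitionedTable")

As far as I know, Hive only supports row-level UPDATE on transactional (ACID) tables and Spark does not issue such updates, so for a plain partitioned table this read-modify-write of the affected partition is the usual pattern.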
