Creating dictionary from large Pyspark dataframe showing OutOfMemoryError: Java heap space

Question

I have seen and tried many [existing][1] StackOverflow posts regarding this issue, but none of them work. I guess my Java heap space is not large enough for my dataset: **my dataset contains 6.5M rows, and my Linux instance has 64 GB of RAM with 4 cores**. As per this [suggestion][1] I need to fix my code, but I think building a dictionary from a PySpark dataframe should not be this costly. Please advise me if there is another way to compute it.

I just want to build a Python dictionary from my PySpark dataframe. This is the content of the dataframe, as shown by `property_sql_df.show()`:

```
+--------------+------------+--------------------+--------------------+
|            id|country_code|                name|    hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
|  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
|  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
|  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
|  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
|  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+
```

What I want is a dictionary with `hash_of_cc_pn_li` as the **key** and a **list** of `id` values as the **value**.

**Expected Output**

```python
{
    "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"],
    "d5c301f00e9966483": ["BOND-1742850", "BOND-7630290"]
}
```

**What I have tried so far:**

*Way 1:* causes `java.lang.OutOfMemoryError: Java heap space`

```python
%%time
duplicate_property_list = {}
for ind in property_sql_df.collect():
    hashed_value = ind.hash_of_cc_pn_li
    property_id = ind.id
    if hashed_value in duplicate_property_list:
        duplicate_property_list[hashed_value].append(property_id)
    else:
        duplicate_property_list[hashed_value] = [property_id]
```

*Way 2:* not working because PySpark SQL has no native `OFFSET`

```python
%%time
i = 0
limit = 1000000
for offset in range(0, total_record, limit):
    i = i + 1
    if i != 1:
        offset = offset + 1
    duplicate_property_list = {}
    duplicate_properties = {}
    # Preparing dataframe
    url = '''select id, hash_of_cc_pn_li from properties_df LIMIT {} OFFSET {}'''.format(limit, offset)
    properties_sql_df = spark.sql(url)
    # Grouping dataset
    rows = properties_sql_df.groupBy("hash_of_cc_pn_li").agg(F.collect_set("id").alias("ids")).collect()
    duplicate_property_list = {row.hash_of_cc_pn_li: row.ids for row in rows}
    # Filter the dictionary to keep only entries where the duplicate count is at least 2
    duplicate_properties = filterTheDict(duplicate_property_list, lambda elem: len(elem[1]) >= 2)
    # Writing to file
    with open('duplicate_detected/duplicate_property_list_all_' + str(i) + '.json', 'w') as fp:
        json.dump(duplicate_property_list, fp)
```
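
For reference, a minimal sketch of one way OFFSET-style paging could be emulated with a `row_number()` window, since Spark SQL has no `OFFSET` clause (the `limit` and `offset` values are illustrative, and `property_sql_df` is the dataframe shown above):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Illustrative page size and offset; adjust as needed.
limit, offset = 1000000, 0

# Assign a stable row number so a LIMIT/OFFSET-style slice can be taken.
# Note: an un-partitioned window funnels every row through a single task,
# so this is itself expensive on a large dataframe.
w = Window.orderBy("id")
page_df = (
    property_sql_df
    .withColumn("rn", F.row_number().over(w))
    .filter((F.col("rn") > offset) & (F.col("rn") <= offset + limit))
    .drop("rn")
)
```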

**What I get now on the console:**

> java.lang.OutOfMemoryError: Java heap space

and this error in the **Jupyter notebook output**:

```
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)
```

[1]: https://stackoverflow.com/questions/37335/how-to-deal-with-java-lang-outofmemoryerror-java-heap-space-error

**This is the followup question that I asked here:** https://stackoverflow.com/questions/63103302/creating-dictionary-from-pyspark-dataframe-showing-outofmemoryerror-java-heap-s

Answer 1

Score: 1

Why not keep as much of the data and processing in the executors, rather than collecting to the driver? If I understand this correctly, you could use PySpark transformations and aggregations and save directly to JSON, thereby leveraging the executors, then load that JSON file (likely partitioned) back into Python as a dictionary. Admittedly, this introduces IO overhead, but it should let you get around your OOM heap-space errors. Step by step:

```python
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

df.groupBy(
    f.col("hash_of_cc_pn_li"),
).agg(
    f.collect_set("id").alias("id")  # use f.collect_list() here if you're not interested in deduplication of BOND-XXXXX values
).write.json("./test.json")
```

Inspecting the output path:

```
ls -l ./test.json

-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 part-00000-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 50 Jul 27 08:29 part-00039-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00043-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00159-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 _SUCCESS
```
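
The many `part-*` files come from the default 200 shuffle partitions. On the full 6.5M-row dataframe, the aggregated result could be coalesced before writing so the output stays a handful of files; a minimal sketch, reusing `df` and `f` from the snippet above (the partition count and output path are illustrative):

```python
# Sketch: coalesce the aggregated result into a few part files before writing,
# instead of one file per shuffle partition (200 by default).
agg_df = df.groupBy("hash_of_cc_pn_li").agg(f.collect_set("id").alias("id"))
agg_df.coalesce(8).write.mode("overwrite").json("./test_coalesced.json")
```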

Loading to Python as dict:

```python
import json
from glob import glob

data = []
for file_name in glob('./test.json/*.json'):
    with open(file_name) as f:
        try:
            data.append(json.load(f))
        except json.JSONDecodeError:  # there is definitely a better way - this is here because some partitions might be empty
            pass
```
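
Since Spark writes each part file in JSON Lines format (one record per line), an alternative sketch is to read line by line, which skips the empty partition files without having to catch `JSONDecodeError`:

```python
import json
from glob import glob

data = []
for file_name in glob('./test.json/*.json'):
    with open(file_name) as f:
        for line in f:
            line = line.strip()
            if line:  # empty partitions produce empty files, which are simply skipped
                data.append(json.loads(line))
```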

Finally:

```python
{item['hash_of_cc_pn_li']: item['id'] for item in data}
```

```
{'d5c3': ['BOND-7630290', 'BOND-1742850'],
 '811f': ['BOND-3211356'],
 '90cb': ['BOND-9129450', 'BOND-7175508']}
```
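
If, as in the question, only hashes that map to two or more ids are of interest, the same comprehension can filter while it builds the dictionary; a small sketch (the `duplicates_only` name is illustrative):

```python
# Keep only hashes that appear under at least two property ids.
duplicates_only = {
    item['hash_of_cc_pn_li']: item['id']
    for item in data
    if len(item['id']) >= 2
}
```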

I hope this helps! Thank you for the good question!
