从Pyspark数据帧中创建字典时显示OutOfMemoryError: Java堆空间。

huangapple go评论90阅读模式
英文:

Creating dictionary from Pyspark dataframe showing OutOfMemoryError: Java heap space

问题

以下是翻译好的部分:

我已经阅读并尝试了许多关于此问题的现有StackOverflow帖子,但都没有起作用。我猜我的JAVA堆空间对于我的大型数据集来说不如预期,**我的数据集包含650万行。我的Linux实例配有64GB内存和4个内核**。根据这个[suggestion][1],我需要修复我的代码,但我认为从pyspark dataframe创建字典不应该很昂贵。如果有其他计算方法,请给我建议。

我只想从我的pyspark dataframe创建一个Python字典,这是我的pyspark dataframe的内容,

`property_sql_df.show()`显示,

    +--------------+------------+--------------------+--------------------+
    |            id|country_code|       name|          hash_of_cc_pn_li|
    +--------------+------------+--------------------+--------------------+
    |  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
    |  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
    |  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
    |  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
    |  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
    +--------------+------------+--------------------+--------------------+

我想要的是制作一个字典,其中hash_of_cc_pn_li作为**键**,id作为**值列表**。

**期望的输出**

    {
      "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"],
      "d5c301f00e9966483": ["BOND-1742850", "BOND-7630290"]
    }

**我目前尝试过的**

    %%time
    duplicate_property_list = {}
    for ind in property_sql_df.collect(): 
         hashed_value = ind.hash_of_cc_pn_li
         property_id = ind.id
         if hashed_value in duplicate_property_list:
             duplicate_property_list[hashed_value].append(property_id) 
         else:
             duplicate_property_list[hashed_value] = [property_id] 

**目前在控制台上得到的是**

> java.lang.OutOfMemoryError: Java堆空间

并在**Jupyter笔记本输出**上显示此错误

    ERROR:py4j.java_gateway:尝试连接到Java服务器时发生错误(127.0.0.1:33097)

  [1]: https://stackoverflow.com/questions/37335/how-to-deal-with-java-lang-outofmemoryerror-java-heap-space-error
英文:

I have seen and tried many existing StackOverflow posts regarding this issue but none work. I guess my JAVA heap space is not as large as expected for my large dataset, My dataset contains 6.5M rows. My Linux instance contains 64GB Ram with 4 cores. As per this suggestion I need to fix my code but I think making a dictionary from pyspark dataframe should not be very costly. Please advise me if any other way to compute that.

I just want to make a python dictionary from my pyspark dataframe, this is the content of my pyspark dataframe,

property_sql_df.show() shows,

+--------------+------------+--------------------+--------------------+
|            id|country_code|       name|          hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
|  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
|  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
|  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
|  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
|  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+

What I want is to make a dictionary with hash_of_cc_pn_li as key and id as a list value.

Expected Output

{
  "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"]
  "d5c301f00e9966483": ["BOND-1742850","BOND-7630290"]
}

What I have tried so far,

%%time
duplicate_property_list = {}
for ind in property_sql_df.collect(): 
     hashed_value = ind.hash_of_cc_pn_li
     property_id = ind.id
     if hashed_value in duplicate_property_list:
         duplicate_property_list[hashed_value].append(property_id) 
     else:
         duplicate_property_list[hashed_value] = [property_id] 

What I get now on the console:

> java.lang.OutOfMemoryError: Java heap space

and showing this error on Jupyter notebook output

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)

答案1

得分: 2

从Pyspark DataFrame创建字典不应该非常昂贵

从运行时角度来看,这是正确的,但这将占用大量的空间。特别是如果你执行 property_sql_df.collect(),此时你会将整个DataFrame加载到驱动程序内存中。在有6.5百万行的情况下,如果每行有10KB或10K个字符,你将使用65GB的内存,而我们甚至还没有涉及到字典。

首先,你可以只收集你需要的列(例如不包括 name 列)。其次,你可以在Spark中的上游进行聚合,这将根据每个 hash_of_cc_pn_li 中有多少个 id 来节省一些空间:

rows = property_sql_df.groupBy("hash_of_cc_pn_li") \
  .agg(collect_set("id").alias("ids")) \
  .collect()

duplicate_property_list = {row.hash_of_cc_pn_li: row.ids for row in rows}
英文:

>making a dictionary from pyspark dataframe should not be very costly

This is true in terms of runtime, but this will easily take up a lot of space. Especially if you're doing property_sql_df.collect(), at which point you're loading your entire dataframe into driver memory. At 6.5M rows, you'll already hit 65GB if each row has 10KB, or 10K characters, and we haven't even gotten to the dictionary yet.

First, you can collect just the columns you need (e.g. not name). Second, you can do the aggregation upstream in Spark, which will save some space depending on how many ids there are per hash_of_cc_pn_li:

rows = property_sql_df.groupBy("hash_of_cc_pn_li") \
  .agg(collect_set("id").alias("ids")) \
  .collect()

duplicate_property_list = { row.hash_of_cc_pn_li: row.ids for row in rows }

答案2

得分: 1

以下是已经翻译好的内容:

这里是如何使用您的数据创建一个示例 DataFrame:

data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

让我们在一个 Spark DataFrame 中对数据进行聚合,以限制在驱动节点上收集的行数。我们将使用 quinn 中定义的 two_columns_to_dictionary 函数创建字典。

agg_df = df.groupBy("hash_of_cc_pn_li").agg(F.max("hash_of_cc_pn_li").alias("hash"), F.collect_list("id").alias("id"))
res = quinn.two_columns_to_dictionary(agg_df, "hash", "id")
print(res) # => {'811f': ['BOND-3211356'], 'd5c3': ['BOND-1742850', 'BOND-7630290'], '90cb': ['BOND-9129450', 'BOND-7175508']}

这个方法可能适用于一个相对较小的数据集,大约650万行,但在大数据集上不适用。对于真正微小的 DataFrames,"我认为从 pyspark dataframe 制作字典不应该非常昂贵" 是正确的。从 PySpark DataFrame 制作字典实际上非常昂贵。

PySpark 是一个集群计算框架,通过在集群中的节点上分布数据来获益。当您调用 collect 时,所有数据都会移动到驱动节点,并且工作节点没有任何帮助。当您尝试将过多的数据移动到驱动节点时,将会出现 OutOfMemory 异常。

最好完全避免使用字典,并找出解决问题的不同方法。这是一个很好的问题。

英文:

Here's how to make a sample DataFrame with your data:

data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

Let's aggregate the data in a Spark DataFrame to limit the number of rows that are collected on the driver node. We'll use the two_columns_to_dictionary function defined in quinn to create the dictionary.

agg_df = df.groupBy("hash_of_cc_pn_li").agg(F.max("hash_of_cc_pn_li").alias("hash"), F.collect_list("id").alias("id"))
res = quinn.two_columns_to_dictionary(agg_df, "hash", "id")
print(res) # => {'811f': ['BOND-3211356'], 'd5c3': ['BOND-1742850', 'BOND-7630290'], '90cb': ['BOND-9129450', 'BOND-7175508']}

This might work on a relatively small, 6.5 million row dataset, but it won't work on a huge dataset. "I think making a dictionary from pyspark dataframe should not be very costly" is only true for DataFrames that are really tiny. Making a dictionary from a PySpark DataFrame is actually very expensive.

PySpark is a cluster computing framework that benefits from having data spread out across nodes in a cluster. When you call collect all the data is moved to the driver node and the worker nodes don't help. You'll get an OutOfMemory exception whenever you try to move too much data to the driver node.

It's probably best to avoid the dictionary entirely and figure out a different way to solve the problem. Great question.

答案3

得分: 1

从**Spark-2.4**开始,我们可以使用内置函数groupBycollect_listmap_from_arraysto_json来执行这个操作。

示例:

df.show()
#+------------+-----------------+
#|          id| hash_of_cc_pn_li|
#+------------+-----------------+
#|BOND-9129450|90cb0946cf4139e12|
#|BOND-7175508|90cb0946cf4139e12|
#|BOND-1742850|d5c301f00e9966483|
#|BOND-7630290|d5c301f00e9966483|
#+------------+-----------------+
df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
selectExpr("to_json(map_from_arrays(array(hash_of_cc_pn_li),array(id))) as output").\
show(10,False)
#+-----------------------------------------------------+
#|output                                               |
#+-----------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"]}|
#|{"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+-----------------------------------------------------+

要获取**一个字典**,请使用另一个带有collect_list的agg函数。

df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
agg(to_json(map_from_arrays(collect_list(col("hash_of_cc_pn_li")),collect_list(col("id")))).alias("output")).\
show(10,False)
#+---------------------------------------------------------------------------------------------------------+
#|output                                                                                                   |
#+---------------------------------------------------------------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"],"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+---------------------------------------------------------------------------------------------------------+
英文:

From Spark-2.4 we can use groupBy,collect_list,map_from_arrays,to_json built in functions for this case.

Example:

df.show()
#+------------+-----------------+
#|          id| hash_of_cc_pn_li|
#+------------+-----------------+
#|BOND-9129450|90cb0946cf4139e12|
#|BOND-7175508|90cb0946cf4139e12|
#|BOND-1742850|d5c301f00e9966483|
#|BOND-7630290|d5c301f00e9966483|
#+------------+-----------------+
df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
selectExpr("to_json(map_from_arrays(array(hash_of_cc_pn_li),array(id))) as output").\
show(10,False)
#+-----------------------------------------------------+
#|output                                               |
#+-----------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"]}|
#|{"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+-----------------------------------------------------+

To get one dict use another agg with collect_list.

df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
agg(to_json(map_from_arrays(collect_list(col("hash_of_cc_pn_li")),collect_list(col("id")))).alias("output")).\
show(10,False)
#+---------------------------------------------------------------------------------------------------------+
#|output                                                                                                   |
#+---------------------------------------------------------------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"],"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+---------------------------------------------------------------------------------------------------------+

答案4

得分: 1

添加链接帖子的已接受答案以备后查。该答案通过利用write.json方法解决了问题,并在此处防止了收集过大的数据集到Driver:

> https://stackoverflow.com/a/63111765/12378881

英文:

Adding accepted answer from linked post for posterity. The answer solves the problem by leveraging write.json method and preventing the collection of too-large dataset to the Driver here:

> https://stackoverflow.com/a/63111765/12378881

huangapple
  • 本文由 发表于 2020年7月27日 01:14:01
  • 转载请务必保留本文链接:https://go.coder-hub.com/63103302.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定