How to pass dataframe to different functions with filters and group by
# Question
I have a dataframe from a query in PySpark, but I want to pass the dataframe to two different functions. Basically, I am doing something like this, and would like to avoid running the same query that creates `initial_df` twice:
```python
def post_process_1(df):
    df = df_in_concern.filter(...)  # 1st set of filter and group by
    write_to_table_1(df)

def post_process_2(df):
    df = df_in_concern.filter(...)  # 2nd set of filter and group by
    write_to_table_2(df)

initial_df = df.filter(...).groupby(...).order1(...)
post_process_1(initial_df)
post_process_2(initial_df)
```
I found this post, which discusses a problem similar to mine: https://stackoverflow.com/questions/55870621/pyspark-python-dataframe-reuse-in-different-functions

The suggestion there is to use `createOrReplaceTempView`. But from what I understand, `createOrReplaceTempView` requires the follow-up queries to be written in SQL syntax.
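From what I can tell, reusing the dataframe through a temp view would look roughly like the sketch below, which is why I say the follow-up queries would need to be SQL (the view name and column names here are just placeholders, not my real schema):

```python
# Rough sketch of the temp-view approach from the linked post.
# "shared_view", "key", "flag" and "amount" are placeholder names.
initial_df.createOrReplaceTempView("shared_view")

df_1 = spark.sql(
    "SELECT key, COUNT(*) AS cnt FROM shared_view WHERE flag = 1 GROUP BY key"
)
df_2 = spark.sql(
    "SELECT key, SUM(amount) AS total FROM shared_view WHERE flag = 0 GROUP BY key"
)
```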
In my example, the queries (filter/group by) in `post_process_1` and `post_process_2` are written with the PySpark DataFrame API.

Is there any way I can avoid querying `initial_df` twice while keeping the PySpark queries (not SQL queries) in my `post_process_1` and `post_process_2` functions?
# Answer 1

**Score**: 3
This should do the work, since `df` is filtered only within the scope of each method and nothing on `initial_df` is changed (DataFrame transformations return new DataFrames rather than modifying their input):
```python
def post_process_1(df):
    df_ = df.filter(...)  # 1st set of filter and group by
    write_to_table_1(df_)

def post_process_2(df):
    df_ = df.filter(...)  # 2nd set of filter and group by
    write_to_table_2(df_)

initial_df = df.filter(...).groupby(...).order1(...)
post_process_1(initial_df)
post_process_2(initial_df)
```
# Answer 2

**Score**: 1
Use `persist`/`cache` (passing the required storage level if you use `persist`). This prevents `initial_df` from being recomputed:
```python
def post_process_1(df):
    df = df_in_concern.filter(...)  # 1st set of filter and group by
    write_to_table_1(df)

def post_process_2(df):
    df = df_in_concern.filter(...)  # 2nd set of filter and group by
    write_to_table_2(df)

initial_df = df.filter(...).groupby(...).order1(...)
initial_df.persist()
post_process_1(initial_df)
post_process_2(initial_df)
```
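For completeness, a minimal sketch of the same flow with an explicit clean-up step (`initial_df` and the two functions are the ones defined above); note that `persist()` is lazy, so the cache is only filled when the first action runs:

```python
# persist() only marks the DataFrame for caching; the cache is filled when the
# first action runs (the write inside post_process_1) and reused by post_process_2.
initial_df.persist()

post_process_1(initial_df)
post_process_2(initial_df)

# Optional: release the cached data once both writes have finished.
initial_df.unpersist()
```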
# Answer 3

**Score**: 0
The solution for this use case is to use `persist` to save your DataFrame at a storage level before calling your methods.

The `persist()` method stores your DataFrame at one of the storage levels MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, and more.

The choice of storage level should be made based on your cluster configuration (memory and disk size).

You can also use **cache()**, which is an alias for **persist(StorageLevel.MEMORY_AND_DISK)**.

These are the different storage levels available:
- **MEMORY_ONLY** – This is the default behavior of the RDD `cache()` method: the RDD or DataFrame is stored as deserialized objects in JVM memory. When there is not enough memory available, some partitions are not saved and are re-computed as and when required. This takes more memory, but it can be slower than the MEMORY_AND_DISK level because the unsaved partitions are recomputed, and recomputing the in-memory columnar representation of the underlying table is expensive.
- **MEMORY_ONLY_SER** – Same as MEMORY_ONLY, except that the RDD is stored as serialized objects in JVM memory. It takes less memory (it is space-efficient) than MEMORY_ONLY because objects are saved serialized, at the cost of a few additional CPU cycles to deserialize them.
- **MEMORY_ONLY_2** – Same as the MEMORY_ONLY storage level, but each partition is replicated to two cluster nodes.
- **MEMORY_ONLY_SER_2** – Same as the MEMORY_ONLY_SER storage level, but each partition is replicated to two cluster nodes.
- **MEMORY_AND_DISK** – This is the default behavior for a DataFrame or Dataset. At this storage level, the DataFrame is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, the excess partitions are stored on disk and read back when needed. It is slower because I/O is involved.
- **MEMORY_AND_DISK_SER** – Same as the MEMORY_AND_DISK storage level, except that the DataFrame objects are serialized in memory and on disk when space is not available.
- **MEMORY_AND_DISK_2** – Same as the MEMORY_AND_DISK storage level, but each partition is replicated to two cluster nodes.
- **MEMORY_AND_DISK_SER_2** – Same as the MEMORY_AND_DISK_SER storage level, but each partition is replicated to two cluster nodes.
- **DISK_ONLY** – At this storage level, the DataFrame is stored only on disk, and the computation time is high because I/O is involved.
- **DISK_ONLY_2** – Same as the DISK_ONLY storage level, but each partition is replicated to two cluster nodes.
**Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, DISK_ONLY_2, and DISK_ONLY_3.**
[Spark official doc][1]
[1]: https://spark.apache.org/docs/latest/rdd-programming-guide.html
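A minimal sketch of how this could be applied to the question's code; MEMORY_AND_DISK mirrors what `cache()` would use, and DISK_ONLY appears only as an example of choosing a different level from the list above:

```python
from pyspark import StorageLevel

# Persist the shared DataFrame once before calling both functions.
# MEMORY_AND_DISK is the level cache() uses; DISK_ONLY is shown (commented out)
# purely as an example of picking another level from the list above.
initial_df.persist(StorageLevel.MEMORY_AND_DISK)
# initial_df.persist(StorageLevel.DISK_ONLY)

post_process_1(initial_df)
post_process_2(initial_df)
```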