Dask/pandas apply function and return multiple rows
Question
I'm trying to return a dataframe from the dask map_partitions function. The example code below returns a 2-row dataframe from the function, yet only 1 row shows up in the end result, which in this case is only the column-name row. I removed the column names in earlier test runs, but even then only 1 row is shown. I also get this exact same result with pandas alone.
How can I make this map_partitions call return multiple rows (or a dataframe with multiple rows) into a new dask dataframe? A solution with dask delayed might even be better. I need to apply this function to every cell of the dataframe, and the result should be a completely new dataframe (with more rows) based on every cell of the original one.
Current result
Dask
0 0 1 2 3 ...
1 0 1 2 3 ...
2 0 1 2 3 ...
3 0 1 2 3 ...
4 0 1 2 3 ...
Desired result:
Dask
0 1 2 3 4
0 11.760715 14.591147 3.058529 19.868252 22.714292
1 10.601743 21.634348 17.443206 13.619830 13.574586
2 16.346402 2.80519 8.610979 11.656930 23.822052
3 3.100282 17.24039 10.871604 13.625602 22.695311
4 17.240093 23.069574 0.832129 22.055441 3.771150
5 22.676472 23.644936 10.721542 10.563838 17.297389
6 12.54929 0.988218 16.113930 19.572034 7.090997
7 11.76189 10.733782 3.819583 6.998412 14.439809
8 19.371690 5.172882 19.620361 3.148623 23.348465
9 5.924958 14.746566 9.069269 0.560508 15.120616
Example code
import pandas as pd
import dask.dataframe
import numpy as np

def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))
    # Just an example dataframe to show
    df = pd.DataFrame([data1, data2])
    return df

df = pd.DataFrame({
    'val1': [1, 2, 3, 4, 5],
    'val2': [1, 2, 3, 4, 5]
})

ddf = dask.dataframe.from_pandas(df, npartitions=2)
output = ddf.map_partitions(lambda part: part.apply(lambda x: myfunc(), axis=1), meta=object).compute()
print('\nDask\n', output)
Answer 1
Score: 1
Here's a sample solution that might need adjustment to handle specific situations:
import dask.dataframe
import numpy as np
import pandas as pd

def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))
    # Just an example dataframe to show
    df = pd.DataFrame([data1, data2])
    return df

df = pd.DataFrame({"val1": [1, 2, 3, 4, 5], "val2": [1, 2, 3, 4, 5]})
ddf = dask.dataframe.from_pandas(df, npartitions=2)
output = ddf.map_partitions(
    lambda part: part.groupby(level=0).apply(lambda x: myfunc()).reset_index(drop=True)
).compute()
print("\nDask\n", output)
# Dask
# 0 1 2 3 4
# 0 7.637879 4.500902 8.425876 6.675338 17.931817
# 1 9.882583 23.298732 24.421378 17.417822 2.893575
# 2 1.896213 1.922125 9.760842 7.247802 14.793969
# 3 7.430976 17.858296 15.876671 9.029288 8.419667
# 4 11.753582 13.832828 12.253387 3.830268 18.501223
# 5 5.305862 16.067552 17.386537 4.970188 3.042073
# 0 24.383611 9.722760 20.581926 24.238883 0.702946
# 1 7.233365 12.857647 12.775054 7.642043 20.058167
# 2 10.934543 2.675608 7.165713 11.392718 11.593768
# 3 18.797131 0.804332 6.857919 15.742198 9.981382
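For reference, the reset_index(drop=True) in the snippet is needed because groupby(level=0).apply concatenates the per-group frames under a MultiIndex of (group label, row within the returned frame). A small pandas-only sketch of that intermediate, reusing myfunc from above:

part = pd.DataFrame({"val1": [1, 2], "val2": [1, 2]})
stacked = part.groupby(level=0).apply(lambda x: myfunc())
print(stacked.index.tolist())
# [(0, 0), (0, 1), (1, 0), (1, 1)]  (group label, row within myfunc's result)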
Note that the index is not as you desire, because the re-indexing happens within each partition. There is a way to get a unique index across partitions, but it's not added here to keep things simple (it might also not be essential for your specific use case).
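If the duplicated labels do matter, one simple option, continuing from the snippet above, is a final reset on the computed result, which at that point is a plain pandas DataFrame; a minimal sketch:

# `output` is ordinary pandas after .compute(), so this is cheap:
output = output.reset_index(drop=True)
print(output.index.is_unique)  # True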
The particular challenge is that pandas.DataFrame.apply does not handle multi-row return values nicely, so we can work around it with groupby.apply, as in the snippet above.
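To see that limitation concretely, here is a pandas-only sketch, again reusing myfunc from above: apply with axis=1 does not expand a returned DataFrame into rows, it stores each one as a single object in the resulting Series, which is why the question's output shows one collapsed line per input row.

part = pd.DataFrame({"val1": [1, 2], "val2": [1, 2]})
res = part.apply(lambda x: myfunc(), axis=1)
print(type(res))          # <class 'pandas.core.series.Series'>
print(type(res.iloc[0]))  # <class 'pandas.core.frame.DataFrame'>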
Answer 2
Score: 1
TRY: You can just flatten the output result before printing, using pandas concat():

output = pd.concat(output.to_list(), ignore_index=True)
print('\nDask\n', output)

This would give the result in a single dataframe.
Output (random data, but the result should look like the desired one):
Dask
0 1 2 3 4
0 5.718550 6.237734 21.148321 23.136265 11.644001
1 4.154657 12.591685 11.868645 11.260228 3.802258
2 6.688080 6.709124 9.170346 12.900095 7.538030
3 16.818043 18.826502 23.405016 15.024944 24.822155
4 4.405004 22.673484 11.130296 1.411436 21.202253
5 6.420442 1.414739 2.240358 7.151456 4.942321
6 7.443220 21.675140 20.287533 11.467862 12.751785
7 17.511607 17.788686 17.326715 24.051668 4.398992
8 0.881609 8.175566 23.253465 8.862715 19.432905
9 2.645422 10.262120 23.801481 16.172546 18.551709
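Since the question mentions that a dask.delayed-based solution might be even better, and neither answer uses it, here is a minimal sketch of that route via to_delayed/from_delayed. The process_partition helper, the one-myfunc-call-per-row convention, and the meta frame are illustrative assumptions, not something from the thread:

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))
    return pd.DataFrame([data1, data2])

df = pd.DataFrame({"val1": [1, 2, 3, 4, 5], "val2": [1, 2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)

def process_partition(part):
    # One myfunc() call per row of the partition, stacked into one frame.
    return pd.concat([myfunc() for _ in range(len(part))], ignore_index=True)

# One delayed task per partition, then rebuild a lazy Dask DataFrame.
parts = [dask.delayed(process_partition)(p) for p in ddf.to_delayed()]
meta = pd.DataFrame(columns=range(5), dtype=float)
result = dd.from_delayed(parts, meta=meta)

print("\nDask\n", result.compute().reset_index(drop=True))

Because the recombination happens through from_delayed, the result stays a lazy Dask DataFrame until compute() is called, and the final reset_index gives the 0..9 labels shown in the desired result.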