Dask/pandas apply function and return multiple rows


Question


I'm trying to return a DataFrame from Dask's map_partitions function. The example code below returns a 2-row DataFrame inside the function, but only one row shows up in the end result, which in this case is just the column-name row. I removed the column names in earlier test runs, but even then only one row is shown. I get exactly the same result with plain pandas as well.

How can I make this map_partitions call return multiple rows (or a DataFrame with multiple rows) into a new Dask DataFrame? A solution with dask.delayed might even be better. I need to apply this function to every cell of the DataFrame, and the result should be a completely new DataFrame (with more rows) built from every cell of the original.

Current result

Dask
0               0          1          2          3 ...
1               0          1          2          3 ...
2               0          1          2          3 ...
3               0          1          2          3 ...
4               0          1          2          3 ...

Desired result:

Dask
           0          1          2          3          4
0  11.760715  14.591147   3.058529  19.868252  22.714292
1  10.601743  21.634348  17.443206  13.619830  13.574586
2  16.346402   2.80519    8.610979  11.656930  23.822052
3   3.100282  17.24039   10.871604  13.625602  22.695311
4  17.240093  23.069574   0.832129  22.055441   3.771150
5  22.676472  23.644936  10.721542  10.563838  17.297389
6  12.54929    0.988218  16.113930  19.572034   7.090997
7  11.76189   10.733782   3.819583   6.998412  14.439809
8  19.371690   5.172882  19.620361   3.148623  23.348465
9   5.924958  14.746566   9.069269   0.560508  15.120616

Example code

import pandas as pd
import dask.dataframe
import numpy as np

def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))

    # Just an example DataFrame to show
    df = pd.DataFrame([data1, data2])
    
    return df

df = pd.DataFrame({
    'val1': [1, 2, 3, 4, 5],
    'val2': [1, 2, 3, 4, 5]
})

ddf = dask.dataframe.from_pandas(df, npartitions=2)

output = ddf.map_partitions(
    lambda part: part.apply(lambda x: myfunc(), axis=1), meta=object
).compute()

print('\nDask\n',output)

Answer 1

Score: 1


Here's a sample solution that might need adjustment for your specific situation:

import dask.dataframe
import numpy as np
import pandas as pd


def myfunc():
    data1 = np.random.uniform(low=0, high=25, size=(5,))
    data2 = np.random.uniform(low=0, high=25, size=(5,))

    # Just an example DataFrame to show
    df = pd.DataFrame([data1, data2])

    return df


df = pd.DataFrame({"val1": [1, 2, 3, 4, 5], "val2": [1, 2, 3, 4, 5]})

ddf = dask.dataframe.from_pandas(df, npartitions=2)

# pandas' apply won't expand a returned DataFrame into multiple rows, so put
# each row into its own group and let groupby.apply concatenate the results.
output = ddf.map_partitions(
    lambda part: part.groupby(level=0).apply(lambda x: myfunc()).reset_index(drop=True)
).compute()

print("\nDask\n", output)

# Dask
#             0          1          2          3          4
# 0   7.637879   4.500902   8.425876   6.675338  17.931817
# 1   9.882583  23.298732  24.421378  17.417822   2.893575
# 2   1.896213   1.922125   9.760842   7.247802  14.793969
# 3   7.430976  17.858296  15.876671   9.029288   8.419667
# 4  11.753582  13.832828  12.253387   3.830268  18.501223
# 5   5.305862  16.067552  17.386537   4.970188   3.042073
# 0  24.383611   9.722760  20.581926  24.238883   0.702946
# 1   7.233365  12.857647  12.775054   7.642043  20.058167
# 2  10.934543   2.675608   7.165713  11.392718  11.593768
# 3  18.797131   0.804332   6.857919  15.742198   9.981382

Note that the index is not as you want it, because the re-indexing happens within each partition. There is a way to get a unique index across partitions, but it's not added here to keep things simple (it may also not be essential for your specific use case).

The particular challenge is that pandas.DataFrame.apply does not handle multi-row return values well, so we work around that with groupby.apply as in the snippet above.
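
If the clean 0-to-N-1 index from the desired result matters, one simple option (an addition for illustration, not part of the answer above) is to reset the index on the computed pandas result. Dask's own reset_index(drop=True) restarts the numbering within each partition, so doing it after compute() is the straightforward route:

output = ddf.map_partitions(
    lambda part: part.groupby(level=0).apply(lambda x: myfunc()).reset_index(drop=True)
).compute()

# The computed result is a plain pandas DataFrame, so this renumbers the rows
# 0..N-1 across all partitions.
output = output.reset_index(drop=True)

print("\nDask\n", output)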

Answer 2

Score: 1


You can flatten the output result before printing by using pandas concat():

output = pd.concat(output.to_list(), ignore_index=True)

print('\nDask\n',output)

This gives the result as a single DataFrame; a minimal end-to-end sketch follows the output below.

Output (random data, but the structure matches the desired result):

Dask
           0          1          2          3          4
0   5.718550   6.237734  21.148321  23.136265  11.644001 
1   4.154657  12.591685  11.868645  11.260228   3.802258 
2   6.688080   6.709124   9.170346  12.900095   7.538030 
3  16.818043  18.826502  23.405016  15.024944  24.822155 
4   4.405004  22.673484  11.130296   1.411436  21.202253 
5   6.420442   1.414739   2.240358   7.151456   4.942321 
6   7.443220  21.675140  20.287533  11.467862  12.751785 
7  17.511607  17.788686  17.326715  24.051668   4.398992 
8   0.881609   8.175566  23.253465   8.862715  19.432905 
9   2.645422  10.262120  23.801481  16.172546  18.551709
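
For context, here is a minimal end-to-end sketch of this approach (reusing myfunc and ddf exactly as defined in the question's example code). The computed result of apply(..., axis=1) is a pandas Series whose elements are the small DataFrames returned by myfunc, which is why concatenating them yields one flat DataFrame like the output above:

# Sketch only; assumes myfunc and ddf from the question's example code.
raw = ddf.map_partitions(
    lambda part: part.apply(lambda x: myfunc(), axis=1), meta=object
).compute()

# `raw` is a pandas Series; each element is one 2x5 DataFrame from myfunc.
output = pd.concat(raw.to_list(), ignore_index=True)

print('\nDask\n', output)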
