Dask map_overlap 以非按时间顺序传递分区。

huangapple go评论67阅读模式
英文:

Dask map_overlap passes partitions in non-chronological order

问题

Dask的df.map_overlap()函数以非按照时间顺序的方式传递分区。对于具有大量重叠的大型数据集的计算来说,这是极其低效的。当然,我们已经在并行运行,所以操作的顺序不能保证,但这似乎是一个设计缺陷,导致了比必要的内存负载更高。有没有人有办法解决这个问题?下面是一个可以打印开始和结束日期的最小验证示例(MVE)。您可以看到排序似乎是随机的。有没有什么想法如何解决这个问题?

import pandas as pd
import dask.dataframe as dd
import numpy as np

date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

df = df.repartition(freq='1h')

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(days=7), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()

results.compute()
英文:

Dask's df.map_overlap() function passes the partitions in non chronological order. This is extremely inefficient for computations on large datasets with a lot of overlap. Of course, we are running parallelized anyway, so the order of operations is not guaranteed, but this seems like a design flaw that contributes to a higher than necessary memory load. Does anyone have an idea of how this could be solved? Here is a MVE that prints the start and end dates. You can see that the ordering seems to be random. Any ideas how to solve this?

import pandas as pd
import dask.dataframe as dd
import numpy as np

date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

df = df.repartition(freq='1h')

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(days=7), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()

results.compute()

答案1

得分: 1

这不是一个完整的答案,但希望能为你提供一些有前途的探索方向。

可以构建一个定制的有向无环图(DAG),以满足你所期望的顺序。假设你的数据已经被索引或排序,你可以使用明确的分区范围来运行计算。例如:

from dask import compute
from dask.datasets import timeseries
df = timeseries(end='2000-01-10')

n_partitions = df.npartitions
days_offset = 7
partition_combinations = [(i-days_offset, i) for i in range(n_partitions) if i-days_offset>=0]

all_results = []
for start, end in partition_combinations:
   print(start, end)
   subset_result = df.partitions[start:end].mean()
   all_results.append(subset_result)

computed = compute(*all_results)

请注意,在这里没有使用 .map.map_overlap,因为我们在计算中明确控制了分区范围。在所有的惰性/延迟计算被形成后,最后一行将触发实际的计算。

英文:

This is not a complete answer, but hopefully gives you some promising directions to explore.

It's possible to construct a custom DAG that satisfies your desired ordering. Assuming your data is indexed/order, you can use the explicit partition ranges to run the computations. For example:

from dask import compute
from dask.datasets import timeseries
df = timeseries(end='2000-01-10')

n_partitions = df.npartitions
days_offset = 7
partition_combinations = [(i-days_offset, i) for i in range(n_partitions) if i-days_offset>=0]

all_results = []
for start, end in partition_combinations:
   print(start, end)
   subset_result = df.partitions[start:end].mean()
   all_results.append(subset_result)

computed = compute(*all_results)

Note that there is no .map or .map_overlap used since we are explicitly controlling the range of partitions used in a calculation. After all the lazy/delayed computations are formed, the last line will trigger the actual computations.

答案2

得分: -1

我找到了解决这个问题的潜在方法。在使用map_overlap()函数之前,通过手动根据它们的时间戳对分区进行排序,我们可以确保计算按照更加时间顺序进行,从而可能减少内存压力并提高整体效率。

以下是使用分区排序的更新代码:

import pandas as pd
import dask.dataframe as dd
import numpy as np

date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

# 解决方案不需要重新分区,因此我们将其删除。

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

# 在应用map_overlap之前,手动根据时间戳对分区进行排序
df = df.map_partitions(lambda part: part.sort_index(), meta=df._meta)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(days=7), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()

results.compute()

通过使用map_partitions函数和lambda函数在调用map_overlap()之前对分区进行排序,我们可以减轻非时间顺序的问题。这应该有助于优化内存负载并增强我们计算的整体性能,尤其是在处理数据的广泛重叠时。

英文:

I found a potential solution to this issue. By manually sorting the partitions based on their timestamps before using the map_overlap() function, we can ensure that the computation proceeds in a more chronological order, thus potentially reducing memory pressure and improving overall efficiency.

Here's the updated code using the partition sorting:

import pandas as pd
import dask.dataframe as dd
import numpy as np

date_range = pd.date_range(start='2023-06-01', end='2023-07-01', freq='T')
df = pd.DataFrame({
    'timestamp': date_range,
    'quantity': np.random.rand(len(date_range))
})

df = dd.from_pandas(df, npartitions=5)
df['timestamp'] = df['timestamp'].dt.ceil('1h')
df = df.set_index('timestamp', sorted=True)

# Repartitioning is not necessary for the solution, so we'll remove it.

def custom_func(ser):
    print(f'from: {ser.index.min()}, to: {ser.index.max()}')
    return pd.Series(np.mean(ser), index=ser.index)

# Manually sort the partitions based on the timestamp before applying map_overlap
df = df.map_partitions(lambda part: part.sort_index(), meta=df._meta)

results = df['quantity'].map_overlap(custom_func, before=pd.Timedelta(days=7), after=0, meta=pd.Series(dtype='float64'), align_dataframes=False, enforce_metadata=False).drop_duplicates()

results.compute()

By using the map_partitions function with a lambda function to sort the partitions before calling map_overlap(), we can mitigate the non-chronological order issue. This should help optimize the memory load and enhance the overall performance of our computations, especially when dealing with extensive overlaps in the data.

huangapple
  • 本文由 发表于 2023年7月17日 16:11:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76702576.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定