Fast Fourier Transform (fft) aggregation on Spark Dataframe groupby
Question
I am trying to compute the FFT over a window on a Spark DataFrame, using NumPy's FFT as an aggregation, like this:
import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.functions import (
    percentile_approx, avg, min, max, kurtosis, var_samp, stddev_samp,
)
df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    func.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    percentile_approx("value", 0.25).alias("quantile_1(value)"),
    percentile_approx("magnitude", 0.25).alias("quantile_1(magnitude)"),
    percentile_approx("value", 0.5).alias("quantile_2(value)"),
    percentile_approx("magnitude", 0.5).alias("quantile_2(magnitude)"),
    percentile_approx("value", 0.75).alias("quantile_3(value)"),
    percentile_approx("magnitude", 0.75).alias("quantile_3(magnitude)"),
    avg("value"),
    avg("magnitude"),
    min("value"),
    min("magnitude"),
    max("value"),
    max("magnitude"),
    kurtosis("value"),
    kurtosis("magnitude"),
    var_samp("value"),
    var_samp("magnitude"),
    stddev_samp("value"),
    stddev_samp("magnitude"),
    np.fft.fft("value"),
    np.fft.fft("magnitude"),
    np.fft.rfft("value"),
    np.fft.rfft("magnitude"),
)
Every aggregation function works fine; however, for the FFT I get:
tuple index out of range
and I don't understand why. Do I need to do anything in particular to the values for NumPy's FFT to work? The values are all floats. When I print the column it looks like this:
[Row(value_0=6.247499942779541), Row(value_0=63.0), Row(value_0=54.54375076293945), Row(value_0=0.7088077664375305), Row(value_0=51.431251525878906), Row(value_0=0.09377499669790268), Row(value_0=0.09707500040531158), Row(value_0=6.308750152587891), Row(value_0=8.503950119018555), Row(value_0=295.8463134765625), Row(value_0=7.938048839569092), Row(value_0=8.503950119018555), Row(value_0=0.7090428471565247), Row(value_0=0.7169944643974304), Row(value_0=0.5659012794494629)]
I am guessing the Spark Row objects might be the issue, but I am unsure how to convert them in this context.
Answer 1
Score: 0
np.fft.fft is a NumPy function, not a PySpark function, so you cannot apply it directly to a DataFrame.
Moreover, it takes an array as input, while "value" is just a string. fft cannot infer that the string should mean "the aggregated list of the values of the column value"; you have to collect that list manually.
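The error message itself is consistent with this: NumPy coerces the string with np.asarray, which produces a 0-dimensional array, and the FFT then fails when it looks up the length of the last axis. A minimal reproduction:

```python
import numpy as np

# np.fft.fft first converts its input with np.asarray. A plain string
# becomes a 0-dimensional array, so looking up the length of the last
# axis fails -- which is exactly the "tuple index out of range" error.
arr = np.asarray("value")
print(arr.shape)   # () -- no axes at all

try:
    np.fft.fft("value")
except IndexError as e:
    print(e)       # tuple index out of range
```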
import numpy as np
from pyspark.sql import functions as F, types as T

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    F.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    F.percentile_approx("value", 0.25).alias("quantile_1(value)"),
    ...,
    F.stddev_samp("magnitude"),
    # I replace the np.fft.fft calls with collect_list, which gathers
    # each group's values into an array column.
    F.collect_list("value").alias("values"),
    F.collect_list("magnitude").alias("magnitudes"),
)

# Definition of the UDF fft. The FFT output is complex and Spark has no
# complex type, so keep only the real part here (use np.abs(x) instead
# if you want magnitudes). Do the same for rfft.
@F.udf(T.ArrayType(T.FloatType()))
def fft_udf(array):
    return [float(x.real) for x in np.fft.fft(array)]

# Do that for all your columns. Note that withColumn returns a new
# DataFrame, so the result must be assigned.
df_grouped = df_grouped.withColumn("fft_values", fft_udf(F.col("values")))
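For the rfft variant the output shape differs: a length-n real input yields n // 2 + 1 complex bins. Here is a quick local check of the would-be UDF body — a sketch in plain NumPy, independent of Spark; wrapping it with @F.udf(T.ArrayType(T.FloatType())) works exactly as for fft_udf above:

```python
import numpy as np

# Body of an rfft UDF, checked locally on a small real-valued signal.
# np.fft.rfft returns n // 2 + 1 complex bins for a length-n real input.
def rfft_body(array):
    return [float(x.real) for x in np.fft.rfft(array)]

signal = [1.0, 0.0, 0.0, 0.0]      # unit impulse
print(rfft_body(signal))           # [1.0, 1.0, 1.0] -- flat spectrum
print(len(np.fft.fft(signal)))     # 4 bins from the full fft
print(len(np.fft.rfft(signal)))    # 3 bins (= 4 // 2 + 1) from rfft
```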