Fast Fourier Transform (FFT) aggregation on Spark DataFrame groupBy

Question

I am trying to compute the FFT over a window with NumPy's FFT functions on a Spark DataFrame, like this:

import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.functions import (
    avg, kurtosis, max, min, percentile_approx, stddev_samp, var_samp,
)

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    func.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    percentile_approx("value", 0.25).alias("quantile_1(value)"),
    percentile_approx("magnitude", 0.25).alias("quantile_1(magnitude)"),
    percentile_approx("value", 0.5).alias("quantile_2(value)"),
    percentile_approx("magnitude", 0.5).alias("quantile_2(magnitude)"),
    percentile_approx("value", 0.75).alias("quantile_3(value)"),
    percentile_approx("magnitude", 0.75).alias("quantile_3(magnitude)"),
    avg("value"),
    avg("magnitude"),
    min("value"),
    min("magnitude"),
    max("value"),
    max("magnitude"),
    kurtosis("value"),
    kurtosis("magnitude"),
    var_samp("value"),
    var_samp("magnitude"),
    stddev_samp("value"),
    stddev_samp("magnitude"),
    # the four np.fft calls below raise "tuple index out of range"
    np.fft.fft("value"),
    np.fft.fft("magnitude"),
    np.fft.rfft("value"),
    np.fft.rfft("magnitude"),
)

Every aggregation function works fine; however, for the FFT calls I get:

tuple index out of range

and I don't understand why. Do I need to do anything in particular to the values for the NumPy FFT to work? The values are all floats. When I print the column, it looks like this:

[Row(value_0=6.247499942779541), Row(value_0=63.0), Row(value_0=54.54375076293945), Row(value_0=0.7088077664375305), Row(value_0=51.431251525878906), Row(value_0=0.09377499669790268), Row(value_0=0.09707500040531158), Row(value_0=6.308750152587891), Row(value_0=8.503950119018555), Row(value_0=295.8463134765625), Row(value_0=7.938048839569092), Row(value_0=8.503950119018555), Row(value_0=0.7090428471565247), Row(value_0=0.7169944643974304), Row(value_0=0.5659012794494629)]

I am guessing the Spark Row objects might be the issue, but I am unsure how to convert them in this context.

Answer 1

Score: 0

np.fft.fft is a NumPy function, not a PySpark function, so you cannot use it directly as a Spark aggregation.

Moreover, it expects an array as input. Here, "value" is just a string: fft cannot infer that it should stand for the aggregated list of values of the column "value". NumPy wraps the string in a zero-dimensional array, and indexing that array's empty shape along the last axis is exactly what raises "tuple index out of range". You have to build the aggregated list manually.
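
The failure is easy to reproduce outside Spark; a minimal sketch in plain NumPy (no Spark involved):

import numpy as np

# Works: the FFT of an actual sequence of floats
print(np.fft.fft([1.0, 2.0, 3.0, 4.0]))

# Fails: a bare column name is just a string. NumPy wraps it in a
# zero-dimensional array, and indexing that array's empty shape raises
# "IndexError: tuple index out of range".
np.fft.fft("value")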

from pyspark.sql import functions as F, types as T

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    F.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    F.percentile_approx("value", 0.25).alias("quantile_1(value)"),
    ...,
    F.stddev_samp("magnitude"),
    # instead of np.fft.fft, collect the raw values into arrays
    F.collect_list("value").alias("values"),
    F.collect_list("magnitude").alias("magnitudes"),
)
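
One caveat: collect_list does not guarantee element order, and the FFT is order-sensitive, so the samples should stay in time order. A sketch of one way to enforce that, collecting (timestamp, value) structs, sorting them, and extracting the values; it assumes the timestamp column from the question, and transform with a Python lambda requires PySpark 3.1+ (on older versions a UDF that sorts can do the same job):

# inside the .agg(...) above, a time-ordered alternative to collect_list:
F.transform(
    F.sort_array(F.collect_list(F.struct("timestamp", "value"))),
    lambda s: s["value"],
).alias("values"),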


# Definition of the FFT UDF. Do the same for rfft.
@F.udf(T.ArrayType(T.FloatType()))
def fft_udf(array):
    # np.fft.fft returns complex numbers; keep the real part so the
    # result matches the declared ArrayType(FloatType())
    return [float(x.real) for x in np.fft.fft(array)]

# Do that for all your columns (withColumn returns a new DataFrame).
df_grouped = df_grouped.withColumn("fft_values", fft_udf(F.col("values")))
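
Since fft returns complex coefficients, casting to float keeps only the real part; using abs(x) instead would give the magnitude spectrum, which is often the more useful feature. Along the same lines, a sketch of the analogous UDF for the real-input FFT (the name rfft_udf is illustrative, not from the original answer):

@F.udf(T.ArrayType(T.FloatType()))
def rfft_udf(array):
    # np.fft.rfft also returns complex values; keep the real part
    return [float(x.real) for x in np.fft.rfft(array)]

df_grouped = (
    df_grouped
    .withColumn("fft_magnitudes", fft_udf(F.col("magnitudes")))
    .withColumn("rfft_values", rfft_udf(F.col("values")))
    .withColumn("rfft_magnitudes", rfft_udf(F.col("magnitudes")))
)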
