Fast Fourier Transform (FFT) aggregation on a Spark DataFrame groupBy
Question
I am trying to compute an FFT over a window using NumPy's fft on a Spark DataFrame, like this:
```python
import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.functions import (
    avg, kurtosis, max, min, percentile_approx, stddev_samp, var_samp,
)

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    func.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    percentile_approx("value", 0.25).alias("quantile_1(value)"),
    percentile_approx("magnitude", 0.25).alias("quantile_1(magnitude)"),
    percentile_approx("value", 0.5).alias("quantile_2(value)"),
    percentile_approx("magnitude", 0.5).alias("quantile_2(magnitude)"),
    percentile_approx("value", 0.75).alias("quantile_3(value)"),
    percentile_approx("magnitude", 0.75).alias("quantile_3(magnitude)"),
    avg("value"),
    avg("magnitude"),
    min("value"),
    min("magnitude"),
    max("value"),
    max("magnitude"),
    kurtosis("value"),
    kurtosis("magnitude"),
    var_samp("value"),
    var_samp("magnitude"),
    stddev_samp("value"),
    stddev_samp("magnitude"),
    np.fft.fft("value"),
    np.fft.fft("magnitude"),
    np.fft.rfft("value"),
    np.fft.rfft("magnitude"),
)
```
Every aggregation function works fine; however, for the FFT I get:

tuple index out of range

and I don't understand why. Do I need to do anything in particular to the values for the NumPy FFT to work? The values are all floats. When I print the column, it looks like this:
[Row(value_0=6.247499942779541), Row(value_0=63.0), Row(value_0=54.54375076293945), Row(value_0=0.7088077664375305), Row(value_0=51.431251525878906), Row(value_0=0.09377499669790268), Row(value_0=0.09707500040531158), Row(value_0=6.308750152587891), Row(value_0=8.503950119018555), Row(value_0=295.8463134765625), Row(value_0=7.938048839569092), Row(value_0=8.503950119018555), Row(value_0=0.7090428471565247), Row(value_0=0.7169944643974304), Row(value_0=0.5659012794494629)]
I am guessing the Spark Row objects might be the issue, but I am unsure of how to convert them in this context.
Answer 1
Score: 0
np.fft.fft is a NumPy function, not a PySpark function. Therefore, you cannot apply it directly to a DataFrame.

Moreover, it takes an array as input, and "value" is just a string: fft cannot infer that this string means the aggregated list of the values of the column "value". You have to build that list manually.
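That string is, in fact, exactly what triggers the error: NumPy coerces "value" to a zero-dimensional array, and fft then fails when it reads the transform length from the (empty) shape tuple. A minimal reproduction outside Spark:

```python
import numpy as np

# The string is coerced to a 0-d array; fft reads a.shape[-1] to pick
# the transform length, and the empty shape tuple raises the error.
np.fft.fft("value")  # IndexError: tuple index out of range
```

The fix is to collect the values of each group into a real array first, then apply the FFT inside a UDF: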
```python
import numpy as np
from pyspark.sql import functions as F, types as T

df_grouped = df.groupBy(
    "id",
    "type",
    "mode",
    F.window("timestamp", "10 seconds", "5 seconds"),
).agg(
    F.percentile_approx("value", 0.25).alias("quantile_1(value)"),
    ...,
    F.stddev_samp("magnitude"),
    # Replace the np.fft.fft / np.fft.rfft lines with collect_list,
    # which aggregates each group's raw values into an array column.
    F.collect_list("value").alias("values"),
    F.collect_list("magnitude").alias("magnitudes"),
)

# Definition of the UDF fft. Do the same for rfft.
# np.fft.fft returns complex coefficients, which cannot be cast to float
# directly, so this keeps the magnitude of each coefficient.
@F.udf(T.ArrayType(T.FloatType()))
def fft_udf(array):
    return [float(np.abs(x)) for x in np.fft.fft(array)]

# Do that for all your columns.
df_grouped.withColumn("fft_values", fft_udf(F.col("values")))
```
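The answer leaves the rfft UDF and the remaining columns as an exercise. A sketch of that last step, continuing from the code above (the rfft_udf name and the output column names are illustrative choices, not from the original answer):

```python
import numpy as np
from pyspark.sql import functions as F, types as T

# Same pattern for the real-input FFT; np.fft.rfft also returns complex
# coefficients, so the magnitudes are kept here as well.
@F.udf(T.ArrayType(T.FloatType()))
def rfft_udf(array):
    return [float(np.abs(x)) for x in np.fft.rfft(array)]

# Apply both UDFs to both collected columns in one chain.
df_result = (
    df_grouped
    .withColumn("fft_values", fft_udf(F.col("values")))
    .withColumn("fft_magnitudes", fft_udf(F.col("magnitudes")))
    .withColumn("rfft_values", rfft_udf(F.col("values")))
    .withColumn("rfft_magnitudes", rfft_udf(F.col("magnitudes")))
)
```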