Add new timestamp column with interval in dataframe in PySpark

Question

I'm using PySpark and I have a Spark dataframe. I want to add a new timestamp column, ds, where the rows within each id are spaced at 15-minute intervals. Can anyone help, please?

My dataset:

+-------------+-----+-------+
|id           |model|price  |
+-------------+-----+-------+
|2187233      |1    |54.13  |
|2187233      |1    |44.94  |
|2187233      |1    |39.84  |
|2187233      |1    |36.95  |
|99999653468  |1    |108.06 |
|99999653468  |1    |108.96 |
|99999653468  |1    |108.84 |
|99999653468  |1    |108.86 |
+-------------+-----+-------+

Suppose the current time is 2023-07-30 00:00:00.

Then the result should be:

+-------------+-----+------------------+-------------------+
|id           |model|price             |ds                 |
+-------------+-----+------------------+-------------------+
|2187233      |1    |54.13             |2023-07-30 00:00:00|
|2187233      |1    |44.94             |2023-07-30 00:15:00|
|2187233      |1    |39.84             |2023-07-30 00:30:00|
|2187233      |1    |36.95             |2023-07-30 00:45:00|
|99999653468  |1    |108.06            |2023-07-30 00:00:00|
|99999653468  |1    |108.96            |2023-07-30 00:15:00|
|99999653468  |1    |108.84            |2023-07-30 00:30:00|
|99999653468  |1    |108.86            |2023-07-30 00:45:00|
+-------------+-----+------------------+-------------------+

Answer 1

Score: 0

You can achieve this using the withColumn function along with the expr function in PySpark. Import the necessary functions and create a new column with the desired timestamp intervals:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_timestamp, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TimestampInterval").getOrCreate()
interval_minutes = 15

# Your DataFrame
data = [
    (2187233, 1, 54.13),
    (2187233, 1, 44.94),
    (2187233, 1, 39.84),
    (2187233, 1, 36.95),
    (99999653468, 1, 108.06),
    (99999653468, 1, 108.96),
    (99999653468, 1, 108.84),
    (99999653468, 1, 108.86)
]

columns = ["id", "model", "price"]
df = spark.createDataFrame(data, columns)

# Number the rows within each id (0, 1, 2, ...) so each row gets its own offset.
# Note: with identical model values, the order within an id is not guaranteed;
# on real data, order by a column that defines a deterministic row order.
window_spec = Window.partitionBy("id").orderBy("model")
df = df.withColumn("interval_num", row_number().over(window_spec) - 1)

# Multiply the 15-minute interval by the row offset and add it to the base timestamp
df = df.withColumn("interval", expr(f"INTERVAL {interval_minutes} MINUTES * interval_num"))
df = df.withColumn("ds", current_timestamp() + col("interval"))

df = df.drop("interval_num", "interval")
df.show(truncate=False)
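
One variant worth noting: the question anchors the series at a fixed base time (2023-07-30 00:00:00), while current_timestamp() returns the exact moment the query runs. A minimal sketch of that variant, reusing the names defined above and assuming the hard-coded base timestamp taken from the question:

from pyspark.sql.functions import lit

# Fixed base time from the question instead of current_timestamp()
base_ts = lit("2023-07-30 00:00:00").cast("timestamp")

df_fixed = (
    df.withColumn("interval_num", row_number().over(window_spec) - 1)
      # Overwrites the ds column computed above with the fixed-base version
      .withColumn("ds", base_ts + expr(f"INTERVAL {interval_minutes} MINUTES * interval_num"))
      .drop("interval_num")
)
df_fixed.show(truncate=False)

With the sample data this yields ds values of 00:00:00, 00:15:00, 00:30:00, and 00:45:00 per id, matching the expected result.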

