Add new timestamp column with interval in dataframe in pyspark
Question
I'm using PySpark and I have a Spark DataFrame. I want to add a new timestamp column with 15-minute intervals. Can anyone help, please?
My dataset is as follows:
+-------------+-----+-------+
|id |model|price |
+-------------+-----+-------+
|2187233 |1 |54.13 |
|2187233 |1 |44.94 |
|2187233 |1 |39.84 |
|2187233 |1 |36.95 |
|99999653468 |1 |108.06 |
|99999653468 |1 |108.96 |
|99999653468 |1 |108.84 |
|99999653468 |1 |108.86 |
+-------------+-----+-------+
Suppose the current time is 2023-07-30 00:00:00; then the result should be:
+-------------+-----+------------------+-------------------+
|id |model|price |ds |
+-------------+-----+------------------+-------------------+
|2187233 |1 |54.13 |2023-07-30 00:00:00|
|2187233 |1 |44.94 |2023-07-30 00:15:00|
|2187233 |1 |39.84 |2023-07-30 00:30:00|
|2187233 |1 |36.95 |2023-07-30 00:45:00|
|99999653468 |1 |108.06 |2023-07-30 00:00:00|
|99999653468 |1 |108.96 |2023-07-30 00:15:00|
|99999653468 |1 |108.84 |2023-07-30 00:30:00|
|99999653468 |1 |108.86 |2023-07-30 00:45:00|
+-------------+-----+------------------+-------------------+
Answer 1
Score: 0
You can achieve this using the withColumn function along with the expr function in PySpark. You need to import the necessary functions and create a new column with the desired timestamp intervals.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_timestamp, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TimestampInterval").getOrCreate()

interval_minutes = 15

# Your DataFrame
data = [
    (2187233, 1, 54.13),
    (2187233, 1, 44.94),
    (2187233, 1, 39.84),
    (2187233, 1, 36.95),
    (99999653468, 1, 108.06),
    (99999653468, 1, 108.96),
    (99999653468, 1, 108.84),
    (99999653468, 1, 108.86),
]
columns = ["id", "model", "price"]
df = spark.createDataFrame(data, columns)

# Number the rows within each id (0, 1, 2, ...) so each row gets its own 15-minute slot
window_spec = Window.partitionBy("id").orderBy("model")
df = df.withColumn("interval_num", row_number().over(window_spec) - 1)

# Multiply the interval by the row position and add it to the current timestamp
df = df.withColumn("interval", expr(f"INTERVAL {interval_minutes} MINUTES * interval_num"))
df = df.withColumn("ds", current_timestamp() + col("interval"))

df = df.drop("interval_num", "interval")
df.show(truncate=False)
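Note that the window orders by model, which is constant in the sample data, so ties within each id are broken arbitrarily; order by a real sequence column if the row-to-timestamp pairing matters. Also, interval multiplication inside expr may behave differently across Spark versions, and current_timestamp() will not reproduce the fixed 2023-07-30 00:00:00 base from the question. A minimal alternative sketch that sidesteps both points by doing the arithmetic in epoch seconds (the base_ts literal and the result name are just illustrative, assuming the same df as above):

from pyspark.sql.functions import col, lit, row_number, unix_timestamp, from_unixtime
from pyspark.sql.window import Window

# Fixed base time taken from the question's example; swap in current_timestamp() if "now" is really wanted
base_ts = lit("2023-07-30 00:00:00").cast("timestamp")

w = Window.partitionBy("id").orderBy("model")
result = (
    df.withColumn("interval_num", row_number().over(w) - 1)
      # shift the base time by interval_num * 15 minutes, working in epoch seconds
      .withColumn(
          "ds",
          from_unixtime(unix_timestamp(base_ts) + col("interval_num") * 15 * 60).cast("timestamp"),
      )
      .drop("interval_num")
)
result.show(truncate=False)

With the sample data above, each id gets the ds values 00:00, 00:15, 00:30 and 00:45 as shown in the question.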