英文:
Get the max(datetime) in Pyspark
问题
我有一个数据集,类似这样:
分类 | 日期时间 | 值 |
---|---|---|
a | 日期1 | 10 |
a | 日期2 | 30 |
a | 日期3 | 20 |
a | 日期4 | 50 |
a | 日期5 | 30 |
b | 日期6 | 20 |
b | 日期7 | 15 |
b | 日期8 | 30 |
b | 日期9 | 40 |
c | 日期10 | 10 |
c | 日期11 | 10 |
c | 日期12 | 30 |
我想要为每个分类获取最大值的日期时间,就像这样:
分类 | 日期时间 | 值 | 最大日期时间 |
---|---|---|---|
a | 日期1 | 10 | 日期4 |
a | 日期2 | 30 | 日期4 |
a | 日期3 | 20 | 日期4 |
a | 日期4 | 50 | 日期4 |
a | 日期5 | 30 | 日期4 |
b | 日期6 | 20 | 日期9 |
b | 日期7 | 15 | 日期9 |
b | 日期8 | 30 | 日期9 |
b | 日期9 | 40 | 日期9 |
c | 日期10 | 10 | 日期12 |
c | 日期11 | 10 | 日期12 |
c | 日期12 | 30 | 日期12 |
提前感谢!
英文:
I have a dataset like this :
category | datetime | value |
---|---|---|
a | date1 | 10 |
a | date2 | 30 |
a | date3 | 20 |
a | date4 | 50 |
a | date5 | 30 |
b | date6 | 20 |
b | date7 | 15 |
b | date8 | 30 |
b | date9 | 40 |
c | date10 | 10 |
c | date11 | 10 |
c | date12 | 30 |
And I want to get for each categoy the datetime of the max(value)
In this example I want to get this :
category | datetime | value | datetimeMax |
---|---|---|---|
a | date1 | 10 | date4 |
a | date2 | 30 | date4 |
a | date3 | 20 | date4 |
a | date4 | 50 | date4 |
a | date5 | 30 | date4 |
b | date6 | 20 | date9 |
b | date7 | 15 | date9 |
b | date8 | 30 | date9 |
b | date9 | 40 | date9 |
c | date10 | 10 | date12 |
c | date11 | 10 | date12 |
c | date12 | 30 | date12 |
Thanks in advance !
答案1
得分: 1
使用MAX_BY
在pyspark >= 3.3.0
中:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = (
Window
.partitionBy('category')
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
mdt = F.max_by('datetime', 'value').over(w)
df2 = df.withColumn('datetime_max', mdt)
英文:
Using MAX_BY
in pyspark >= 3.3.0
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = (
Window
.partitionBy('category')
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
mdt = F.max_by('datetime', 'value').over(w)
df2 = df.withColumn('datetime_max', mdt)
答案2
得分: 0
from pyspark.sql.functions import max
df = df.join(
df.groupby('category').
agg(max('value').alias('datetimeMax')),
on=['category'])
df.show()
英文:
from pyspark.sql.functions import max
df = df.join(
df.groupby('category'). \
agg(max('value').alias('datetimeMax')), \
on=['category'])
df.show()
答案3
得分: 0
使用窗口函数(row_number
,max
)来处理这个情况,通过在category
上定义分区并按value
降序排序。
when(row_number().over(w) == 1,
-> 当row_number
等于1时,获取日期时间值,否则保留为nullmax(when(row_number().over(w) == 1, col("datetime")))
-> 获取窗口中的最大值,并在所有行上填充
示例:
from pyspark.sql import Window
from pyspark.sql.functions import *
w = Window.partitionBy('category').orderBy(desc('value'))
df.withColumn("datetimeMax", max(when(row_number().over(w) == 1, col("datetime"))).over(w)).show(100, False)
# +--------+--------+-----+-----------+
# |category|datetime|value|datetimeMax|
# +--------+--------+-----+-----------+
# |a |date4 |50 |date4 |
# |a |date2 |30 |date4 |
# |a |date5 |30 |date4 |
# |a |date3 |20 |date4 |
# |a |date1 |10 |date4 |
# +--------+--------+-----+-----------+
(注意:上述示例是使用PySpark编写的代码,用于处理数据集中的窗口函数操作。)
英文:
Use window
functions(row_number,max
) for this case, by defining the partition by on category
and order by on value
descending.
when(row_number().over(w) == 1,
-> when row_number=1 then get datetime value otherwise keep as nullmax(when(row_number().over(w) == 1,col("datetime")))
-> get max value for the window and populate on all rows
Example:
from pyspark.sql import Window
from pyspark.sql.functions import *
w = Window.partitionBy('category').orderBy(desc('value'))
w = Window.partitionBy('category').orderBy(desc('value'))
df.withColumn("datetimeMax",max(when(row_number().over(w) == 1,col("datetime"))).over(w)).show(100,False)
#+--------+--------+-----+-----------+
#|category|datetime|value|datetimeMax|
#+--------+--------+-----+-----------+
#|a |date4 |50 |date4 |
#|a |date2 |30 |date4 |
#|a |date5 |30 |date4 |
#|a |date3 |20 |date4 |
#|a |date1 |10 |date4 |
#+--------+--------+-----+-----------+
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论