Pyspark generate same id for consecutive rows

Question
I have a dataframe with user_id and flags, and I would like to create a separate group for each run of consecutive false flags within the same user_id, but I'm having a hard time getting this to work.
The dataframe looks like this:
+-------+-----+
|user_id| keep|
+-------+-----+
| 1| true|
| 1| true|
| 1|false|
| 1|false|
| 1| true|
| 1| true|
| 2| true|
| 2| true|
| 2|false|
| 2|false|
| 2|false|
| 2| true|
| 2|false|
| 2|false|
| 2| true|
| 3| true|
| 4| true|
| 5| true|
| 5|false|
| 5|false|
+-------+-----+
Expected result:
+-------+-----+-----+
|user_id| keep|group|
+-------+-----+-----+
| 1| true| 0|
| 1| true| 0|
| 1|false| 1|
| 1|false| 1|
| 1| true| 0|
| 1| true| 0|
| 2| true| 0|
| 2| true| 0|
| 2|false| 1|
| 2|false| 1|
| 2|false| 1|
| 2| true| 0|
| 2|false| 2|
| 2|false| 2|
| 2| true| 0|
| 3| true| 0|
| 4| true| 0|
| 5| true| 0|
| 5|false| 1|
| 5|false| 1|
+-------+-----+-----+
Any idea on how I can get this to work?
Answer 1
Score: 1
I hope I've understood your question correctly. Please check my answer to see if it works as you intended.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, sum, when, monotonically_increasing_id
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("consecutive_false_flags").getOrCreate()
data = [
(1, True),
(1, True),
(1, False),
(1, False),
(1, True),
(1, True),
(2, True),
(2, True),
(2, False),
(2, False),
(2, False),
(2, True),
(2, False),
(2, False),
(2, True),
(3, True),
(4, True),
(5, True),
(5, False),
(5, False)
]
df = spark.createDataFrame(data, ["user_id", "keep"])
# Create an ordered unique_id column to make working with window functions easier
df = df.withColumn("unique_id", monotonically_increasing_id())
# Create a column that marks where a new group starts, using lag to check
# whether the value changed (true -> false or false -> true): 1 if it changed,
# 0 otherwise. Rows where 'keep' is true are forced to 0, since we only care
# about the false runs.
df = df.withColumn(
    "new_group",
    when(col("keep") == "true", 0)
    .otherwise(
        (lag("keep").over(Window.partitionBy("user_id").orderBy(col("unique_id"))) != col("keep")).cast("int")
    ),
)
df.show()
# +-------+-----+-----------+---------+
# |user_id| keep| unique_id|new_group|
# +-------+-----+-----------+---------+
# | 1| true| 0| 0|
# | 1| true| 1| 0|
# | 1|false| 2| 1|
# | 1|false| 3| 0|
# | 1| true| 4| 0|
# | 1| true| 8589934592| 0|
# | 2| true| 8589934593| 0|
# | 2| true| 8589934594| 0|
# | 2|false| 8589934595| 1|
# | 2|false| 8589934596| 0|
# | 2|false|17179869184| 0|
# | 2| true|17179869185| 0|
# | 2|false|17179869186| 1|
# | 2|false|17179869187| 0|
# | 2| true|17179869188| 0|
# | 3| true|25769803776| 0|
# | 4| true|25769803777| 0|
# | 5| true|25769803778| 0|
# | 5|false|25769803779| 1|
# | 5|false|25769803780| 0|
# +-------+-----+-----------+---------+
# Running-sum the new_group markers to get the group value: for each false row,
# the group number is the count of false-run starts seen so far in that user_id.
# Rows where 'keep' is true are again forced to 0.
df = df.withColumn(
    "group",
    when(col("keep") == "true", 0)
    .otherwise(sum("new_group").over(Window.partitionBy("user_id").orderBy(col("unique_id")))),
)
df.show()
# +-------+-----+-----------+---------+-----+
# |user_id| keep| unique_id|new_group|group|
# +-------+-----+-----------+---------+-----+
# | 1| true| 0| 0| 0|
# | 1| true| 1| 0| 0|
# | 1|false| 2| 1| 1|
# | 1|false| 3| 0| 1|
# | 1| true| 4| 0| 0|
# | 1| true| 8589934592| 0| 0|
# | 2| true| 8589934593| 0| 0|
# | 2| true| 8589934594| 0| 0|
# | 2|false| 8589934595| 1| 1|
# | 2|false| 8589934596| 0| 1|
# | 2|false|17179869184| 0| 1|
# | 2| true|17179869185| 0| 0|
# | 2|false|17179869186| 1| 2|
# | 2|false|17179869187| 0| 2|
# | 2| true|17179869188| 0| 0|
# | 3| true|25769803776| 0| 0|
# | 4| true|25769803777| 0| 0|
# | 5| true|25769803778| 0| 0|
# | 5|false|25769803779| 1| 1|
# | 5|false|25769803780| 0| 1|
# +-------+-----+-----------+---------+-----+
# Drop the two helper columns to get the expected result
df = df.drop("new_group", "unique_id")
df.show()
# +-------+-----+-----+
# |user_id| keep|group|
# +-------+-----+-----+
# | 1| true| 0|
# | 1| true| 0|
# | 1|false| 1|
# | 1|false| 1|
# | 1| true| 0|
# | 1| true| 0|
# | 2| true| 0|
# | 2| true| 0|
# | 2|false| 1|
# | 2|false| 1|
# | 2|false| 1|
# | 2| true| 0|
# | 2|false| 2|
# | 2|false| 2|
# | 2| true| 0|
# | 3| true| 0|
# | 4| true| 0|
# | 5| true| 0|
# | 5|false| 1|
# | 5|false| 1|
# +-------+-----+-----+
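A couple of caveats worth adding (my notes, not part of the original answer): monotonically_increasing_id() is only guaranteed to increase within the DataFrame's current partition layout, so it reflects the source row order only as long as the data has not been repartitioned or shuffled first. Also, if a user's first row were false, lag() would return null there, so the first false run of that user would end up with a null group and would need extra handling. When the input order matters, here is a sketch of a more explicit ordering column via an RDD round-trip (assuming the original two-column df):

from pyspark.sql import Row

# Hypothetical alternative to the monotonically_increasing_id() step:
# zipWithIndex assigns gap-free ids that follow the RDD's row order exactly.
indexed = (
    df.rdd.zipWithIndex()
    .map(lambda pair: Row(**pair[0].asDict(), unique_id=pair[1]))
    .toDF()
)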
Answer 2
Score: 1
from pyspark.sql.functions import row_number, rank, dense_rank, col, when, monotonically_increasing_id
from pyspark.sql import Window

# Capture the source row order in an explicit index column
dataframe = dataframe.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))
# Number the rows within each user_id in source order
w = Window.partitionBy("user_id").orderBy("index")
ww = Window.partitionBy("user_id", "keep").orderBy("row_number")
# Gaps-and-islands: row_number - rank is constant within a consecutive run of
# equal 'keep' values and changes whenever the run is interrupted
dataframe = (
    dataframe.withColumn("row_number", row_number().over(w))
    .withColumn("rank", rank().over(ww))
    .withColumn("diff", col("row_number") - col("rank"))
)
ww = Window.partitionBy("user_id", "keep").orderBy("diff")
# dense_rank over diff numbers the false runs; true rows are forced to 0
dataframe = dataframe.withColumn("resu", dense_rank().over(ww)).withColumn(
    "resu", when(col("keep") == "true", 0).otherwise(col("resu"))
)
dataframe.select("user_id", "keep", "resu").show()
Output:
+-------+-----+-----+
|user_id| keep| resu|
+-------+-----+-----+
| 1| true| 0|
| 1| true| 0|
| 1|false| 1|
| 1|false| 1|
| 1| true| 0|
| 1| true| 0|
| 2| true| 0|
| 2| true| 0|
| 2|false| 1|
| 2|false| 1|
| 2|false| 1|
| 2| true| 0|
| 2|false| 2|
| 2|false| 2|
| 2| true| 0|
| 3| true| 0|
| 4| true| 0|
| 5| true| 0|
| 5|false| 1|
| 5|false| 1|
+-------+-----+-----+
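For reference, this is the classic gaps-and-islands technique: within each (user_id, keep) partition, row_number - rank stays constant across a consecutive run and jumps whenever the run is interrupted by the other value, so dense_rank over that difference numbers the runs 1, 2, ... A worked trace for user_id 2's false rows (my own illustration, not part of the original answer):

# row_number (per user_id)       :  3  4  5    7  8
# rank (per user_id, keep=false) :  1  2  3    4  5
# diff = row_number - rank       :  2  2  2    3  3
# dense_rank over diff           :  1  1  1    2  2   -> groups 1 and 2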