PySpark: generate the same id for consecutive rows

Question

I have a dataframe with user_id and flags, and I would like to create separate groups for consecutive false flags within the same user_id, but I'm having a hard time getting this to work.

The dataframe looks like this:

+-------+-----+
|user_id| keep|
+-------+-----+
|      1| true|
|      1| true|
|      1|false|
|      1|false|
|      1| true|
|      1| true|
|      2| true|
|      2| true|
|      2|false|
|      2|false|
|      2|false|
|      2| true|
|      2|false|
|      2|false|
|      2| true|
|      3| true|
|      4| true|
|      5| true|
|      5|false|
|      5|false|
+-------+-----+

Expected result:

+-------+-----+-----+
|user_id| keep|group|
+-------+-----+-----+
|      1| true|    0|
|      1| true|    0|
|      1|false|    1|
|      1|false|    1|
|      1| true|    0|
|      1| true|    0|
|      2| true|    0|
|      2| true|    0|
|      2|false|    1|
|      2|false|    1|
|      2|false|    1|
|      2| true|    0|
|      2|false|    2|
|      2|false|    2|
|      2| true|    0|
|      3| true|    0|
|      4| true|    0|
|      5| true|    0|
|      5|false|    1|
|      5|false|    1|
+-------+-----+-----+

Any idea on how I can get this to work?

Answer 1

Score: 1

I hope I've understood your question correctly. Please check my answer to see if it works as you intended.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("consecutive_false_flags").getOrCreate()

data = [
    (1, True),
    (1, True),
    (1, False),
    (1, False),
    (1, True),
    (1, True),
    (2, True),
    (2, True),
    (2, False),
    (2, False),
    (2, False),
    (2, True),
    (2, False),
    (2, False),
    (2, True),
    (3, True),
    (4, True),
    (5, True),
    (5, False),
    (5, False)
]

df = spark.createDataFrame(data, ["user_id", "keep"])

# Create an ordered unique_id column to make working with window fuction easier
df = df.withColumn("unique_id", monotonically_increasing_id())

# create a column that marks where a new group starts, using lag():
# check whether the flag changed (true -> false or false -> true);
# if it changed, mark 1, else 0;
# force 'new_group' to 0 when 'keep' is true, since only false runs matter
df = df.withColumn("new_group",
                   when(col("keep") == "true", 0)
                   .otherwise((lag("keep").over(Window.partitionBy("user_id").orderBy(col("unique_id"))) != col("keep")).
                              cast("int")))
df.show()
# +-------+-----+-----------+---------+
# |user_id| keep|  unique_id|new_group|
# +-------+-----+-----------+---------+
# |      1| true|          0|        0|
# |      1| true|          1|        0|
# |      1|false|          2|        1|
# |      1|false|          3|        0|
# |      1| true|          4|        0|
# |      1| true| 8589934592|        0|
# |      2| true| 8589934593|        0|
# |      2| true| 8589934594|        0|
# |      2|false| 8589934595|        1|
# |      2|false| 8589934596|        0|
# |      2|false|17179869184|        0|
# |      2| true|17179869185|        0|
# |      2|false|17179869186|        1|
# |      2|false|17179869187|        0|
# |      2| true|17179869188|        0|
# |      3| true|25769803776|        0|
# |      4| true|25769803777|        0|
# |      5| true|25769803778|        0|
# |      5|false|25769803779|        1|
# |      5|false|25769803780|        0|
# +-------+-----+-----------+---------+

# running sum of new_group yields the group number;
# again force 'group' to 0 when 'keep' is true
df = df.withColumn("group", when(col("keep") == "true", 0)
                   .otherwise(sum("new_group").over(Window.partitionBy("user_id").orderBy(col("unique_id")))))
df.show()
# +-------+-----+-----------+---------+-----+
# |user_id| keep|  unique_id|new_group|group|
# +-------+-----+-----------+---------+-----+
# |      1| true|          0|        0|    0|
# |      1| true|          1|        0|    0|
# |      1|false|          2|        1|    1|
# |      1|false|          3|        0|    1|
# |      1| true|          4|        0|    0|
# |      1| true| 8589934592|        0|    0|
# |      2| true| 8589934593|        0|    0|
# |      2| true| 8589934594|        0|    0|
# |      2|false| 8589934595|        1|    1|
# |      2|false| 8589934596|        0|    1|
# |      2|false|17179869184|        0|    1|
# |      2| true|17179869185|        0|    0|
# |      2|false|17179869186|        1|    2|
# |      2|false|17179869187|        0|    2|
# |      2| true|17179869188|        0|    0|
# |      3| true|25769803776|        0|    0|
# |      4| true|25769803777|        0|    0|
# |      5| true|25769803778|        0|    0|
# |      5|false|25769803779|        1|    1|
# |      5|false|25769803780|        0|    1|
# +-------+-----+-----------+---------+-----+

# drop the two helper columns to get the expected result
df = df.drop("new_group", "unique_id")
df.show()

# +-------+-----+-----+
# |user_id| keep|group|
# +-------+-----+-----+
# |      1| true|    0|
# |      1| true|    0|
# |      1|false|    1|
# |      1|false|    1|
# |      1| true|    0|
# |      1| true|    0|
# |      2| true|    0|
# |      2| true|    0|
# |      2|false|    1|
# |      2|false|    1|
# |      2|false|    1|
# |      2| true|    0|
# |      2|false|    2|
# |      2|false|    2|
# |      2| true|    0|
# |      3| true|    0|
# |      4| true|    0|
# |      5| true|    0|
# |      5|false|    1|
# |      5|false|    1|
# +-------+-----+-----+
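
For reference, the same lag-and-cumulative-sum idea can be written as a single chain. This is a minimal sketch only, assuming the df built above; the idx and run_start column names are illustrative:

from pyspark.sql import Window
from pyspark.sql import functions as F

# window over each user, ordered by the materialized row order
w = Window.partitionBy("user_id").orderBy("idx")

df2 = (
    df.withColumn("idx", F.monotonically_increasing_id())
      # 1 at the first row of every run of false flags, 0 elsewhere;
      # coalesce() treats the first row of a partition as following a true
      .withColumn("run_start",
                  (~F.col("keep") & F.coalesce(F.lag("keep").over(w), F.lit(True))).cast("int"))
      # the running count of run starts numbers the false runs 1, 2, ...,
      # while true rows are forced to group 0
      .withColumn("group", F.when(F.col("keep"), 0).otherwise(F.sum("run_start").over(w)))
      .drop("idx", "run_start")
)
df2.show()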

Answer 2

Score: 1

from pyspark.sql.functions import row_number, rank, dense_rank, col, when, monotonically_increasing_id
from pyspark.sql import Window

# materialize the original row order as an explicit column
dataframe = dataframe.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))

# number the rows within each user, then within each (user, flag) pair;
# the difference is constant across a run of consecutive identical flags
w = Window.partitionBy("user_id").orderBy("index")
ww = Window.partitionBy("user_id", "keep").orderBy("row_number")
dataframe = dataframe.withColumn("row_number", row_number().over(w)).withColumn("rank", rank().over(ww)).withColumn("diff", col("row_number") - col("rank"))

# dense_rank over the constant diff numbers the runs; true rows become 0
ww = Window.partitionBy("user_id", "keep").orderBy("diff")

dataframe = dataframe.withColumn("resu", dense_rank().over(ww)).withColumn("resu", when(col("keep") == "true", 0).otherwise(col("resu")))

dataframe.select("user_id", "keep", "resu").show()

<b>Output:</b>


+-------+-----+-----+
|user_id| keep| resu|
+-------+-----+-----+
|      1| true|    0|
|      1| true|    0|
|      1|false|    1|
|      1|false|    1|
|      1| true|    0|
|      1| true|    0|
|      2| true|    0|
|      2| true|    0|
|      2|false|    1|
|      2|false|    1|
|      2|false|    1|
|      2| true|    0|
|      2|false|    2|
|      2|false|    2|
|      2| true|    0|
|      3| true|    0|
|      4| true|    0|
|      5| true|    0|
|      5|false|    1|
|      5|false|    1|
+-------+-----+-----+
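
For context, this is the classic gaps-and-islands trick: row_number (position within the user) minus rank (position within the user/flag pair) stays constant across a run of consecutive identical flags and jumps when the run is interrupted. For user 2's false rows, for example, row_number is 3, 4, 5, 7, 8 and rank is 1, 2, 3, 4, 5, so diff is 2, 2, 2, 3, 3; dense_rank over diff then numbers the two runs 1 and 2.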


</details>



huangapple
  • 本文由 发表于 2023年3月9日 14:45:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/75681230.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定