How to label rows in PySpark

Question

I have the following dataframe in PySpark:

| ID | Timestamp | Event |
| --- | --- | --- |
| 1 | 1657610298 | 0 |
| 1 | 1657610299 | 0 |
| 1 | 1657610300 | 0 |
| 1 | 1657610301 | 1 |
| 1 | 1657610302 | 0 |
| 1 | 1657610303 | 0 |
| 1 | 1657610304 | 0 |
| 2 | 1657610298 | 0 |
| 2 | 1657610299 | 0 |
| 2 | 1657610300 | 0 |
| 2 | 1657610301 | 1 |
| 2 | 1657610302 | 0 |
| 2 | 1657610303 | 0 |
| 2 | 1657610304 | 0 |

I need the following output:

| ID | Timestamp | Event | Type |
| --- | --- | --- | --- |
| 1 | 1657610298 | 0 | before |
| 1 | 1657610299 | 0 | before |
| 1 | 1657610300 | 0 | before |
| 1 | 1657610301 | 1 | event |
| 1 | 1657610302 | 0 | after |
| 1 | 1657610303 | 0 | after |
| 1 | 1657610304 | 0 | after |
| 2 | 1657610298 | 0 | before |
| 2 | 1657610299 | 0 | before |
| 2 | 1657610300 | 0 | before |
| 2 | 1657610301 | 1 | event |
| 2 | 1657610302 | 0 | after |
| 2 | 1657610303 | 0 | after |
| 2 | 1657610304 | 0 | after |

I tried the following function:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def label(df_):
    remove = ['type1']
    w = Window.partitionBy('ID').orderBy('Timestamp')
    df_ = (
        df_
        .withColumn('type1', F.when(
            (F.col("Event") == 0) & (F.lag(F.col("Event"), 1).over(w) == 1),
            F.lit('after')))
        .withColumn('type2', F.when(
            (F.col("Event") == 0) & ((F.lag(F.col("Event"), 1).over(w) == 1) | (F.lag(F.col("type1"), 1).over(w) == 'after')),
            F.lit('after')).otherwise(F.lit('before')))
    )
    df_ = df_.drop(*remove)
    return df_

What I get:

| **ID** | **Timestamp** | **Event** | **type2** |
| --- | --- | --- | --- |
| 1 | 1657610298 | 0 | before |
| 1 | 1657610299 | 0 | before |
| 1 | 1657610300 | 0 | before |
| 1 | 1657610301 | 1 | event |
| 1 | 1657610302 | 0 | after |
| 1 | 1657610303 | 0 | after |
| 1 | 1657610304 | 0 | *before <- error* should be after |
| 2 | 1657610298 | 0 | before |
| 2 | 1657610299 | 0 | before |
| 2 | 1657610300 | 0 | before |
| 2 | 1657610301 | 1 | event |
| 2 | 1657610302 | 0 | after |
| 2 | 1657610303 | 0 | after |
| 2 | 1657610304 | 0 | *before <- error* should be after |

Obviously this is not working: to label all the 'after' rows correctly I would have to loop the function. Right now I'm only getting TWO 'after's per ID, when it should be more. I'm sure there's another way to do it, but I'm stuck, please help me.

Answer 1

Score: 1

Create temp_col as the running max of Event over the window from unboundedPreceding to currentRow, then check:

  • if both columns are 0, then 'before'
  • if the event column is 0 and temp_col is 1, then 'after'
  • otherwise it's the event row itself.

Unlike the lag-based attempt, which can only look one row back and therefore extends the 'after' label by a single row per pass, the running max carries the post-event state to the end of each partition, so no looping is needed.

Example:

from pyspark.sql.functions import col, lit, max, when
from pyspark.sql.window import Window

# temp_col = running max of Event per ID: 0 before the event row, 1 from it onwards
w = Window.partitionBy('ID').orderBy('timestamp').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_windw = (df.withColumn("temp_col", max(col("Event")).over(w))
              .withColumn("type2", when((col("event") == 0) & (col("temp_col") == 0), lit("before"))
                                   .when((col("event") == 0) & (col("temp_col") == 1), lit("after"))
                                   .otherwise(lit("event")))
              .drop("temp_col"))
df_windw.show(100, False)
#+---+----------+-----+------+
#|ID |timestamp |event|type2 |
#+---+----------+-----+------+
#|1  |1657610298|0    |before|
#|1  |1657610299|0    |before|
#|1  |1657610300|0    |before|
#|1  |1657610301|1    |event |
#|1  |1657610302|0    |after |
#|1  |1657610303|0    |after |
#|1  |1657610304|0    |after |
#|2  |1657610298|0    |before|
#|2  |1657610299|0    |before|
#|2  |1657610300|0    |before|
#|2  |1657610301|1    |event |
#|2  |1657610302|0    |after |
#|2  |1657610303|0    |after |
#|2  |1657610304|0    |after |
#+---+----------+-----+------+
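
For reference, a minimal sketch to make the snippet above runnable end to end, assuming an active SparkSession bound to spark (the same assumption Answer 2 makes below); the lowercase column names match the output shown:

# Hypothetical setup mirroring the asker's sample data
data = [(i, ts, ev) for i in (1, 2)
        for ts, ev in [(1657610298, 0), (1657610299, 0), (1657610300, 0),
                       (1657610301, 1), (1657610302, 0), (1657610303, 0),
                       (1657610304, 0)]]
df = spark.createDataFrame(data, ["ID", "timestamp", "event"])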

Answer 2

Score: 1

Here is another solution, without a window function:

from pyspark.sql.functions import when

# Create sample data
data = [
    (1, 1657610298, 0),
    (1, 1657610299, 0),
    (1, 1657610300, 0),
    (1, 1657610301, 1),
    (1, 1657610302, 0),
    (1, 1657610303, 0),
    (1, 1657610304, 0),
    (2, 1657610298, 0),
    (2, 1657610299, 0),
    (2, 1657610300, 0),
    (2, 1657610301, 1),
    (2, 1657610302, 0),
    (2, 1657610303, 0),
    (2, 1657610304, 0)
]

# Create DataFrames from the sample data (all data and just events)
df_all = spark.createDataFrame(data, ["ID", "Timestamp", "Event"])
df_events = df_all.filter("Event==1")

# Join df_all with df_events based on ID
joined_df = df_all.join(
    df_events.withColumnRenamed('Timestamp', 'EventTimestamp').withColumnRenamed('Event', 'EventEvent'), "ID", "left")

# Add the "Type" column based on conditions
result_df = joined_df.withColumn(
    "Type",
    when(joined_df.Timestamp < joined_df.EventTimestamp, "before")
    .when(joined_df.Timestamp == joined_df.EventTimestamp, "event")
    .otherwise("after")
)

# Drop the duplicated columns created by the join
result_df = result_df.drop("EventTimestamp").drop("EventEvent")

# Print the output
result_df.show(1000, False)

And the output is:

+---+----------+-----+------+                                                   
|ID |Timestamp |Event|Type  |
+---+----------+-----+------+
|1  |1657610298|0    |before|
|1  |1657610299|0    |before|
|1  |1657610300|0    |before|
|1  |1657610301|1    |event |
|1  |1657610302|0    |after |
|1  |1657610303|0    |after |
|1  |1657610304|0    |after |
|2  |1657610298|0    |before|
|2  |1657610299|0    |before|
|2  |1657610300|0    |before|
|2  |1657610301|1    |event |
|2  |1657610302|0    |after |
|2  |1657610303|0    |after |
|2  |1657610304|0    |after |
+---+----------+-----+------+
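
Note that the plain left join assumes exactly one event row per ID; with several events per ID the join would duplicate rows. A minimal sketch for that case, under the assumption that rows should be labeled relative to the first event per ID:

from pyspark.sql.functions import col, min as min_, when

# First event timestamp per ID (assumption: label relative to the FIRST event)
first_events = (df_all.filter("Event == 1")
                      .groupBy("ID")
                      .agg(min_("Timestamp").alias("EventTimestamp")))

result_df = (df_all.join(first_events, "ID", "left")
                   .withColumn("Type",
                               when(col("Timestamp") < col("EventTimestamp"), "before")
                               .when(col("Timestamp") == col("EventTimestamp"), "event")
                               # later event rows and IDs with no event fall through
                               # to "after", matching the plain join above
                               .otherwise("after"))
                   .drop("EventTimestamp"))
result_df.show(1000, False)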
