Creating a Binary Indicator Column Based on Date Differences in PySpark DataFrame

Question

I am working with a PySpark DataFrame that contains columns 'ID', 'date', and 'bool'. 'bool' is an indicator.

 #+-------+-------------+----+
 #|   ID  |     date    |bool|
 #+-------+-------------+----+
 #|1092829|  2019-10-29 |  0 |
 #|3091902|  2019-12-14 |  1 |
 #|3091902|  2020-07-13 |  0 |
 #|1092829|  2020-07-15 |  1 |
 #|1092829|  2020-07-17 |  1 |
 #|1092829|  2020-08-18 |  1 |
 #|  ...  |     ...     | ...|
 #+-------+-------------+----+

To simplify, let's imagine that I partition my DataFrame by the variable ID and, to be even clearer, that I only consider ID 1092829. Note that the DataFrame is sorted in ascending order by date.

 #+-------+-------------+----+
 #|   ID  |     date    |bool|
 #+-------+-------------+----+
 #|1092829|  2019-10-29 |  0 |
 #|1092829|  2019-12-14 |  1 |
 #|1092829|  2020-07-15 |  1 |
 #|1092829|  2020-07-17 |  1 |
 #|1092829|  2020-07-19 |  0 |
 #|1092829|  2020-08-15 |  1 |
 #|1092829|  2020-09-10 |  0 |
 #|1092829|  2020-09-15 |  0 |
 #|1092829|  2020-09-20 |  1 | 
 #|  ...  |     ...     | ...|
 #+-------+-------------+----+

I want to create a column 'D' that is equal to 1 for a row X if there is a row Y below X whose bool is equal to 1 and whose date is less than 2 weeks after the date of X, and 0 otherwise.

This would yield the following dataframe (for example, the 2020-07-15 row gets D = 1 because the next row, 2020-07-17, has bool = 1 and is only 2 days later, and the 2020-09-10 row gets D = 1 because 2020-09-20 has bool = 1 and is 10 days later):

 #+-------+-------------+----+---+
 #|   ID  |     date    |bool| D |
 #+-------+-------------+----+---+
 #|1092829|  2019-10-29 |  0 | 0 |
 #|1092829|  2019-12-14 |  1 | 0 |
 #|1092829|  2020-07-15 |  1 | 1 |
 #|1092829|  2020-07-17 |  1 | 0 |
 #|1092829|  2020-07-19 |  0 | 0 |
 #|1092829|  2020-08-15 |  1 | 0 |
 #|1092829|  2020-09-10 |  0 | 1 |
 #|1092829|  2020-09-15 |  0 | 0 |
 #|1092829|  2020-09-20 |  1 | 0 |
 #|  ...  |     ...     | ...|...|
 #+-------+-------------+----+---+

I know how to do it for a fixed number of rows using the pyspark lag function, but it's far from optimal, since there might be 10 rows of observations within a 2-week span in the dataframe.

An example with two lags:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("ID").orderBy("date")
df = df.withColumn("D", F.when(
    ((F.datediff(F.lag("date", -1).over(w), F.col("date")) < 14) & (F.lag("bool", -1).over(w) == 1))
    | ((F.datediff(F.lag("date", -2).over(w), F.col("date")) < 14) & (F.lag("bool", -2).over(w) == 1)),
    1).otherwise(0))

How could I do it for any number of rows in a computationally efficient way?

Answer 1

Score: 1

I think this logic works: count the rows with bool = 1 within the following two-week range, and if that count is greater than 1, the row is a target (D = 1).

from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('ID').orderBy('timestamp').rangeBetween(0, 14 * 86400)  # current row plus the next 14 days, expressed in seconds on the unix timestamp

df.withColumn('timestamp', f.unix_timestamp('date', 'y-M-d')) \
  .withColumn('bool_in_2weeks', f.count(f.when(f.col('bool') == f.lit(1), True)).over(w)) \
  .withColumn('D', f.expr('bool = 1 and bool_in_2weeks > 1').cast('int')) \
  .show(truncate=False)
+-------+----+----------+----------+--------------+---+
|ID     |bool|date      |timestamp |bool_in_2weeks|D  |
+-------+----+----------+----------+--------------+---+
|1092829|0   |2019-10-29|1572307200|0             |0  |
|1092829|1   |2019-12-14|1576281600|1             |0  |
|1092829|1   |2020-07-15|1594771200|2             |1  |
|1092829|1   |2020-07-17|1594944000|1             |0  |
|1092829|0   |2020-07-19|1595116800|0             |0  |
|1092829|1   |2020-08-15|1597449600|1             |0  |
|1092829|1   |2020-09-10|1599696000|1             |0  |
+-------+----+----------+----------+--------------+---+
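
If only the original columns plus D are needed in the end, the helper columns can be dropped after computing D. A minimal sketch, reusing the df, w, and f imported above (the variable name result is mine):

result = (df
    .withColumn('timestamp', f.unix_timestamp('date', 'y-M-d'))
    .withColumn('bool_in_2weeks', f.count(f.when(f.col('bool') == f.lit(1), True)).over(w))
    .withColumn('D', f.expr('bool = 1 and bool_in_2weeks > 1').cast('int'))
    .drop('timestamp', 'bool_in_2weeks'))  # keep only ID, date, bool and the new D column
result.show(truncate=False)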

Answer 2

Score: 0

I took your example and ran a few tests. This is what I came up with:

df = spark.createDataFrame(
    [
        {"ID": 1092829, "date": "2019-10-29", "bool": 0},
        {"ID": 1092829, "date": "2019-12-14", "bool": 1},
        {"ID": 1092829, "date": "2020-07-15", "bool": 1},
        {"ID": 1092829, "date": "2020-07-17", "bool": 1},
        {"ID": 1092829, "date": "2020-07-19", "bool": 0},
        {"ID": 1092829, "date": "2020-08-15", "bool": 1},
        {"ID": 1092829, "date": "2020-09-10", "bool": 1},
    ]
)

For your case, it is both simpler and better suited to use Window functions.
The window is constructed by partitioning your DataFrame on the "ID" column, which guarantees that the date/bool values of two different "ID" values never get mixed up when applying the lag. Within the window, the dates need to be sorted in ascending order for the datediff operation.

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("ID").orderBy(F.col("ID").asc(), F.col("date").asc())

transformed_df = df.withColumn(
    "D",
    F.coalesce(
        # Apply lag on "date" by -1 (bring next row's value to current row's level) to do `datediff` for the 2-week condition.
        (F.datediff(F.lag("date", -1, None).over(window), F.col("date")) < 14)
        # Apply lag on "bool" by -1 to compare, for a given row, if the next row has a 1 or not.
        & (
            F.lag("bool", -1, None).over(window) == F.lit(1)
        ),  # The comparisons and the "&" produce a boolean column
        F.lit(
            False
        ),  # For the case when the last `date` value for a given "ID" is NULL because of the lag, we set it to False by default.
    ).cast(
        "integer"
    ),  # Cast the boolean result into an integer for a binary value.
)

transformed_df.show()

Resulting DataFrame:

#+-------+----------+----+---+
#|     ID|      date|bool|  D|
#+-------+----------+----+---+
#|1092829|2019-10-29|   0|  0|
#|1092829|2019-12-14|   1|  0|
#|1092829|2020-07-15|   1|  1|
#|1092829|2020-07-17|   1|  0|
#|1092829|2020-07-19|   0|  0|
#|1092829|2020-08-15|   1|  0|
#|1092829|2020-09-10|   1|  0|
#+-------+----------+----+---+
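
Since F.lag with a negative offset reads from the row after the current one, the same condition can also be written with F.lead, which expresses "next row" more directly. A minimal sketch of that variant, assuming the df and imports from above; it should produce the same D column:

window = Window.partitionBy("ID").orderBy(F.col("date").asc())

transformed_df = df.withColumn(
    "D",
    F.coalesce(
        # lead("date", 1) / lead("bool", 1) fetch the next row's values within the window
        (F.datediff(F.lead("date", 1).over(window), F.col("date")) < 14)
        & (F.lead("bool", 1).over(window) == F.lit(1)),
        F.lit(False),  # the last row of each ID has no next row, so default to False
    ).cast("integer"),
)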

I hope this answers your question.
