Pyspark sequence equivalent in Spark 2.3

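Question

With the sequence function (available since Spark 2.4), I can generate a time series of dates between two date columns.
My production system runs Spark 2.3. How can I achieve the same thing in Spark 2.3?
Below is the code snippet that uses the sequence function.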

from pyspark.sql import functions as SF

data1 = [
    (1, "2022-09-01", "2023-01-01", 1),
    (1, "2022-09-01", "2023-02-01", 1),
    (1, "2022-09-11", "2023-01-01", 2),
    (1, "2022-09-01", "2023-01-01", 2),
    (1, "2022-09-21", "2023-01-01", 1),
]
df1 = spark.createDataFrame(
    data1, ["item", "start_d", "activation_d", "dept_id"]
)
df1 = df1.withColumn(
    "week_start",
    SF.explode(SF.expr("sequence(start_d, activation_d, interval 7 day)")),
)

How can I do the same thing without using the sequence function?

Answer 1

Score: 0

I was able to achieve the same thing with a UDF. Here is the UDF and its usage for reference:

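from datetime import timedelta
from pyspark.sql import functions as SF
from pyspark.sql.types import ArrayType, DateType

def generate_dates(start_date, end_date, interval):
    # Collect every date from start_date to end_date (inclusive), stepping by `interval` days.
    # Expects datetime.date inputs, since a timedelta cannot be added to a string.
    dates = []
    current_date = start_date
    while current_date <= end_date:
        dates.append(current_date)
        current_date += timedelta(days=interval)
    return dates

# Register the function so it can be called from SQL expressions.
spark.udf.register("generate_dates", generate_dates, ArrayType(DateType()))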

data1 = [
    (1, "2022-09-01", "2023-01-01", 1),
    (1, "2022-09-01", "2023-02-01", 1),
    (1, "2022-09-11", "2023-01-01", 2),
    (1, "2022-09-01", "2023-01-01", 2),
    (1, "2022-09-21", "2023-01-01", 1),
]
df1 = spark.createDataFrame(
    data1, ["item", "start_date", "activation_date", "dept_id"]
)

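# The sample columns are strings, so cast them to dates before calling the UDF.
df1 = df1.withColumn(
    "week_start",
    SF.explode(
        SF.expr("generate_dates(to_date(start_date), to_date(activation_date), 7)")
    ),
)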
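
The date-stepping loop can be checked without a Spark session. The following is a minimal sketch in plain Python, not part of the original answer: the names `generate_dates_safe` and `_as_date` are hypothetical, and the function assumes inputs are either `datetime.date` objects or `YYYY-MM-DD` strings like those in the sample data. It also returns None for missing inputs, since Spark hands SQL NULL to a Python UDF as None.

```python
from datetime import date, datetime, timedelta


def _as_date(value):
    """Return a datetime.date for a date object or a 'YYYY-MM-DD' string."""
    if isinstance(value, datetime):  # datetime is a subclass of date, so check it first
        return value.date()
    if isinstance(value, date):
        return value
    return datetime.strptime(value, "%Y-%m-%d").date()


def generate_dates_safe(start_date, end_date, interval):
    """List the dates from start_date to end_date (inclusive), every `interval` days."""
    if start_date is None or end_date is None:
        return None  # propagate NULL instead of raising inside the UDF
    if interval <= 0:
        raise ValueError("interval must be a positive number of days")
    current, end = _as_date(start_date), _as_date(end_date)
    dates = []
    while current <= end:
        dates.append(current)
        current += timedelta(days=interval)
    return dates


# First row of the sample data: weekly steps from 2022-09-01 up to 2023-01-01.
weeks = generate_dates_safe("2022-09-01", "2023-01-01", 7)
print(weeks[0], weeks[-1], len(weeks))  # 2022-09-01 2022-12-29 18
```

A function like this could presumably be registered with spark.udf.register in place of generate_dates, in which case the string columns would not need to be cast first. That registration is untested here.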

huangapple
  • Published on June 19, 2023, 18:51:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76505913.html