PySpark filtering on multiple criteria

Question

My example DataFrame looks like this:

    Name  ID   ContractDate  LoanSum  ClosingDate
    A     ID1  2022-10-10    10       2022-10-16
    A     ID1  2022-10-10    15       2022-10-18
    A     ID1  2022-10-20    20       2022-10-31
    A     ID1  2022-10-20    20       2022-10-30
    A     ID1  2022-11-10    14       2022-11-22
    A     ID1  2022-11-10    15       2022-11-22
    B     ID2  2022-11-11    15       2022-11-15
    B     ID2  2022-11-11    30       2022-11-18
    B     ID2  2022-11-17    35       2022-11-22
    B     ID2  2022-11-17    35       2022-11-24
    C     ID3  2022-12-19    19       2022-11-10

My goal is to create a new DataFrame that contains all loans issued to a borrower (grouped by unique ID) that meet the following conditions:

  • two loans should be granted on the same day;
  • the next two loans should also be granted on the same day, and the gap between the ClosingDate of any of the previously issued loans and the two newly issued loans should not exceed 5 days.

In other words, my desired output looks like this:

    Name  ID   ContractDate  LoanSum  ClosingDate
    A     ID1  2022-10-10    10       2022-10-16
    A     ID1  2022-10-10    15       2022-10-18
    A     ID1  2022-10-20    20       2022-10-31
    A     ID1  2022-10-20    20       2022-10-30

(In this example, the difference between the first ClosingDate and the next ContractDate is 4 days: 2022-10-20 minus 2022-10-16.)
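The gap arithmetic can be checked in plain Python without a Spark session. This is only a minimal sketch: `days_between` is a hypothetical helper, not part of the original post. It takes the later date first, the same argument order as Spark's `datediff(end, start)`.

```python
from datetime import date


def days_between(later: date, earlier: date) -> int:
    """Whole days from `earlier` to `later`; negative if `later` comes first."""
    return (later - earlier).days


# Gap from the question: first ClosingDate of ID1 vs. the next ContractDate.
gap = days_between(date(2022, 10, 20), date(2022, 10, 16))
print(gap)  # 4
```

A negative result means the new loan was issued before the earlier one closed, which matters if the gap is required to lie in a range such as 0 to 5 days.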

I've written the following code, which finds two or more loans issued to the same borrower on the same date (grouped by ID):

    from pyspark.sql import functions as f
    from pyspark.sql import Window

    # `spark` is the active SparkSession; `data` holds the rows shown above
    df = spark.createDataFrame(data).toDF('Name', 'ID', 'ContractDate', 'LoanSum', 'ClosingDate')
    df.show()
    cols = df.columns

    # Flag a loan if it shares its ContractDate with the previous or next loan of the same ID
    w = Window.partitionBy('ID').orderBy('ContractDate')
    df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
      .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) == 0')) \
      .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
      .filter('Target == True')
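To see why the `lag`/`lead` pair flags both loans of a same-day pair, the Target column can be emulated for a single ID partition in plain Python. This is an illustrative sketch, not Spark code: `sql_or` and `same_day_flags` are hypothetical helpers, and `None` stands in for SQL NULL (the first row has no previous row, the last row has no next row).

```python
from typing import List, Optional


def sql_or(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL-style OR: True wins; otherwise NULL (None) if either side is unknown."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def same_day_flags(contract_dates: List[str]) -> List[Optional[bool]]:
    """Emulate the Target column for one ID, with dates already sorted."""
    if not contract_dates:
        return []
    # Step 1 (lag): compare each row with the previous row; the first row gets NULL.
    same_as_prev = [None] + [
        cur == prev for prev, cur in zip(contract_dates, contract_dates[1:])
    ]
    # Step 2 (lead): OR each flag with the next row's flag; the last row sees NULL.
    next_flags = same_as_prev[1:] + [None]
    return [sql_or(a, b) for a, b in zip(same_as_prev, next_flags)]


# ContractDate values of ID1 from the sample data.
id1 = ["2022-10-10", "2022-10-10", "2022-10-20",
       "2022-10-20", "2022-11-10", "2022-11-10"]
flags = same_day_flags(id1)
print(flags)  # [True, True, True, True, True, True]
```

A borrower with a single loan, such as ID3, ends up with a NULL flag, and a filter on `Target == True` drops NULL rows along with False ones.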

But I'm stuck on how to filter for the second condition.

Any help is highly appreciated!

Answer 1

Score: 1

Use `groupBy` to collect the loans issued on each day, then apply the conditions with a `Window`.

    from pyspark.sql import functions as f
    from pyspark.sql import Window

    # Order each borrower's days by the latest ClosingDate among that day's loans
    w = Window.partitionBy('ID').orderBy('MaxClosingDate')

    df.groupBy('Name', 'ID', 'ContractDate') \
      .agg(
          # Keep each loan's details so they can be exploded back into rows later
          f.collect_list(f.struct('LoanSum', 'ClosingDate')).alias('LoanInfo'),
          f.max('ClosingDate').alias('MaxClosingDate')
      ) \
      .withColumn('is2LoanTarget', f.size('LoanInfo') == 2) \
      .withColumn('is5DaysTarget', f.datediff(f.col('ContractDate'), f.lag('MaxClosingDate').over(w)).between(0, 5)) \
      .withColumn('is5DaysTarget', f.col('is5DaysTarget') | f.lead('is5DaysTarget').over(w)) \
      .filter('is2LoanTarget and is5DaysTarget') \
      .withColumn('LoanInfo', f.explode('LoanInfo')) \
      .select('Name', 'ID', 'ContractDate', 'LoanInfo.*') \
      .show()

    +----+---+------------+-------+-----------+
    |Name| ID|ContractDate|LoanSum|ClosingDate|
    +----+---+------------+-------+-----------+
    |   A|ID1|  2022-10-10|     10| 2022-10-16|
    |   A|ID1|  2022-10-10|     15| 2022-10-18|
    |   A|ID1|  2022-10-20|     20| 2022-10-31|
    |   A|ID1|  2022-10-20|     20| 2022-10-30|
    +----+---+------------+-------+-----------+
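The pipeline's logic can be traced without Spark using a plain-Python mirror run on the sample rows from the question. This is a rough sketch under stated assumptions, not the answer's code: `filter_loans` is a hypothetical name, dates are kept as ISO strings (which sort chronologically), and `None` plays the role of SQL NULL.

```python
from collections import defaultdict
from datetime import date

# Sample rows from the question: (Name, ID, ContractDate, LoanSum, ClosingDate).
rows = [
    ("A", "ID1", "2022-10-10", 10, "2022-10-16"),
    ("A", "ID1", "2022-10-10", 15, "2022-10-18"),
    ("A", "ID1", "2022-10-20", 20, "2022-10-31"),
    ("A", "ID1", "2022-10-20", 20, "2022-10-30"),
    ("A", "ID1", "2022-11-10", 14, "2022-11-22"),
    ("A", "ID1", "2022-11-10", 15, "2022-11-22"),
    ("B", "ID2", "2022-11-11", 15, "2022-11-15"),
    ("B", "ID2", "2022-11-11", 30, "2022-11-18"),
    ("B", "ID2", "2022-11-17", 35, "2022-11-22"),
    ("B", "ID2", "2022-11-17", 35, "2022-11-24"),
    ("C", "ID3", "2022-12-19", 19, "2022-11-10"),
]


def filter_loans(rows):
    """Plain-Python mirror of the groupBy + Window steps (illustrative only)."""
    # groupBy('Name', 'ID', 'ContractDate'): one bucket per borrower per day.
    buckets = defaultdict(list)
    for row in rows:
        name, loan_id, contract, _, _ = row
        buckets[(name, loan_id, contract)].append(row)

    # partitionBy('ID'): gather each borrower's days with their MaxClosingDate.
    by_id = defaultdict(list)
    for (_, loan_id, contract), loans in buckets.items():
        max_closing = max(r[4] for r in loans)
        by_id[loan_id].append((max_closing, contract, loans))

    kept = []
    for groups in by_id.values():
        groups.sort(key=lambda g: g[0])  # orderBy('MaxClosingDate')
        # lag: gap from the previous day's MaxClosingDate to this ContractDate.
        near = [None]
        for prev, cur in zip(groups, groups[1:]):
            gap = (date.fromisoformat(cur[1]) - date.fromisoformat(prev[0])).days
            near.append(0 <= gap <= 5)
        # lead: a day also qualifies when the day after it qualifies.
        for i, (_, _, loans) in enumerate(groups):
            nxt = near[i + 1] if i + 1 < len(groups) else None
            if len(loans) == 2 and (near[i] is True or nxt is True):
                kept.extend(loans)
    return kept


kept = filter_loans(rows)
for row in kept:
    print(row)
```

Tracing it shows why ID2 is dropped: its second pair was issued on 2022-11-17, one day before the first pair's latest ClosingDate (2022-11-18), so the gap is -1 and falls outside `between(0, 5)`. If overlapping loans should count, the lower bound or the choice of `max` would need to change.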

huangapple
  • Published on 2023-07-28 05:24:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76783485.html