Complex Filtering Operations in PySpark
Question
Currently I'm performing calculations on a database that contains information on how loans are repaid by borrowers. It is a huge dataset, so I'm using PySpark and have just run into the question of how to use advanced filtering operations.
My dataframe looks like this:
```plaintext
Name ID  ContractDate LoanSum ClosingDate
A    ID1 2022-10-10   10      2022-10-15
A    ID1 2022-10-15   15      null
A    ID1 2022-10-30   20      2022-11-10
B    ID2 2022-11-11   15      2022-10-14
B    ID2 2022-12-10   30      null
B    ID2 2022-12-12   35      2022-12-14
C    ID3 2022-12-19   19      2022-11-10
D    ID4 2022-12-10   10      null
D    ID4 2022-12-12   40      2022-11-29
```
My goal is to create a dataframe that contains all loans issued to a given borrower (grouped by unique ID) where the first loan is not yet closed but the next loan has already been issued to the borrower, and the difference between the loan sums is less than or equal to 5.
In other words, I have to obtain the following table (expected result):
```plaintext
Name ID  ContractDate LoanSum Status
A    ID1 2022-10-15   15      null
A    ID1 2022-10-30   20      2022-11-10
B    ID2 2022-12-10   30      null
B    ID2 2022-12-12   35      2022-12-14
```
Thank you in advance.
# Answer 1
**Score**: 1
You can use PySpark [window functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html) partitioned by the unique ID. To check whether the next loan has already been closed, you can use the [lead function](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.lead.html); similarly, lag fetches the previous record's value within the partition.
In this example, lead and lag are used together to make sure that both rows of a pair meet the criteria. This is useful for building a check column and returning the rows based on that check.
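To illustrate, here is a minimal, self-contained sketch of what lead and lag return over a window partitioned by ID and ordered by ContractDate (the demo data simply mirrors three rows from the question; the app name is arbitrary):

```python
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

spark = SparkSession.builder.appName('lead_lag_demo').getOrCreate()

# Three loans of one borrower, mirroring the question's columns
demo = spark.createDataFrame(
    [('ID1', '2022-10-10', '2022-10-15'),
     ('ID1', '2022-10-15', 'null'),
     ('ID1', '2022-10-30', '2022-11-10')],
    ['ID', 'ContractDate', 'ClosingDate'])

w = Window.partitionBy('ID').orderBy('ContractDate')

# lead looks one row ahead and lag one row behind within each borrower's partition;
# at the edges of the partition there is no neighbour, so lead/lag return NULL
demo.select(
    'ContractDate', 'ClosingDate',
    f.lead('ClosingDate').over(w).alias('next_ClosingDate'),
    f.lag('ClosingDate').over(w).alias('prev_ClosingDate'),
).show()
```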
Check this solution.
```python
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

# Sample data; ClosingDate is kept as a string, with the literal 'null' marking open loans
data = [
    ('A', 'ID1', '2022-10-10', 10, '2022-10-15'),
    ('A', 'ID1', '2022-10-15', 15, 'null'),
    ('A', 'ID1', '2022-10-30', 20, '2022-11-10'),
    ('B', 'ID2', '2022-11-11', 15, '2022-10-14'),
    ('B', 'ID2', '2022-12-10', 30, 'null'),
    ('B', 'ID2', '2022-12-12', 35, '2022-12-14'),
    ('C', 'ID3', '2022-12-19', 19, '2022-11-10'),
    ('D', 'ID4', '2022-12-10', 10, 'null'),
    ('D', 'ID4', '2022-12-12', 40, '2022-11-29')
]
cols = ['Name', 'ID', 'ContractDate', 'LoanSum', 'ClosingDate']

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame(data, cols)

print('Input---->')
df.show()

# Per-borrower window ordered by contract date
win = Window.partitionBy('ID').orderBy('ContractDate')

# Closing date and loan sum of the next/previous loan of the same borrower
next_record = f.lead(f.col('ClosingDate')).over(win).alias('next_ClosingDate')
prev_record = f.lag(f.col('ClosingDate')).over(win).alias('prev_ClosingDate')
next_loan_sum = f.lead(f.col('LoanSum')).over(win).alias('next_LoanSum')
prev_loan_sum = f.lag(f.col('LoanSum')).over(win).alias('prev_LoanSum')

print('Output---->')
df.withColumn(
    'check',
    # a row qualifies if it is an open loan followed by another loan,
    # or a closed loan preceded by an open loan
    f.when(
        ((f.col('ClosingDate') == 'null') & (next_record != 'null')) |
        ((f.col('ClosingDate') != 'null') & (prev_record == 'null')), 1
    ).otherwise(0)
).withColumn(
    'loan_sum_check',
    # the difference between the two loan sums must be at most 5
    f.when(((next_loan_sum - f.col('LoanSum')) <= 5) | ((f.col('LoanSum') - prev_loan_sum) <= 5), 1)
    .otherwise(0)
).filter('check=1 and loan_sum_check=1').drop('check', 'loan_sum_check').show()
```
Results output:
```plaintext
Input---->
+----+---+------------+-------+-----------+
|Name| ID|ContractDate|LoanSum|ClosingDate|
+----+---+------------+-------+-----------+
|   A|ID1|  2022-10-10|     10| 2022-10-15|
|   A|ID1|  2022-10-15|     15|       null|
|   A|ID1|  2022-10-30|     20| 2022-11-10|
|   B|ID2|  2022-11-11|     15| 2022-10-14|
|   B|ID2|  2022-12-10|     30|       null|
|   B|ID2|  2022-12-12|     35| 2022-12-14|
|   C|ID3|  2022-12-19|     19| 2022-11-10|
|   D|ID4|  2022-12-10|     10|       null|
|   D|ID4|  2022-12-12|     40| 2022-11-29|
+----+---+------------+-------+-----------+

Output---->
+----+---+------------+-------+-----------+
|Name| ID|ContractDate|LoanSum|ClosingDate|
+----+---+------------+-------+-----------+
|   A|ID1|  2022-10-15|     15|       null|
|   A|ID1|  2022-10-30|     20| 2022-11-10|
|   B|ID2|  2022-12-10|     30|       null|
|   B|ID2|  2022-12-12|     35| 2022-12-14|
+----+---+------------+-------+-----------+
```
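A side note on the design: the sample above keeps ClosingDate as a string and marks open loans with the literal 'null'. If the real column holds proper NULLs (as the question's table suggests), the same pairing logic can be expressed with isNull()/isNotNull(). Below is a minimal sketch under that assumption, with an extra lag on ContractDate as a guard so that a borrower's very first loan is not mistaken for "previous loan still open":

```python
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

spark = SparkSession.builder.appName('null_variant').getOrCreate()

# Same sample data, but with real NULLs (None) instead of the string 'null'
data = [
    ('A', 'ID1', '2022-10-10', 10, '2022-10-15'),
    ('A', 'ID1', '2022-10-15', 15, None),
    ('A', 'ID1', '2022-10-30', 20, '2022-11-10'),
    ('B', 'ID2', '2022-11-11', 15, '2022-10-14'),
    ('B', 'ID2', '2022-12-10', 30, None),
    ('B', 'ID2', '2022-12-12', 35, '2022-12-14'),
    ('C', 'ID3', '2022-12-19', 19, '2022-11-10'),
    ('D', 'ID4', '2022-12-10', 10, None),
    ('D', 'ID4', '2022-12-12', 40, '2022-11-29'),
]
df = spark.createDataFrame(data, ['Name', 'ID', 'ContractDate', 'LoanSum', 'ClosingDate'])

w = Window.partitionBy('ID').orderBy('ContractDate')
next_closing = f.lead('ClosingDate').over(w)
prev_closing = f.lag('ClosingDate').over(w)
# lag on ContractDate distinguishes "no previous loan at all" from "previous loan still open"
prev_contract = f.lag('ContractDate').over(w)
next_loan_sum = f.lead('LoanSum').over(w)
prev_loan_sum = f.lag('LoanSum').over(w)

pair_check = (
    (f.col('ClosingDate').isNull() & next_closing.isNotNull()) |   # open loan followed by a later one
    (prev_contract.isNotNull() & prev_closing.isNull())            # loan whose previous loan is still open
)
# difference of at most 5 between the two loan sums; NULL comparisons at partition edges evaluate
# to NULL and are therefore dropped by the filter
sum_check = ((next_loan_sum - f.col('LoanSum')) <= 5) | ((f.col('LoanSum') - prev_loan_sum) <= 5)

df.filter(pair_check & sum_check).show()
```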