Join operation equivalent of nested for loop for pyspark?
Question
I have the below tables -
| Audit ID | Customer ID | Budget | TimeStamp |
|---|---|---|---|
| 1 | 123 | 100 | 2023-05-01 07:40:56 |
| 2 | 456 | 70 | 2023-05-01 12:20:50 |
| 3 | 456 | 70 | 2023-05-01 17:30:50 |

| Audit ID | Product ID | Price | TimeStamp |
|---|---|---|---|
| 5 | 5556 | 5 | 2023-05-01 06:40:56 |
| 6 | 5556 | 90 | 2023-05-01 06:40:56 |
| 7 | 7778 | 20 | 2023-05-01 12:20:50 |
| 9 | 7987 | 60 | 2023-05-01 05:50:00 |
| 10 | 7987 | 50 | 2023-05-04 05:50:00 |

| Customer ID | Product ID |
|---|---|
| 123 | 5556 |
| 123 | 7987 |
| 456 | 7778 |
| 456 | 7987 |
Problem statement - Find the count of cases where the customer budget is more than the product price (picking the latest product price before the customer budget timestamp), and also the max delta between the customer budget and the product price.
Basically I need a PySpark query equivalent of the Python code below. I ran this code with pandas and it worked fine for a small dataset, but pandas cannot process the large dataset. I came across PySpark and read that it is faster, but it seems we cannot write nested loops in PySpark.
# Pseudocode: customer_id/customer_budget/customer_timestamp come from customer_row,
# and product_id/product_price/product_timestamp come from product_row.
count_instances_budget_more_than_price = 0
map_customer_id_max_delta = {}
processed_product_for_customer = set()

for customer_row in customer_dataset:
    max_delta = 0
    if customer_id in map_customer_id_max_delta:
        max_delta = map_customer_id_max_delta.get(customer_id)
    for product_row in product_dataset:
        if product_id in map_customer_id_product_id[customer_id]:
            if product_id not in processed_product_for_customer:
                processed_product_for_customer.add(product_id)
                if product_timestamp < customer_timestamp and product_price < customer_budget:
                    count_instances_budget_more_than_price += 1
                    max_delta = max(max_delta, customer_budget - product_price)
    map_customer_id_max_delta[customer_id] = max_delta
    processed_product_for_customer.clear()
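A minimal sketch of how the three tables could be brought into PySpark as DataFrames rather than iterated row by row; the file names, CSV format, and SparkSession setup are assumptions for illustration only and are not part of the original question:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("budget-vs-price").getOrCreate()

# Hypothetical CSV sources; any Spark-readable source (Parquet, JDBC, ...) works the same way.
customers_df = spark.read.csv("customers.csv", header=True, inferSchema=True)                # Audit ID, Customer ID, Budget, TimeStamp
products_df = spark.read.csv("products.csv", header=True, inferSchema=True)                  # Audit ID, Product ID, Price, TimeStamp
customer_product_df = spark.read.csv("customer_product.csv", header=True, inferSchema=True)  # Customer ID, Product ID
```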
Answer 1
Score: 2
I think you just need to join the 3 tables, aggregate on customer ID, count the number of matched products, and calculate the max difference for each customer.
Input:
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
customerProductDf = spark.createDataFrame(
    [(123, 5556),
     (123, 7987),
     (456, 7778),
     (456, 7987)],
    StructType([
        StructField("CustomerId", IntegerType(), True),
        StructField("ProductId", IntegerType(), True)
    ]))

customersDf = spark.createDataFrame(
    [(123, 100, datetime.strptime('2023-05-01 07:40:56', '%Y-%m-%d %H:%M:%S')),
     (456, 70, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
     (456, 70, datetime.strptime('2023-05-01 17:30:50', '%Y-%m-%d %H:%M:%S'))],
    StructType([
        StructField("CustomerId", IntegerType(), True),
        StructField("Budget", IntegerType(), True),
        StructField("TimeStamp", TimestampType(), True)
    ]))

productsDf = spark.createDataFrame(
    [(5556, 5, datetime.strptime('2023-05-01 06:40:56', '%Y-%m-%d %H:%M:%S')),
     (5556, 90, datetime.strptime('2023-05-01 05:40:56', '%Y-%m-%d %H:%M:%S')),
     (7778, 20, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
     (7987, 60, datetime.strptime('2023-05-01 05:50:00', '%Y-%m-%d %H:%M:%S')),
     (7987, 50, datetime.strptime('2023-05-04 05:50:00', '%Y-%m-%d %H:%M:%S'))],
    StructType([
        StructField("ProductId", IntegerType(), True),
        StructField("Price", IntegerType(), True),
        StructField("TimeStamp", TimestampType(), True)
    ]))
Calculations:
window = Window.partitionBy(customersDf.CustomerId, productsDf.ProductId).orderBy(productsDf.TimeStamp)

customerVsDeltaDf = (
    customerProductDf
    .join(customersDf, 'CustomerId')
    .join(productsDf, 'ProductId')
    .filter((customersDf.TimeStamp > productsDf.TimeStamp) & (customersDf.Budget > productsDf.Price))
    .withColumn("LatestPrice", F.last(productsDf.Price).over(window))
    .drop(productsDf.Price)
    .distinct()  # Drop any duplicates to not affect the count
    .groupBy(customersDf.CustomerId)
    .agg(
        F.count(productsDf.ProductId).alias('Count'),
        F.max(customersDf.Budget - F.col('LatestPrice')).alias('MaxPriceDiff')
    )
)
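If you want to see which joined rows survive the filter and which price ends up as LatestPrice before the aggregation, you can materialize the intermediate result first. This is an optional inspection step, not part of the original answer; it reuses the same DataFrames and window defined above:

```python
# Optional inspection step: the joined/filtered rows prior to groupBy/agg.
intermediateDf = (
    customerProductDf
    .join(customersDf, 'CustomerId')
    .join(productsDf, 'ProductId')
    .filter((customersDf.TimeStamp > productsDf.TimeStamp) & (customersDf.Budget > productsDf.Price))
    .withColumn("LatestPrice", F.last(productsDf.Price).over(window))
)
intermediateDf.show(truncate=False)
```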
Result:
>>> customerVsDeltaDf.show()
+----------+-----+------------+
|CustomerId|Count|MaxPriceDiff|
+----------+-----+------------+
| 456| 3| 50|
| 123| 2| 95|
+----------+-----+------------+
>>> customerVsDeltaDf.agg(F.sum('Count').alias("TotalCount")).show()
+----------+
|TotalCount|
+----------+
| 5|
+----------+
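For readers who prefer SQL, the same join-and-aggregate idea can also be expressed with Spark SQL. This is only a sketch: the temp-view names are chosen here for illustration, and it reproduces just the join, filter, and aggregation, not the window/distinct step of the DataFrame version above.

```python
# Register the Input DataFrames as temp views (names are illustrative).
customersDf.createOrReplaceTempView("customers")
productsDf.createOrReplaceTempView("products")
customerProductDf.createOrReplaceTempView("customer_product")

spark.sql("""
    SELECT c.CustomerId,
           COUNT(p.ProductId)      AS MatchCount,
           MAX(c.Budget - p.Price) AS MaxPriceDiff
    FROM customer_product cp
    JOIN customers c ON c.CustomerId = cp.CustomerId
    JOIN products  p ON p.ProductId  = cp.ProductId
    WHERE c.TimeStamp > p.TimeStamp
      AND c.Budget > p.Price
    GROUP BY c.CustomerId
""").show()
```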