Join operation equivalent of nested for loop for PySpark?


Question


I have the tables below:

Customer budgets:

| Audit ID | Customer ID | Budget | TimeStamp           |
|----------|-------------|--------|---------------------|
| 1        | 123         | 100    | 2023-05-01 07:40:56 |
| 2        | 456         | 70     | 2023-05-01 12:20:50 |
| 3        | 456         | 70     | 2023-05-01 17:30:50 |

Product prices:

| Audit ID | Product ID | Price | TimeStamp           |
|----------|------------|-------|---------------------|
| 5        | 5556       | 5     | 2023-05-01 06:40:56 |
| 6        | 5556       | 90    | 2023-05-01 06:40:56 |
| 7        | 7778       | 20    | 2023-05-01 12:20:50 |
| 9        | 7987       | 60    | 2023-05-01 05:50:00 |
| 10       | 7987       | 50    | 2023-05-04 05:50:00 |

Customer-product mapping:

| Customer ID | Product ID |
|-------------|------------|
| 123         | 5556       |
| 123         | 7987       |
| 456         | 7778       |
| 456         | 7987       |

Problem statement - Find the count of instances where a customer's budget is higher than a product's price (picking the latest product price before the customer budget's timestamp), and also the maximum delta between customer budget and product price. For example, for customer 456's budget of 70 at 2023-05-01 17:30:50, the latest price of product 7778 before that time is 20, so it counts as one instance with a delta of 50.

Basically, I need a PySpark query equivalent to the Python code below. I ran this code with pandas and it worked fine for a small dataset, but pandas cannot handle the large dataset. I came across PySpark and read that it is faster, but it seems we cannot write nested loops in PySpark.

    count_instances_budget_more_than_price = 0
    map_customer_id_max_delta = {}
    processed_product_for_customer = set()

    for customer_row in customer_dataset:
        # customer_id, customer_budget, customer_timestamp are the fields of customer_row
        max_delta = 0
        if customer_id in map_customer_id_max_delta:
            max_delta = map_customer_id_max_delta.get(customer_id)
        for product_row in product_dataset:
            # product_id, product_price, product_timestamp are the fields of product_row
            if product_id in map_customer_id_product_id[customer_id]:
                if product_id not in processed_product_for_customer:
                    processed_product_for_customer.add(product_id)
                    if product_timestamp < customer_timestamp and product_price < customer_budget:
                        count_instances_budget_more_than_price += 1
                        max_delta = max(max_delta, customer_budget - product_price)
        map_customer_id_max_delta[customer_id] = max_delta
        processed_product_for_customer.clear()

Answer 1

Score: 2


I think you just need to join the 3 tables, aggregate on customer ID, count the number of matched products, and calculate the max price difference for each customer.

Input:

    from datetime import datetime
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    from pyspark.sql.window import Window

    # customer-product mapping dataframe
    customerProductDf = spark.createDataFrame(
        [(123, 5556),
         (123, 7987),
         (456, 7778),
         (456, 7987)],
        StructType([
            StructField("CustomerId", IntegerType(), True),
            StructField("ProductId", IntegerType(), True)
        ]))

    # customer budgets dataframe
    customersDf = spark.createDataFrame(
        [(123, 100, datetime.strptime('2023-05-01 07:40:56', '%Y-%m-%d %H:%M:%S')),
         (456, 70, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
         (456, 70, datetime.strptime('2023-05-01 17:30:50', '%Y-%m-%d %H:%M:%S'))],
        StructType([
            StructField("CustomerId", IntegerType(), True),
            StructField("Budget", IntegerType(), True),
            StructField("TimeStamp", TimestampType(), True)
        ]))

    # product prices dataframe
    productsDf = spark.createDataFrame(
        [(5556, 5, datetime.strptime('2023-05-01 06:40:56', '%Y-%m-%d %H:%M:%S')),
         (5556, 90, datetime.strptime('2023-05-01 05:40:56', '%Y-%m-%d %H:%M:%S')),
         (7778, 20, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
         (7987, 60, datetime.strptime('2023-05-01 05:50:00', '%Y-%m-%d %H:%M:%S')),
         (7987, 50, datetime.strptime('2023-05-04 05:50:00', '%Y-%m-%d %H:%M:%S'))],
        StructType([
            StructField("ProductId", IntegerType(), True),
            StructField("Price", IntegerType(), True),
            StructField("TimeStamp", TimestampType(), True)
        ]))
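As a side note, these snippets assume a running SparkSession already bound to the name `spark`, as in the pyspark shell or a notebook. For a standalone script, a minimal setup sketch along the following lines would provide it (the app name and local master are my own placeholder choices, not part of the original answer):

    from pyspark.sql import SparkSession

    # Hypothetical setup: create a local SparkSession so that the `spark`
    # variable used by the snippets above exists.
    spark = (
        SparkSession.builder
        .appName("budget-vs-price")   # arbitrary app name
        .master("local[*]")           # local mode, just for trying the example
        .getOrCreate()
    )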

Calculations:

    window = Window.partitionBy(customersDf.CustomerId, productsDf.ProductId).orderBy(productsDf.TimeStamp)

    customerVsDeltaDf = (
        customerProductDf
        .join(customersDf, 'CustomerId')
        .join(productsDf, 'ProductId')
        .filter((customersDf.TimeStamp > productsDf.TimeStamp) & (customersDf.Budget > productsDf.Price))
        .withColumn("LatestPrice", F.last(productsDf.Price).over(window))
        .drop(productsDf.Price)
        .distinct()  # drop any duplicates so they do not affect the count
        .groupBy(customersDf.CustomerId)
        .agg(
            F.count(productsDf.ProductId).alias('Count'),
            F.max(customersDf.Budget - F.col('LatestPrice')).alias('MaxPriceDiff')
        )
    )
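One caveat worth noting: with the default window frame (unbounded preceding to current row), `F.last(productsDf.Price).over(window)` returns the price of the current row (or a row tied on TimeStamp), so `LatestPrice` is not necessarily the single latest price before the budget timestamp. If the requirement is to compare each budget only against that one latest price, a possible alternative sketch (my own, not part of the original answer; `w_latest`, `latestPriceDf`, and `rn` are names I made up) is to rank the candidate prices with `row_number` and keep only the newest one:

    # Sketch using the same dataframes as above: keep, for every budget row and
    # mapped product, only the newest price strictly before the budget timestamp,
    # then apply the budget > price comparison to that single price.
    w_latest = Window.partitionBy(
        customersDf.CustomerId, customersDf.TimeStamp, productsDf.ProductId
    ).orderBy(productsDf.TimeStamp.desc())

    latestPriceDf = (
        customerProductDf
        .join(customersDf, 'CustomerId')
        .join(productsDf, 'ProductId')
        .filter(productsDf.TimeStamp < customersDf.TimeStamp)   # only prices before the budget timestamp
        .withColumn("rn", F.row_number().over(w_latest))
        .filter(F.col("rn") == 1)                               # newest such price per budget row and product
        .filter(customersDf.Budget > productsDf.Price)          # budget exceeds that latest price
    )

Counting the rows of `latestPriceDf` and taking the max of `Budget - Price` per `CustomerId` would then follow the same groupBy/agg pattern as above.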

Result:

    >>> customerVsDeltaDf.show()
    +----------+-----+------------+
    |CustomerId|Count|MaxPriceDiff|
    +----------+-----+------------+
    |       456|    3|          50|
    |       123|    2|          95|
    +----------+-----+------------+

    >>> customerVsDeltaDf.agg(F.sum('Count').alias("TotalCount")).show()
    +----------+
    |TotalCount|
    +----------+
    |         5|
    +----------+
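If you need the results back as plain Python values, the way the original nested-loop code produced them, you can collect them from the dataframe. A small usage sketch (my own, not part of the original answer):

    # Overall count of matches as a plain int
    total_count = customerVsDeltaDf.agg(F.sum('Count')).collect()[0][0]

    # Per-customer max delta as a dict, mirroring map_customer_id_max_delta from the question
    map_customer_id_max_delta = {
        row['CustomerId']: row['MaxPriceDiff'] for row in customerVsDeltaDf.collect()
    }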
