Join operation equivalent of nested for loop for PySpark?

Question

I have the tables below:

Customer budgets:

Audit ID  Customer ID  Budget  TimeStamp
1         123          100     2023-05-01 07:40:56
2         456          70      2023-05-01 12:20:50
3         456          70      2023-05-01 17:30:50

Product prices:

Audit ID  Product ID  Price  TimeStamp
5         5556        5      2023-05-01 06:40:56
6         5556        90     2023-05-01 06:40:56
7         7778        20     2023-05-01 12:20:50
9         7987        60     2023-05-01 05:50:00
10        7987        50     2023-05-04 05:50:00

Customer-product mapping:

Customer ID  Product ID
123          5556
123          7987
456          7778
456          7987

Problem statement: find the count of cases where the customer budget is more than the product price (picking the latest product price before the customer budget timestamp), and also the max delta between customer budget and product price.

Basically, I need a PySpark query equivalent of the Python code below. I ran this code with pandas and it worked fine for a small dataset, but pandas cannot process the large one. I came across PySpark and read that it is faster, but it seems that nested loops cannot be written in PySpark.

count_instances_budget_more_than_price = 0
map_customer_id_max_delta = {}
processed_product_for_customer = set()

for customer_row in customer_dataset:
  # field unpacking is schematic; the original pseudocode left it implicit
  customer_id, customer_budget, customer_timestamp = customer_row
  max_delta = map_customer_id_max_delta.get(customer_id, 0)
  for product_row in product_dataset:
    product_id, product_price, product_timestamp = product_row
    # consider only products mapped to this customer, each at most once
    if product_id in map_customer_id_product_id[customer_id]:
      if product_id not in processed_product_for_customer:
        processed_product_for_customer.add(product_id)
        if product_timestamp < customer_timestamp and product_price < customer_budget:
          count_instances_budget_more_than_price += 1
          max_delta = max(max_delta, customer_budget - product_price)
  map_customer_id_max_delta[customer_id] = max_delta
  processed_product_for_customer.clear()

Answer 1

Score: 2

I think you just need to join the 3 tables, aggregate on customer ID, count the number of matched products, and compute the max difference for each customer.

Input:

from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# customer-product mapping
customerProductDf = spark.createDataFrame(
    [(123, 5556),
     (123, 7987),
     (456, 7778),
     (456, 7987)],
     StructType([
        StructField("CustomerId", IntegerType(), True),
        StructField("ProductId", IntegerType(), True)
    ]))

# customer budget audits
customersDf = spark.createDataFrame(
    [(123, 100, datetime.strptime('2023-05-01 07:40:56', '%Y-%m-%d %H:%M:%S')),
     (456, 70, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
     (456, 70, datetime.strptime('2023-05-01 17:30:50', '%Y-%m-%d %H:%M:%S'))],
     StructType([
        StructField("CustomerId", IntegerType(), True),
        StructField("Budget", IntegerType(), True),
        StructField("TimeStamp", TimestampType(), True)
    ]))

# product price audits
productsDf = spark.createDataFrame(
    [(5556, 5, datetime.strptime('2023-05-01 06:40:56', '%Y-%m-%d %H:%M:%S')),
     (5556, 90, datetime.strptime('2023-05-01 05:40:56', '%Y-%m-%d %H:%M:%S')),
     (7778, 20, datetime.strptime('2023-05-01 12:20:50', '%Y-%m-%d %H:%M:%S')),
     (7987, 60, datetime.strptime('2023-05-01 05:50:00', '%Y-%m-%d %H:%M:%S')),
     (7987, 50, datetime.strptime('2023-05-04 05:50:00', '%Y-%m-%d %H:%M:%S'))],
     StructType([
        StructField("ProductId", IntegerType(), True),
        StructField("Price", IntegerType(), True),
        StructField("TimeStamp", TimestampType(), True)
    ]))
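
Note that these snippets assume an existing SparkSession bound to the name spark, as in the pyspark shell or a notebook. When running as a standalone script you would create one first; a minimal sketch (the app name here is arbitrary):

from pyspark.sql import SparkSession

# create (or reuse) a session; spark then behaves as it does in the shell
spark = SparkSession.builder.appName('budget-vs-price').getOrCreate()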

Calculations:

window = Window.partitionBy(customersDf.CustomerId, productsDf.ProductId).orderBy(productsDf.TimeStamp)

customerVsDeltaDf = (
    customerProductDf
    .join(customersDf, 'CustomerId')
    .join(productsDf, 'ProductId')
    .filter((customersDf.TimeStamp > productsDf.TimeStamp) & (customersDf.Budget > productsDf.Price))
    .withColumn("LatestPrice", F.last(productsDf.Price).over(window))
    .drop(productsDf.Price)
    .distinct()  # drop any duplicates so they do not affect the count
    .groupBy(customersDf.CustomerId)
    .agg(
        F.count(productsDf.ProductId).alias('Count'),
        F.max(customersDf.Budget - F.col('LatestPrice')).alias('MaxPriceDiff')
    )
)
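
One caveat worth flagging: F.last(Price).over(window) with the default window frame returns, for each row, the last price seen up to that row's own product timestamp, so the count above counts every qualifying price row. If you read the problem statement as wanting exactly one row per budget entry and product (only the single latest price before the budget timestamp), a row_number variant is a common alternative. A rough, untested sketch of that reading, reusing the DataFrames and imports above (my variant, not the answer's; the results shown below come from the query above):

w = (Window.partitionBy('CustomerId', 'ProductId', customersDf.TimeStamp)
     .orderBy(productsDf.TimeStamp.desc()))

latestBeforeBudgetDf = (
    customerProductDf
    .join(customersDf, 'CustomerId')
    .join(productsDf, 'ProductId')
    .filter(productsDf.TimeStamp < customersDf.TimeStamp)  # prices strictly before the budget row
    .withColumn('rn', F.row_number().over(w))              # rn == 1 is the latest such price
    .filter((F.col('rn') == 1) & (customersDf.Budget > productsDf.Price))
    .groupBy('CustomerId')
    .agg(F.count('*').alias('Count'),
         F.max(customersDf.Budget - productsDf.Price).alias('MaxPriceDiff'))
)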

Result:

>>> customerVsDeltaDf.show()
+----------+-----+------------+
|CustomerId|Count|MaxPriceDiff|
+----------+-----+------------+
|       456|    3|          50|
|       123|    2|          95|
+----------+-----+------------+

>>> customerVsDeltaDf.agg(F.sum('Count').alias("TotalCount")).show()
+----------+
|TotalCount|
+----------+
|         5|
+----------+
