Is it bad to use `GroupBy` multiple times in PySpark?

Question

This is an educational question.

I have a text file containing several records of the power consumption of factories, each identified by a unique id. The file contains the following columns:

factory_id, city, country, date, consumption

where date is in the format mm/YYYY. I want to compute which countries have fewer than 20 cities (including those with 0) whose factories' consumption decreased in two consecutive years. A city's consumption here is simply the total yearly consumption of the factories located in that city.

To do this, I used groupBy + agg multiple times, as follows:

import pyspark.sql.functions as F
import pyspark.sql.types as T

df = df.withColumn("year", F.split("Date", "/")[1])

# compute for each city the yearly consumption
df_consump = df.groupBy("Country", "City", "year").agg(
    F.sum("consumption").alias("consumption")
)


@F.udf(returnType=T.IntegerType())
def had_a_decrease(structs):
    # sort by year, since collect_list does not guarantee any ordering
    structs = sorted(structs, key=lambda s: s.year)
    # return 0 if the list is monotonically increasing, 1 otherwise
    cur_cons = structs[0].consumption
    for struct in structs[1:]:
        cons = struct.consumption
        if cons <= cur_cons:
            return 1
        cur_cons = cons

    return 0


df_cons_decrease = df_consump.groupBy("Country", "City").agg(
    # here I collect a list of structs containing (year, consumption),
    # which is needed because collect_list doesn't guarantee the order
    # is respected, so I keep the year in order to sort this (small)
    # list first in the udf "had_a_decrease" defined above.
    # eventually this yields a column with a 1 if we had a decrease, 0 otherwise,
    # which I sum afterwards.
    had_a_decrease(F.collect_list(F.struct("year", "consumption"))).alias("had_decrease")
)

df_cons_decrease.groupBy("Country").agg(
    F.sum("had_decrease").alias("num_cities_with_decrease")
).filter("num_cities_with_decrease < 20") \
    .write.csv(outputFolder)
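As an aside, the ordering concern handled inside the UDF can also be addressed with built-in functions only: array_sort orders an array of structs by their fields in declaration order (year first), and the SQL higher-order functions zip_with and exists can compare adjacent elements. Below is a minimal sketch of an equivalent had_decrease flag without a Python UDF; the names df_flags and arr are illustrative, and the <= test mirrors the UDF above.

df_flags = df_consump.groupBy("Country", "City").agg(
    # array_sort orders the structs by their first field ("year"),
    # removing the need to sort inside a Python UDF
    F.array_sort(F.collect_list(F.struct("year", "consumption"))).alias("arr")
).withColumn(
    "had_decrease",
    # 1 if any adjacent pair of years is non-increasing, 0 otherwise
    F.expr("""
        exists(
            zip_with(slice(arr, 1, size(arr) - 1),
                     slice(arr, 2, size(arr) - 1),
                     (a, b) -> b.consumption <= a.consumption),
            x -> x
        )
    """).cast("int")
)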


However, I was wondering:

  • is this bad practice (e.g. inefficient)?
  • are DataFrames better suited than RDDs for this?
  • would you recommend a better approach than grouping this many times?

Answer 1

Score: 1


Compare the consumption with the consumption from 1 and 2 years earlier using the Window and lag functions, with no UDF, and then group by.

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    [1, 1, 1, '01/2022', 100],
    [1, 1, 1, '01/2021', 90],
    [1, 1, 1, '01/2020', 80],
    [1, 1, 2, '01/2022', 100],
    [1, 1, 2, '01/2021', 110],
    [1, 1, 2, '01/2020', 120]
]
cols = ['factory_id', 'city', 'country', 'date', 'consumption']

df = spark.createDataFrame(data, cols) \
  .withColumn('year', f.split('date', '/')[1])

w = Window.partitionBy('country', 'city').orderBy('year')

df.groupBy('country', 'city', 'year') \
  .agg(f.sum('consumption').alias('consumption')) \
  .withColumn('consumption-1', f.lag('consumption', 1).over(w)) \
  .withColumn('consumption-2', f.lag('consumption', 2).over(w)) \
  .withColumn('is_decreased', f.expr('if(`consumption` < `consumption-1` and `consumption-1` < `consumption-2`, true, false)')) \
  .filter('is_decreased = true') \
  .select('country', 'city').distinct() \
  .groupBy('country').count() \
  .filter('count < 20') \
  .select('country') \
  .show()

+-------+
|country|
+-------+
|      2|
+-------+
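
One caveat on the requirement "including those with 0 cities": the filter('is_decreased = true') step drops every country in which no city decreased, so such countries never reach the final count even though 0 < 20. Below is a minimal sketch of one way to keep them, assuming decreased holds the distinct (country, city) pairs produced by the pipeline above (the variable names are illustrative):

# all countries present in the data, whether or not any of their cities decreased
all_countries = df.select('country').distinct()

# per-country counts of decreasing cities; countries absent here had 0 such cities
counts = decreased.groupBy('country').count()

# the left join keeps every country; coalesce turns missing counts into 0
all_countries.join(counts, 'country', 'left') \
  .withColumn('count', f.coalesce('count', f.lit(0))) \
  .filter('count < 20') \
  .select('country') \
  .show()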

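On the asker's remaining bullets, two hedged observations. DataFrames are generally the better fit here: Catalyst can optimize built-in expressions and window functions, but treats RDD lambdas and Python UDFs as opaque. And chaining several groupBy calls is not wrong per se, but each aggregation or window over a different key set typically adds a shuffle; this can be verified from the physical plan, where each Exchange node marks one shuffle. A small illustrative check (the variable name result is an assumption):

# build a pipeline (here: the first two aggregations from the question) and
# inspect its physical plan; each "Exchange hashpartitioning(...)" node
# corresponds to one shuffle across the cluster
result = df.groupBy('country', 'city', 'year') \
  .agg(f.sum('consumption').alias('consumption')) \
  .groupBy('country', 'city') \
  .agg(f.collect_list('consumption').alias('consumptions'))

result.explain()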